keywords: 时间序列处理; 深度学习; keras
针对现有时间序列分类方法的特征提取与分类过程分离,且无法提取存在于不同时间尺度序列的不同特征的问题,作者提出MCNN模型。
对于单一时间序列输入,进行降采样和滑动平均等变化,产生多组长度不同的时间序列,并在多组时间序列上进行卷积,提取不同时间尺度序列的特征。
image
- Original 表示原始时间序列
- Multi-Frequency 表示对原始数据滑动平均后的序列
image
- Multi-Scale 表示对原始数据降采样后的序列
image
通过降采样的变换,实现在不同时间尺度的序列上的特征提取。
通过滑动平均的变换,实现对噪音的抵抗性。
加载数据
论文中使用了UCR数据集中的44个任务,首先,根据数据集任务名获取数据集的存储路径。
def get_datasets_path(base_path): datasets_used = ['Adiac', 'Beef', 'CBF', 'ChlorineConcentration', 'CinC_ECG_torso', 'Coffee', 'Cricket_X', 'Cricket_Y', 'Cricket_Z', 'DiatomSizeReduction', 'ECGFiveDays', 'FaceAll', 'FaceFour', 'FacesUCR', '50words', 'FISH', 'Gun_Point', 'Haptics', 'InlineSkate', 'ItalyPowerDemand', 'Lighting2', 'Lighting7', 'MALLAT', 'MedicalImages', 'MoteStrain', 'NonInvasiveFatalECG_Thorax1', 'NonInvasiveFatalECG_Thorax2', 'OliveOil', 'OSULeaf', 'SonyAIBORobotSurface', 'SonyAIBORobotSurfaceII', 'StarLightCurves', 'SwedishLeaf', 'Symbols', 'synthetic_control', 'Trace', 'TwoLeadECG', 'Two_Patterns', 'UWaveGestureLibrary_X', 'UWaveGestureLibrary_Y', 'UWaveGestureLibrary_Z', 'wafer', 'WordSynonyms', 'yoga'] data_list = os.listdir(base_path) return ["%s/%s/%s_" % (base_path, data, data) for data in datasets_used if data in data_list]
数据集分为TRAIN和TEST两个文件,分别存储训练集和测试集。作者使用了一种增加数据规模的技巧,使用滑动窗口在时间序列上截取数据,并共享同一个类别标签,同时增加了测试集和训练集的规模。
def data_augmentation(feature, label, ws): seq_len = feature.shape[1] aug_feature, aug_label = feature[:, :ws], label for i in range(1, seq_len-ws+1): _feature = feature[:, i:i+ws] aug_feature = np.concatenate((aug_feature, _feature), axis=0) aug_label = np.concatenate((aug_label, label), axis=0) return aug_feature, aug_label def load_feature_label(path, aug_times=0): data = np.loadtxt(path, dtype=np.float, delimiter=',') # the first column is label, and the rest are features feature, label = data[:, 1:], data[:, 0] if aug_times>0: feature, label = data_augmentation(feature, label, data.shape[1]-aug_times) return feature, label
数据变换
降采样
使用一组降采样因子 k1, k2, k3,每隔 ki-1 个数据取一个。
def down_sampling(data, rates): ds_seq_len = [] ds_data = [] # down sampling by rate k for k in rates: if k > data.shape[1] / 3: break _data = data[:, ::k] # temp after down sampling ds_data.append(_data) ds_seq_len.append(_data.shape[1]) # remark the length info return ds_data, ds_seq_len
滑动平均
使用一组滑动窗口l1, l2, l3,每li个数据取平均。
def moving_average(data, moving_ws): num, seq_len = data.shape[0], data.shape[1] ma_data = [] ma_seq_len = [] for ws in moving_ws: if ws>data.shape[1]/3: break _data = np.zeros((num, seq_len-ws+1)) for i in range(seq_len-ws+1): _data[:, i] = np.mean(data[:, i: i+ws], axis=1) ma_data.append(_data) ma_seq_len.append(_data.shape[1]) return ma_data, ma_seq_len
获取MCNN输入
通过多组降频因子k,和滑动窗口l,对原序列进行处理,得到多个时间序列,并在不同时间序列上进行一维卷积操作,提取在不同时间规模下的抽象特征,是该论文的主要思想。
def get_mcnn_input(feature, label): origin = feature ms_branch, ms_lens = down_sampling(feature, rates=[2, 3, 4, 5]) mf_branch, mf_lens = moving_average(feature, moving_ws=[5, 8, 11]) label = np_utils.to_categorical(label) # one hot features = [origin, *ms_branch, *mf_branch] features = [data.reshape(data.shape+(1,)) for data in features] data_lens = [origin.shape[1], *ms_lens, *mf_lens] return features, label
模型定义
mcnn 模型的输入是多个长度不唯一的时间序列,为了减少代码长度,使用列表推导式建立模型。
def MCNN_model(feature_lens, class_num): input_sigs = [Input(shape=(bra_len, 1)) for bra_len in feature_lens] # local convolution ms_sigs = [] for i in range(len(input_sigs)): _ms = Conv1D(padding='same', kernel_size=conv_size, filters=256, activation='relu')(input_sigs[i]) pooling_size = (_ms.shape[1].value - conv_size + 1) // pooling_factor _ms = MaxPooling1D(pool_size=pooling_size)(_ms) ms_sigs.append(_ms) merged = concatenate(ms_sigs, axis=1) # fully convolution conved = Conv1D(padding='valid', kernel_size=conv_size, filters=256, activation='relu')(merged) pooled = MaxPooling1D(pool_size=5)(conved) x = Flatten()(pooled) x = Dense(256, activation='relu')(x) x = Dense(256, activation='relu')(x) x = Dense(class_num, activation='softmax')(x) MCNN = Model(inputs=input_sigs, outputs=x) # MCNN.summary() MCNN.compile(loss='categorical_crossentropy', optimizer='Adam', metrics=['accuracy']) return MCNN
主函数
def on_single_dataset(data_path): data_name = data_path.split('/')[-2] # get the origin time series f_train, la_tr = load_feature_label(data_path+'TRAIN', aug_times=0) f_test, la_te = load_feature_label(data_path+'TEST', aug_times=0) f_train, f_test = normalization(f_train, f_test) # get the transform sequences f_trains, la_tr = get_mcnn_input(f_train, la_tr) f_tests, la_te = get_mcnn_input(f_test, la_te) feat_lens = [data.shape[1] for data in f_trains] # get the length info of each transform sequence class_num = la_tr.shape[1] mcnn = MCNN_model(feat_lens, class_num) mcnn.fit(f_trains, la_tr, batch_size=128, epochs=64, verbose=0) te_loss, te_acc = mcnn.evaluate(f_tests, la_te, verbose=0) print("[%(data)s]: Test loss - %(loss).2f, Test accuracy - %(acc).2f%%" % {'data': data_name, 'loss': te_loss, 'acc': te_acc*100}) la_pred = mcnn.predict(f_tests) for data in f_trains, f_tests, la_te, la_tr: del data gc.collect() def main(): dataPaths = get_datasets_path(base_path) for dp in dataPaths: on_single_dataset(dp)