I tried using a convolutional AE and a convolutional VAE for unsupervised anomaly detection. The idea is as follows:
1. First train the AE or VAE on normal samples only.
2. Feed the test set to the trained AE or VAE to obtain the reconstructed test data.
3. Compute the error between the reconstructed data and the original data; if the error exceeds a chosen threshold, the test sample is flagged as an anomaly (see the sketch after this list).
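A minimal sketch of step 3 in Python, assuming per-sample mean squared error as the reconstruction score and a threshold taken as a percentile of the validation errors (both the metric and the 99th-percentile choice are illustrative assumptions, not fixed by the scripts below):

import numpy as np

def reconstruction_errors(original, reconstructed):
    # per-sample mean squared error over all 7x7 values
    diff = original.reshape(len(original), -1) - reconstructed.reshape(len(reconstructed), -1)
    return np.mean(np.square(diff), axis=1)

def detect_anomalies(errors_test, errors_valid, percentile=99):
    # threshold taken from errors on normal-only validation data;
    # the 99th percentile is an assumed, tunable choice
    threshold = np.percentile(errors_valid, percentile)
    return errors_test > threshold

With the scripts below, errors_valid would be reconstruction_errors(x_valid, Reconstructed_valid), and errors_test the same for the test split.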
The dataset is described as follows:
The dataset contains 10100 samples in total; each sample is a vector of 1 row and 48 columns. To turn it into a square matrix, I appended a 0 at the end, giving 49 values that reshape into a 7*7 matrix. The first 8000 samples are normal. Of the last 2100, the first 300 are normal, and the following 1800 contain 6 kinds of anomalous time series, with 300 samples per kind.
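For reference, a minimal sketch of that padding and reshaping (the variable names are illustrative):

import numpy as np

sample = np.random.rand(48)        # one 1x48 sample (random placeholder values)
padded = np.append(sample, 0.0)    # append a single 0 -> 49 values
matrix = padded.reshape(7, 7)      # the 7x7 "image" fed to the conv nets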
The VAE code is as follows:
'''This script demonstrates how to build a variational autoencoder
with Keras and deconvolution layers.

# Reference
- Auto-Encoding Variational Bayes
  https://arxiv.org/abs/1312.6114
'''
# https://blog.csdn.net/wyx100/article/details/80647379
from __future__ import print_function

import numpy as np
import matplotlib.pyplot as plt
from pandas import read_csv

from keras.layers import Input, Dense, Lambda, Flatten, Reshape
from keras.layers import Conv2D, Conv2DTranspose
from keras.models import Model
from keras import backend as K
from keras import metrics

# input image dimensions
img_rows, img_cols, img_chns = 7, 7, 1
dimension_image = 7
# number of convolutional filters to use
filters = 64
# convolution kernel size
num_conv = 3

batch_size = 50
if K.image_data_format() == 'channels_first':
    original_img_size = (img_chns, img_rows, img_cols)
else:
    original_img_size = (img_rows, img_cols, img_chns)
latent_dim = 2
intermediate_dim = 128
epsilon_std = 1.0
epochs = 100

# encoder
x = Input(shape=original_img_size)
conv_1 = Conv2D(img_chns,
                kernel_size=(2, 2),
                padding='same', activation='relu')(x)
conv_2 = Conv2D(filters,
                kernel_size=(2, 2),
                padding='same', activation='relu',
                strides=(2, 2))(conv_1)
conv_3 = Conv2D(filters,
                kernel_size=num_conv,
                padding='same', activation='relu',
                strides=1)(conv_2)
conv_4 = Conv2D(filters,
                kernel_size=num_conv,
                padding='same', activation='relu',
                strides=1)(conv_3)
flat = Flatten()(conv_4)
hidden = Dense(intermediate_dim, activation='relu')(flat)

z_mean = Dense(latent_dim)(hidden)
z_log_var = Dense(latent_dim)(hidden)


def sampling(args):
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim),
                              mean=0., stddev=epsilon_std)
    # z_log_var is the log *variance* (see the KL term below),
    # so the standard deviation is exp(z_log_var / 2)
    return z_mean + K.exp(z_log_var / 2) * epsilon

# note that "output_shape" isn't necessary with the TensorFlow backend,
# so you could write `Lambda(sampling)([z_mean, z_log_var])`
z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])

# we instantiate these layers separately so as to reuse them later
number = 4
decoder_hid = Dense(intermediate_dim, activation='relu')
decoder_upsample = Dense(filters * number * number, activation='relu')

if K.image_data_format() == 'channels_first':
    output_shape = (batch_size, filters, number, number)
else:
    output_shape = (batch_size, number, number, filters)

decoder_reshape = Reshape(output_shape[1:])
decoder_deconv_1 = Conv2DTranspose(filters,
                                   kernel_size=num_conv,
                                   padding='same',
                                   strides=1,
                                   activation='relu')
decoder_deconv_2 = Conv2DTranspose(filters,
                                   kernel_size=num_conv,
                                   padding='same',
                                   strides=1,
                                   activation='relu')
# this transposed convolution upsamples the 4x4 maps to 9x9;
# the final valid 3x3 convolution then squashes them back to 7x7
decoder_deconv_3_upsamp = Conv2DTranspose(filters,
                                          kernel_size=(3, 3),
                                          strides=(2, 2),
                                          padding='valid',
                                          activation='relu')
decoder_mean_squash = Conv2D(img_chns,
                             kernel_size=3,
                             padding='valid',
                             activation='sigmoid')

hid_decoded = decoder_hid(z)
up_decoded = decoder_upsample(hid_decoded)
reshape_decoded = decoder_reshape(up_decoded)
deconv_1_decoded = decoder_deconv_1(reshape_decoded)
deconv_2_decoded = decoder_deconv_2(deconv_1_decoded)
x_decoded_relu = decoder_deconv_3_upsamp(deconv_2_decoded)
x_decoded_mean_squash = decoder_mean_squash(x_decoded_relu)

# instantiate VAE model
vae = Model(x, x_decoded_mean_squash)

# Compute VAE loss
xent_loss = img_rows * img_cols * metrics.binary_crossentropy(
    K.flatten(x), K.flatten(x_decoded_mean_squash))
kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
vae_loss = K.mean(xent_loss + kl_loss)
vae.add_loss(vae_loss)

vae.compile(optimizer='adam')
vae.summary()

# load the data; header=None assumes the CSV holds raw values with no header row
dataset = read_csv('randperm_zerone_Dataset.csv', header=None)
values = dataset.values
XY = values
n_train_hours1 = 7000
n_train_hours3 = 8000
x_train = XY[:n_train_hours1, :]
x_valid = XY[n_train_hours1:n_train_hours3, :]
x_test = XY[n_train_hours3:, :]
x_train = x_train.reshape(-1, dimension_image, dimension_image, 1)
x_valid = x_valid.reshape(-1, dimension_image, dimension_image, 1)
x_test = x_test.reshape(-1, dimension_image, dimension_image, 1)

history = vae.fit(x_train,
                  shuffle=True,
                  epochs=epochs,
                  batch_size=batch_size,
                  validation_data=(x_valid, None))

plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='valid')
plt.legend()
plt.show()

# build a model to project inputs onto the latent space
encoder = Model(x, z_mean)

# display a 2D plot of the test samples in the latent space
x_test_encoded = encoder.predict(x_test, batch_size=batch_size)
plt.figure(figsize=(6, 6))
plt.scatter(x_test_encoded[:, 0], x_test_encoded[:, 1])
plt.show()

# reconstruct every split and save the flattened reconstructions
Reconstructed_train = vae.predict(x_train)
Reconstructed_valid = vae.predict(x_valid)
Reconstructed_test = vae.predict(x_test)
ReconstructedData1 = np.vstack((Reconstructed_train, Reconstructed_valid))
ReconstructedData2 = np.vstack((ReconstructedData1, Reconstructed_test))
ReconstructedData3 = ReconstructedData2.reshape((ReconstructedData2.shape[0], -1))
np.savetxt("ReconstructedData.csv", ReconstructedData3, delimiter=',')
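With these arrays in hand, the helper functions sketched after the step list above can be applied directly to get anomaly flags, e.g.:

# apply the (assumed) scoring and thresholding helpers from the step-3 sketch
err_valid = reconstruction_errors(x_valid, Reconstructed_valid)
err_test = reconstruction_errors(x_test, Reconstructed_test)
is_anomaly = detect_anomalies(err_test, err_valid)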
The AE code is as follows:
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D
from keras.models import Model
import numpy as np
from pandas import read_csv
from matplotlib import pyplot

dimension_image = 7

input_img = Input(shape=(dimension_image, dimension_image, 1))  # adapt this if using `channels_first` image data format

# encoder: 7x7 -> 4x4 -> 2x2 -> 1x1
x = Conv2D(16, (3, 3), activation='relu', padding='same')(input_img)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
encoded = MaxPooling2D((2, 2), padding='same')(x)

# at this point the representation is (1, 1, 8), i.e. 8-dimensional

# decoder: 1x1 -> 2x2 -> 4x4 -> 8x8, then a valid 2x2 conv back to 7x7
x = Conv2D(8, (3, 3), activation='relu', padding='same')(encoded)
x = UpSampling2D((2, 2))(x)
x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
decoded = Conv2D(1, (2, 2), activation='sigmoid')(x)

autoencoder = Model(input_img, decoded)
autoencoder.compile(optimizer='adadelta', loss='binary_crossentropy')
autoencoder.summary()

# load the data; header=None assumes the CSV holds raw values with no header row
dataset = read_csv('randperm_zerone_Dataset.csv', header=None)
values = dataset.values
XY = values
n_train_hours1 = 7000
n_train_hours3 = 8000
x_train = XY[:n_train_hours1, :]
x_valid = XY[n_train_hours1:n_train_hours3, :]
x_test = XY[n_train_hours3:, :]
x_train = x_train.reshape(-1, dimension_image, dimension_image, 1)
x_valid = x_valid.reshape(-1, dimension_image, dimension_image, 1)
x_test = x_test.reshape(-1, dimension_image, dimension_image, 1)

history = autoencoder.fit(x_train, x_train,
                          epochs=200,
                          batch_size=32,
                          shuffle=True,
                          validation_data=(x_valid, x_valid))

pyplot.plot(history.history['loss'], label='train')
pyplot.plot(history.history['val_loss'], label='valid')
pyplot.legend()
pyplot.show()

# reconstruct every split and save the flattened reconstructions
Reconstructed_train = autoencoder.predict(x_train)
Reconstructed_valid = autoencoder.predict(x_valid)
Reconstructed_test = autoencoder.predict(x_test)
ReconstructedData1 = np.vstack((Reconstructed_train, Reconstructed_valid))
ReconstructedData2 = np.vstack((ReconstructedData1, Reconstructed_test))
ReconstructedData3 = ReconstructedData2.reshape((ReconstructedData2.shape[0], -1))
np.savetxt("ReconstructedData.csv", ReconstructedData3, delimiter=',')
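Because the layout of the 2100-sample test split is known (first 300 normal, then 6 blocks of 300 anomalous samples each), the flags produced by either model can be scored against ground truth. A minimal sketch, assuming is_anomaly is the boolean vector from the thresholding step above:

import numpy as np

# ground truth for the test split: 300 normal samples, then 1800 anomalous ones
y_true = np.concatenate([np.zeros(300, dtype=bool), np.ones(1800, dtype=bool)])

def precision_recall(y_true, y_pred):
    # simple precision/recall for boolean anomaly flags
    tp = np.sum(y_true & y_pred)
    precision = tp / max(np.sum(y_pred), 1)
    recall = tp / max(np.sum(y_true), 1)
    return precision, recall

# example: precision, recall = precision_recall(y_true, is_anomaly)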
As for the dataset, it is being uploaded to Baidu Wenku; I will update this post later.