Unsupervised Anomaly Detection with a Convolutional AE and a Convolutional VAE


I tried using a convolutional AE and a convolutional VAE for unsupervised anomaly detection. The idea is as follows:

1. First, train the AE or VAE on normal samples only.

2. Feed the test set to the trained AE or VAE to obtain reconstructions of the test data.

3. Compute the error between the reconstructed data and the original data; if the error exceeds some threshold, the test sample is flagged as anomalous (a sketch of this thresholding step is given after the VAE reconstruction code below).

The dataset is described as follows:

The dataset contains 10,100 samples in total; each sample is a vector with 1 row and 48 columns. To turn each sample into a matrix, I appended a 0 at the end, converting it into a 7×7 matrix. The first 8,000 samples are normal. Of the last 2,100 samples, the first 300 are normal; the remaining 1,800 comprise six types of anomalous time series, with 300 samples of each type.
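For illustration, here is a minimal sketch of that preprocessing step (the raw array is a random placeholder; the real data lives in randperm_zerone_Dataset.csv):

import numpy as np

raw = np.random.rand(10100, 48)                         # placeholder for the real 10100x48 data
padded = np.hstack([raw, np.zeros((raw.shape[0], 1))])  # append one 0 -> 49 = 7*7 values per row
matrices = padded.reshape(-1, 7, 7, 1)                  # (10100, 7, 7, 1), ready for Conv2D layers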

The VAE code is as follows:

# https://blog.csdn.net/wyx100/article/details/80647379
'''This script demonstrates how to build a variational autoencoder
with Keras and deconvolution layers.
# Reference
- Auto-Encoding Variational Bayes
  https://arxiv.org/abs/1312.6114
'''
from __future__ import print_function

import numpy as np
import matplotlib.pyplot as plt
from pandas import read_csv
from keras.layers import Input, Dense, Lambda, Flatten, Reshape
from keras.layers import Conv2D, Conv2DTranspose
from keras.models import Model
from keras import backend as K
from keras import metrics
# input image dimensions
img_rows, img_cols, img_chns = 7, 7, 1
dimension_image = 7
# number of convolutional filters to use
filters = 64
# convolution kernel size
num_conv = 3
 
batch_size = 50
if K.image_data_format() == 'channels_first':
    original_img_size = (img_chns, img_rows, img_cols)
else:
    original_img_size = (img_rows, img_cols, img_chns)
latent_dim = 2
intermediate_dim = 128
epsilon_std = 1.0
epochs = 100
 
x = Input(shape=original_img_size)
conv_1 = Conv2D(img_chns,
                kernel_size=(2, 2),
                padding='same', activation='relu')(x)
conv_2 = Conv2D(filters,
                kernel_size=(2, 2),
                padding='same', activation='relu',
                strides=(2, 2))(conv_1)
conv_3 = Conv2D(filters,
                kernel_size=num_conv,
                padding='same', activation='relu',
                strides=1)(conv_2)
conv_4 = Conv2D(filters,
                kernel_size=num_conv,
                padding='same', activation='relu',
                strides=1)(conv_3)
flat = Flatten()(conv_4)
hidden = Dense(intermediate_dim, activation='relu')(flat)
 
z_mean = Dense(latent_dim)(hidden)
z_log_var = Dense(latent_dim)(hidden)
 
 
def sampling(args):
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim),
                              mean=0., stddev=epsilon_std)
    # z_log_var is the log *variance* (as assumed by the KL term below),
    # so the standard deviation is exp(z_log_var / 2)
    return z_mean + K.exp(z_log_var / 2) * epsilon
 
# note that "output_shape" isn't necessary with the TensorFlow backend,
# so you could write `Lambda(sampling)([z_mean, z_log_var])`
z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
 
# we instantiate these layers separately so as to reuse them later
number = 4
decoder_hid = Dense(intermediate_dim, activation='relu')
decoder_upsample = Dense(filters * number * number, activation='relu')
 
if K.image_data_format() == 'channels_first':
    output_shape = (batch_size, filters, number, number)
else:
    output_shape = (batch_size, number, number, filters)
 
decoder_reshape = Reshape(output_shape[1:])
decoder_deconv_1 = Conv2DTranspose(filters,
                                   kernel_size=num_conv,
                                   padding='same',
                                   strides=1,
                                   activation='relu')
decoder_deconv_2 = Conv2DTranspose(filters,
                                   kernel_size=num_conv,
                                   padding='same',
                                   strides=1,
                                   activation='relu')
# deconv_3_upsamp maps the 4x4 feature maps to 9x9 (stride 2, 'valid' 3x3 kernel);
# the final 'valid' 3x3 convolution below brings the output back to 7x7
decoder_deconv_3_upsamp = Conv2DTranspose(filters,
                                          kernel_size=(3, 3),
                                          strides=(2, 2),
                                          padding='valid',
                                          activation='relu')
decoder_mean_squash = Conv2D(img_chns,
                             kernel_size=3,
                             padding='valid',
                             activation='sigmoid')
 
hid_decoded = decoder_hid(z)
up_decoded = decoder_upsample(hid_decoded)
reshape_decoded = decoder_reshape(up_decoded)
deconv_1_decoded = decoder_deconv_1(reshape_decoded)
deconv_2_decoded = decoder_deconv_2(deconv_1_decoded)
x_decoded_relu = decoder_deconv_3_upsamp(deconv_2_decoded)
x_decoded_mean_squash = decoder_mean_squash(x_decoded_relu)
 
# instantiate VAE model
vae = Model(x, x_decoded_mean_squash)
# Compute VAE loss: reconstruction (binary cross-entropy) term plus KL divergence
xent_loss = img_rows * img_cols * metrics.binary_crossentropy(
    K.flatten(x),
    K.flatten(x_decoded_mean_squash))
kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
vae_loss = K.mean(xent_loss + kl_loss)
vae.add_loss(vae_loss)
vae.compile(optimizer='Adam')
vae.summary()
 
# note: read_csv treats the first line as a header by default; if the CSV
# contains raw data only, pass header=None so no sample is dropped
dataset = read_csv('randperm_zerone_Dataset.csv')
values = dataset.values
XY = values
n_train_hours1 = 7000
n_train_hours3 = 8000
x_train = XY[:n_train_hours1, :]
x_valid = XY[n_train_hours1:n_train_hours3, :]
x_test = XY[n_train_hours3:, :]
x_train = x_train.reshape(-1, dimension_image, dimension_image, 1)
x_valid = x_valid.reshape(-1, dimension_image, dimension_image, 1)
x_test = x_test.reshape(-1, dimension_image, dimension_image, 1)

history=vae.fit(x_train,
                shuffle=True,
                epochs=epochs,
                batch_size=batch_size,
                validation_data=(x_valid, None))
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='valid')
plt.legend()
plt.show()

# build a model that maps an input to its position in the latent space
encoder = Model(x, z_mean)
# display a 2D plot of the test samples in the latent space
x_test_encoded = encoder.predict(x_test, batch_size=batch_size)
plt.figure(figsize=(6, 6))
plt.scatter(x_test_encoded[:, 0], x_test_encoded[:, 1])
plt.show()
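Since the composition of the test set is known (300 normal samples followed by six anomaly types, 300 each), the latent scatter can be colored by segment. A minimal sketch, assuming the test split has exactly 2,100 rows:

# 0 = normal, 1..6 = the six anomaly types (300 samples per segment)
segment_labels = np.repeat(np.arange(7), 300)
plt.figure(figsize=(6, 6))
plt.scatter(x_test_encoded[:, 0], x_test_encoded[:, 1], c=segment_labels, cmap='viridis')
plt.colorbar()
plt.show()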

Reconstructed_train = vae.predict(x_train)
Reconstructed_valid = vae.predict(x_valid)
Reconstructed_test  = vae.predict(x_test)
ReconstructedData1 = np.vstack((Reconstructed_train, Reconstructed_valid))
ReconstructedData2 = np.vstack((ReconstructedData1, Reconstructed_test))
# flatten each 7x7 reconstruction back to one row of 49 values
ReconstructedData3 = ReconstructedData2.reshape((ReconstructedData2.shape[0], -1))

np.savetxt("ReconstructedData.csv", ReconstructedData3, delimiter=',')
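The script above stops at saving the reconstructions; step 3 of the approach (error thresholding) is not implemented in the original post. A minimal sketch of one way to do it, using per-sample mean squared error as the anomaly score and the 99th percentile of the validation error as the threshold (both choices are illustrative assumptions, not taken from the original):

def reconstruction_error(original, reconstructed):
    # per-sample mean squared error over the 49 reconstructed values
    diff = (original - reconstructed).reshape(original.shape[0], -1)
    return np.mean(np.square(diff), axis=1)

valid_errors = reconstruction_error(x_valid, Reconstructed_valid)
test_errors = reconstruction_error(x_test, Reconstructed_test)

# threshold taken from the error distribution on normal validation data
threshold = np.percentile(valid_errors, 99)
is_anomaly = test_errors > threshold    # one boolean flag per test sample
print('flagged %d of %d test samples as anomalous' % (is_anomaly.sum(), len(is_anomaly)))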

The AE code is as follows:

from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D
from keras.models import Model
import numpy as np
from pandas import read_csv
import matplotlib.pyplot as plt

dimension_image=7
input_img = Input(shape=(dimension_image, dimension_image, 1))  # adapt this if using `channels_first` image data format
x = Conv2D(16, (3, 3), activation='relu', padding='same')(input_img)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
encoded = MaxPooling2D((2, 2), padding='same')(x)

# at this point the representation is (1, 1, 8), i.e. 8-dimensional
x = Conv2D(8, (3, 3), activation='relu', padding='same')(encoded)
x = UpSampling2D((2, 2))(x)
x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
# the final 'valid' 2x2 convolution trims the 8x8 feature map back to 7x7
decoded = Conv2D(1, (2, 2), activation='sigmoid')(x)

autoencoder = Model(input_img, decoded)
autoencoder.compile(optimizer='adadelta', loss='binary_crossentropy')
autoencoder.summary()

# note: read_csv treats the first line as a header by default; if the CSV
# contains raw data only, pass header=None so no sample is dropped
dataset = read_csv('randperm_zerone_Dataset.csv')
values = dataset.values
XY = values
n_train_hours1 = 7000
n_train_hours3 = 8000
x_train = XY[:n_train_hours1, :]
x_valid = XY[n_train_hours1:n_train_hours3, :]
x_test = XY[n_train_hours3:, :]
x_train = x_train.reshape(-1, dimension_image, dimension_image, 1)
x_valid = x_valid.reshape(-1, dimension_image, dimension_image, 1)
x_test = x_test.reshape(-1, dimension_image, dimension_image, 1)

history=autoencoder.fit(x_train, x_train,
                        epochs=200,
                        batch_size=32,
                        shuffle=True,
                        validation_data=(x_valid, x_valid))
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='valid')
plt.legend()
plt.show()
Reconstructed_train = autoencoder.predict(x_train)
Reconstructed_valid = autoencoder.predict(x_valid)
Reconstructed_test  = autoencoder.predict(x_test)
ReconstructedData1 = np.vstack((Reconstructed_train, Reconstructed_valid))
ReconstructedData2 = np.vstack((ReconstructedData1, Reconstructed_test))
# flatten each 7x7 reconstruction back to one row of 49 values
ReconstructedData3 = ReconstructedData2.reshape((ReconstructedData2.shape[0], -1))

np.savetxt("ReconstructedData.csv", ReconstructedData3, delimiter=',')
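The same error-thresholding sketch shown after the VAE code applies here unchanged: compute per-sample reconstruction errors from x_test and Reconstructed_test, pick a threshold from the validation errors, and flag the test samples whose error exceeds it.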

As for the dataset, it is being uploaded to Baidu Wenku; this post will be updated later.

