自動編碼器是一種特殊的神經網絡,經過訓練可以將其輸入復制到其輸出。例如,給定手寫數字的圖像,自動編碼器首先將圖像編碼為較低維的潛在表示,然后將潛在表示解碼回圖像。自動編碼器學會在最小化重構誤差的同時壓縮數據。
要了解有關自動編碼器的更多信息,請考慮閱讀Ian Goodfellow,Yoshua Bengio和Aaron Courville撰寫的Deep Learning中的第14章。
導入TensorFlow和其他庫
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model
加載數據集
首先,您將使用Fashon MNIST數據集訓練基本的自動編碼器。該數據集中的每個圖像均為28x28像素。
(x_train, _), (x_test, _) = fashion_mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
print (x_train.shape)
print (x_test.shape)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz 32768/29515 [=================================] - 0s 0us/step Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz 26427392/26421880 [==============================] - 0s 0us/step Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz 8192/5148 [===============================================] - 0s 0us/step Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz 4423680/4422102 [==============================] - 0s 0us/step (60000, 28, 28) (10000, 28, 28)
第一個示例:基本自動編碼器

定義一個具有兩個密集層的自動encoder
:一個encoder
(將圖像壓縮為64維潛矢量)和一個decoder
(從decoder
空間重建原始圖像)。
要定義模型,請使用Keras模型子類API 。
latent_dim = 64
class Autoencoder(Model):
def __init__(self, encoding_dim):
super(Autoencoder, self).__init__()
self.latent_dim = latent_dim
self.encoder = tf.keras.Sequential([
layers.Flatten(),
layers.Dense(latent_dim, activation='relu'),
])
self.decoder = tf.keras.Sequential([
layers.Dense(784, activation='sigmoid'),
layers.Reshape((28, 28))
])
def call(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
autoencoder = Autoencoder(latent_dim)
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
使用x_train
作為輸入和目標來訓練模型。 encoder
將學習將數據集從784個維壓縮到潛在空間,而decoder
將學習重建原始圖像。 。
autoencoder.fit(x_train, x_train,
epochs=10,
shuffle=True,
validation_data=(x_test, x_test))
Epoch 1/10 1875/1875 [==============================] - 3s 1ms/step - loss: 0.0236 - val_loss: 0.0133 Epoch 2/10 1875/1875 [==============================] - 2s 1ms/step - loss: 0.0116 - val_loss: 0.0106 Epoch 3/10 1875/1875 [==============================] - 2s 1ms/step - loss: 0.0100 - val_loss: 0.0097 Epoch 4/10 1875/1875 [==============================] - 2s 1ms/step - loss: 0.0094 - val_loss: 0.0094 Epoch 5/10 1875/1875 [==============================] - 2s 1ms/step - loss: 0.0091 - val_loss: 0.0091 Epoch 6/10 1875/1875 [==============================] - 2s 1ms/step - loss: 0.0090 - val_loss: 0.0091 Epoch 7/10 1875/1875 [==============================] - 2s 1ms/step - loss: 0.0089 - val_loss: 0.0089 Epoch 8/10 1875/1875 [==============================] - 2s 1ms/step - loss: 0.0088 - val_loss: 0.0089 Epoch 9/10 1875/1875 [==============================] - 2s 1ms/step - loss: 0.0088 - val_loss: 0.0088 Epoch 10/10 1875/1875 [==============================] - 2s 1ms/step - loss: 0.0087 - val_loss: 0.0088 <tensorflow.python.keras.callbacks.History at 0x7f7076d484e0>
現在已經對模型進行了訓練,讓我們通過對測試集中的圖像進行編碼和解碼來對其進行測試。
encoded_imgs = autoencoder.encoder(x_test).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
# display original
ax = plt.subplot(2, n, i + 1)
plt.imshow(x_test[i])
plt.title("original")
plt.gray()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
# display reconstruction
ax = plt.subplot(2, n, i + 1 + n)
plt.imshow(decoded_imgs[i])
plt.title("reconstructed")
plt.gray()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.show()

第二個例子:圖像去噪

還可以訓練自動編碼器以消除圖像中的噪點。在以下部分中,您將通過對每個圖像應用隨機噪聲來創建Fashion MNIST數據集的嘈雜版本。然后,您將使用嘈雜的圖像作為輸入,並以原始圖像為目標來訓練自動編碼器。
讓我們重新導入數據集以省略之前所做的修改。
(x_train, _), (x_test, _) = fashion_mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]
print(x_train.shape)
(60000, 28, 28, 1)
給圖像添加隨機噪聲
noise_factor = 0.2
x_train_noisy = x_train + noise_factor * tf.random.normal(shape=x_train.shape)
x_test_noisy = x_test + noise_factor * tf.random.normal(shape=x_test.shape)
x_train_noisy = tf.clip_by_value(x_train_noisy, clip_value_min=0., clip_value_max=1.)
x_test_noisy = tf.clip_by_value(x_test_noisy, clip_value_min=0., clip_value_max=1.)
繪制嘈雜的圖像。
n = 10
plt.figure(figsize=(20, 2))
for i in range(n):
ax = plt.subplot(1, n, i + 1)
plt.title("original + noise")
plt.imshow(tf.squeeze(x_test_noisy[i]))
plt.gray()
plt.show()

定義卷積自動編碼器
在本例中,將訓練使用卷積自動編碼Conv2D層在encoder
,和Conv2DTranspose層在decoder
。
class Denoise(Model):
def __init__(self):
super(Denoise, self).__init__()
self.encoder = tf.keras.Sequential([
layers.Input(shape=(28, 28, 1)),
layers.Conv2D(16, (3,3), activation='relu', padding='same', strides=2),
layers.Conv2D(8, (3,3), activation='relu', padding='same', strides=2)])
self.decoder = tf.keras.Sequential([
layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),
layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
layers.Conv2D(1, kernel_size=(3,3), activation='sigmoid', padding='same')])
def call(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
autoencoder = Denoise()
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
autoencoder.fit(x_train_noisy, x_train,
epochs=10,
shuffle=True,
validation_data=(x_test_noisy, x_test))
Epoch 1/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0177 - val_loss: 0.0108 Epoch 2/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0100 - val_loss: 0.0095 Epoch 3/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0091 - val_loss: 0.0087 Epoch 4/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0085 - val_loss: 0.0084 Epoch 5/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0082 - val_loss: 0.0083 Epoch 6/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0080 - val_loss: 0.0080 Epoch 7/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0079 - val_loss: 0.0079 Epoch 8/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0078 - val_loss: 0.0078 Epoch 9/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0077 - val_loss: 0.0077 Epoch 10/10 1875/1875 [==============================] - 4s 2ms/step - loss: 0.0076 - val_loss: 0.0076 <tensorflow.python.keras.callbacks.History at 0x7f70600ede48>
讓我們看一下編碼器的摘要。請注意,圖像是如何從28x28下采樣到7x7的。
autoencoder.encoder.summary()
Model: "sequential_2" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= conv2d (Conv2D) (None, 14, 14, 16) 160 _________________________________________________________________ conv2d_1 (Conv2D) (None, 7, 7, 8) 1160 ================================================================= Total params: 1,320 Trainable params: 1,320 Non-trainable params: 0 _________________________________________________________________
解碼器將圖像從7x7升采樣到28x28。
autoencoder.decoder.summary()
Model: "sequential_3" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= conv2d_transpose (Conv2DTran (None, 14, 14, 8) 584 _________________________________________________________________ conv2d_transpose_1 (Conv2DTr (None, 28, 28, 16) 1168 _________________________________________________________________ conv2d_2 (Conv2D) (None, 28, 28, 1) 145 ================================================================= Total params: 1,897 Trainable params: 1,897 Non-trainable params: 0 _________________________________________________________________
繪制由自動編碼器產生的噪聲圖像和去噪圖像。
encoded_imgs = autoencoder.encoder(x_test).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
# display original + noise
ax = plt.subplot(2, n, i + 1)
plt.title("original + noise")
plt.imshow(tf.squeeze(x_test_noisy[i]))
plt.gray()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
# display reconstruction
bx = plt.subplot(2, n, i + n + 1)
plt.title("reconstructed")
plt.imshow(tf.squeeze(decoded_imgs[i]))
plt.gray()
bx.get_xaxis().set_visible(False)
bx.get_yaxis().set_visible(False)
plt.show()

第三個示例:異常檢測總覽
在此示例中,您將訓練自動編碼器以檢測ECG5000數據集上的異常。該數據集包含5,000個心電圖 ,每個心電圖包含140個數據點。您將使用數據集的簡化版本,其中每個示例都被標記為0
(對應於異常節奏)或1
(對應於正常節奏)。您對識別異常節律感興趣。
您將如何使用自動編碼器檢測異常?回想一下,對自動編碼器進行了培訓,以最大程度地減少重構誤差。您將只按照正常節奏訓練自動編碼器,然后使用它來重構所有數據。我們的假設是,異常節律將具有較高的重建誤差。然后,如果重構誤差超過固定閾值,則將節奏分類為異常。
加載心電圖數據
您將使用的數據集基於timeseriesclassification.com中的數據集。
# Download the dataset
dataframe = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)
raw_data = dataframe.values
dataframe.head()

# The last element contains the labels
labels = raw_data[:, -1]
# The other data points are the electrocadriogram data
data = raw_data[:, 0:-1]
train_data, test_data, train_labels, test_labels = train_test_split(
data, labels, test_size=0.2, random_state=21
)
將數據標准化為[0,1]
。
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)
train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)
train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)
您將僅使用正常節奏訓練自動編碼器,在此數據集中標記為1
。將正常節律與異常節律分開。
train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)
normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]
anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]
繪制正常的心電圖。
plt.grid()
plt.plot(np.arange(140), normal_train_data[0])
plt.title("A Normal ECG")
plt.show()

繪制異常的心電圖。
plt.grid()
plt.plot(np.arange(140), anomalous_train_data[0])
plt.title("An Anomalous ECG")
plt.show()

建立模型
class AnomalyDetector(Model):
def __init__(self):
super(AnomalyDetector, self).__init__()
self.encoder = tf.keras.Sequential([
layers.Dense(32, activation="relu"),
layers.Dense(16, activation="relu"),
layers.Dense(8, activation="relu")])
self.decoder = tf.keras.Sequential([
layers.Dense(16, activation="relu"),
layers.Dense(32, activation="relu"),
layers.Dense(140, activation="sigmoid")])
def call(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
autoencoder = AnomalyDetector()
autoencoder.compile(optimizer='adam', loss='mae')
請注意,僅使用常規ECG訓練自動編碼器,但使用完整的測試集對其進行評估。
history = autoencoder.fit(normal_train_data, normal_train_data,
epochs=20,
batch_size=512,
validation_data=(test_data, test_data),
shuffle=True)
Epoch 1/20 5/5 [==============================] - 0s 47ms/step - loss: 0.0589 - val_loss: 0.0535 Epoch 2/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0561 - val_loss: 0.0519 Epoch 3/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0536 - val_loss: 0.0502 Epoch 4/20 5/5 [==============================] - 0s 4ms/step - loss: 0.0499 - val_loss: 0.0483 Epoch 5/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0457 - val_loss: 0.0465 Epoch 6/20 5/5 [==============================] - 0s 6ms/step - loss: 0.0417 - val_loss: 0.0437 Epoch 7/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0378 - val_loss: 0.0418 Epoch 8/20 5/5 [==============================] - 0s 4ms/step - loss: 0.0343 - val_loss: 0.0403 Epoch 9/20 5/5 [==============================] - 0s 4ms/step - loss: 0.0312 - val_loss: 0.0386 Epoch 10/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0288 - val_loss: 0.0377 Epoch 11/20 5/5 [==============================] - 0s 4ms/step - loss: 0.0270 - val_loss: 0.0367 Epoch 12/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0257 - val_loss: 0.0363 Epoch 13/20 5/5 [==============================] - 0s 4ms/step - loss: 0.0247 - val_loss: 0.0356 Epoch 14/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0239 - val_loss: 0.0355 Epoch 15/20 5/5 [==============================] - 0s 4ms/step - loss: 0.0234 - val_loss: 0.0350 Epoch 16/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0230 - val_loss: 0.0348 Epoch 17/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0226 - val_loss: 0.0344 Epoch 18/20 5/5 [==============================] - 0s 4ms/step - loss: 0.0221 - val_loss: 0.0343 Epoch 19/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0218 - val_loss: 0.0340 Epoch 20/20 5/5 [==============================] - 0s 5ms/step - loss: 0.0214 - val_loss: 0.0338
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()
<matplotlib.legend.Legend at 0x7f7076948a20>

如果重建誤差大於正常訓練示例的一個標准偏差,您將很快將ECG歸類為異常。首先,讓我們從訓練集中繪制正常的ECG,通過自動編碼器進行編碼和解碼后的重構以及重構誤差。
encoded_imgs = autoencoder.encoder(normal_test_data).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
plt.plot(normal_test_data[0],'b')
plt.plot(decoded_imgs[0],'r')
plt.fill_between(np.arange(140), decoded_imgs[0], normal_test_data[0], color='lightcoral' )
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

創建一個類似的圖,這次是一個異常的測試示例。
encoded_imgs = autoencoder.encoder(anomalous_test_data).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
plt.plot(anomalous_test_data[0],'b')
plt.plot(decoded_imgs[0],'r')
plt.fill_between(np.arange(140), decoded_imgs[0], anomalous_test_data[0], color='lightcoral' )
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

檢測異常
通過計算重建損失是否大於固定閾值來檢測異常。在本教程中,您將計算出訓練集中正常樣本的平均平均誤差,如果重構誤差大於訓練集中的一個標准偏差,則將未來的樣本歸類為異常。
從訓練集中繪制正常心電圖上的重建誤差
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)
plt.hist(train_loss, bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

選擇一個閾值,該閾值要比平均值高一個標准偏差。
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)
Threshold: 0.033656895
如果檢查測試集中異常示例的重構誤差,您會發現大多數重構誤差都比閾值大。通過更改閾值,可以調整分類器的精度和召回率 。
reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)
plt.hist(test_loss, bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

如果重建誤差大於閾值,則將ECG歸類為異常。
def predict(model, data, threshold):
reconstructions = model(data)
loss = tf.keras.losses.mae(reconstructions, data)
return tf.math.less(loss, threshold)
def print_stats(predictions, labels):
print("Accuracy = {}".format(accuracy_score(labels, preds)))
print("Precision = {}".format(precision_score(labels, preds)))
print("Recall = {}".format(recall_score(labels, preds)))
preds = predict(autoencoder, test_data, threshold)
print_stats(preds, test_labels)
Accuracy = 0.943 Precision = 0.9921722113502935 Recall = 0.9053571428571429