這裡先列出 sklearn 官方給出的示例:使用高斯核(RBF kernel)的 One-Class SVM 對二維數據進行異常檢測:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.font_manager
from sklearn import svm
# One-Class SVM novelty-detection demo on synthetic 2-D data.
#
# Trains an RBF-kernel OneClassSVM on two Gaussian clusters, scores fresh
# regular points and uniformly drawn outliers, and plots the learned frontier
# together with all three point sets.

# Evaluation grid over [-5, 5] x [-5, 5] on which the decision function is drawn.
xx, yy = np.meshgrid(np.linspace(-5, 5, 500), np.linspace(-5, 5, 500))
# Generate train data: the same noise shifted to (+2, +2) and (-2, -2)
X = 0.3 * np.random.randn(100, 2)
X_train = np.r_[X + 2, X - 2]
# Generate some regular novel observations
X = 0.3 * np.random.randn(20, 2)
X_test = np.r_[X + 2, X - 2]
# Generate some abnormal novel observations
X_outliers = np.random.uniform(low=-4, high=4, size=(20, 2))
# fit the model
clf = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1)
clf.fit(X_train)
y_pred_train = clf.predict(X_train)
y_pred_test = clf.predict(X_test)
y_pred_outliers = clf.predict(X_outliers)
# An "error" is a regular point labelled -1, or an abnormal point labelled +1.
n_error_train = y_pred_train[y_pred_train == -1].size
n_error_test = y_pred_test[y_pred_test == -1].size
n_error_outliers = y_pred_outliers[y_pred_outliers == 1].size
# plot the line, the points, and the nearest vectors to the plane
Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.title("Novelty Detection")
# Region classified as abnormal (decision function below 0), shaded by depth.
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap=plt.cm.PuBu)
# Frontier between normal and abnormal samples (decision function == 0).
a = plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='darkred')
# Region classified as normal (decision function above 0).
plt.contourf(xx, yy, Z, levels=[0, Z.max()], colors='palevioletred')
s = 40
b1 = plt.scatter(X_train[:, 0], X_train[:, 1], c='white', s=s, edgecolors='k')
b2 = plt.scatter(X_test[:, 0], X_test[:, 1], c='blueviolet', s=s,
                 edgecolors='k')
c = plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='gold', s=s,
                edgecolors='k')
plt.axis('tight')
plt.xlim((-5, 5))
plt.ylim((-5, 5))
# ContourSet.collections is deprecated in recent Matplotlib releases, so the
# legend handle for the frontier is taken from legend_elements() instead.
frontier_handle = a.legend_elements()[0][0]
plt.legend([frontier_handle, b1, b2, c],
           ["learned frontier", "training observations",
            "new regular observations", "new abnormal observations"],
           loc="upper left",
           prop=matplotlib.font_manager.FontProperties(size=11))
# Denominators come from the actual sample counts; the previous hard-coded
# "/40" for the abnormal set was wrong because only 20 outliers are generated.
plt.xlabel(
    "error train: %d/%d ; errors novel regular: %d/%d ; "
    "errors novel abnormal: %d/%d"
    % (n_error_train, len(X_train),
       n_error_test, len(X_test),
       n_error_outliers, len(X_outliers)))
plt.show()
效果如下圖:
下面簡單介紹一下 sklearn.svm.OneClassSVM 類的常用方法:
decision_function(self, X):點到分割超平面的有符號距離
fit(self, X[, y, sample_weight]):訓練出樣本 X 的軟邊界
fit_predict(self, X[, y]):訓練出樣本 X 的軟邊界後返回標籤(是否異常)
get_params(self[, deep]):獲取估計器的參數
predict(self, X):返回樣本 X 的標籤
對於可視化,matplotlib.pyplot.contour 和 matplotlib.pyplot.contourf 分別用於繪製等高線和填充等高線,兩個函數的參數和調用方式一樣。其中 levels 代表各條分割線(等高線層級)的 list。以 plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap=plt.cm.PuBu) 這句話為例:它表示在 xx 和 yy 構成的二維平面上,以 Z 作為每個點的高程;np.linspace(Z.min(), 0, 7) 在 Z.min() 到 0 之間取 7 個等間距的值(即 6 個區間)作為層級,contourf 在相鄰層級之間填充顏色。plt.cm.PuBu 代表一種顏色映射,具體的樣式見 Colormap reference。
matplotlib.pyplot.scatter 用於繪製散點圖。
下面是改編的代碼用於異常檢測:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pickle
import numpy as np
import pandas as pd
from math import ceil
import matplotlib.pyplot as plt
import matplotlib.font_manager
from sklearn import svm
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
def get_dataset_to_pandas(file_name, dropList=None):
    """Read a CSV file into a DataFrame and remove the listed columns.

    Args:
        file_name: Path (or file-like object) passed straight to
            ``pandas.read_csv``.
        dropList: Iterable of column names to remove. ``None`` (the default)
            or an empty iterable removes nothing.

    Returns:
        The loaded ``pandas.DataFrame`` without the dropped columns.

    Raises:
        KeyError: If a name in ``dropList`` is not a column of the file.
    """
    dataset = pd.read_csv(file_name)
    # A None default replaces the former mutable ``[]`` default argument.
    if dropList:
        dataset = dataset.drop(columns=list(dropList))
    return dataset
def pre_scaler(dataset, type_str="std"):
    """Fit a scaler on ``dataset`` and return it with the scaled data.

    Args:
        dataset: Data handed to the scaler's ``fit`` and ``transform``.
        type_str: ``"minmax"`` selects ``MinMaxScaler``; ``"std"`` (default)
            selects ``StandardScaler``.

    Returns:
        A ``(scaler, transformed_dataset)`` tuple, or ``None`` when
        ``type_str`` is neither ``"minmax"`` nor ``"std"``.
    """
    # Guard clause: unknown scaler names yield None, never a tuple.
    if type_str not in ("minmax", "std"):
        return None

    scaler = MinMaxScaler() if type_str == "minmax" else StandardScaler()
    scaler.fit(dataset)
    scaled = scaler.transform(dataset)
    return scaler, scaled
def train_test_split(dataset, test_ratio=0.3, seed=42):
    """Randomly split ``dataset`` into a training part and a test part.

    Args:
        dataset: Sized container indexable by an integer index array
            (e.g. a NumPy array); rows are selected with ``dataset[indices]``.
        test_ratio: Fraction of rows placed in the test part; the test size
            is ``ceil(len(dataset) * test_ratio)``.
        seed: Seed passed to ``np.random.seed`` before shuffling. Pass
            ``None`` to leave the global NumPy random state untouched.

    Returns:
        A ``(dataset_train, dataset_test)`` tuple.
    """
    # ``is not None`` rather than truthiness, so that seed=0 is honoured
    # instead of being silently treated as "no seed".
    if seed is not None:
        np.random.seed(seed)
    shuffle_index = np.random.permutation(len(dataset))
    test_size = ceil(len(dataset) * test_ratio)
    test_index = shuffle_index[:test_size]
    train_index = shuffle_index[test_size:]
    dataset_train = dataset[train_index]
    dataset_test = dataset[test_index]
    return dataset_train, dataset_test
def variable_save(variable, file_name):
    """Serialize ``variable`` to ``file_name`` with pickle.

    Args:
        variable: Any picklable object.
        file_name: Path of the file to write; it is opened in binary write
            mode, so an existing file is overwritten.
    """
    # The context manager closes the file even if pickle.dump raises.
    with open(file_name, 'wb') as data_output:
        pickle.dump(variable, data_output)
def variable_load(file_name):
    """Load and return an object previously pickled to ``file_name``.

    Args:
        file_name: Path of the pickle file; it is opened in binary read mode.

    Returns:
        The unpickled object.
    """
    # SECURITY: unpickling can execute arbitrary code, so only load files
    # that come from a trusted source.
    # The context manager closes the file even if pickle.load raises.
    with open(file_name, 'rb') as data_input:
        return pickle.load(data_input)
if __name__ == '__main__':
    # Columns removed from the CSV before scaling and training.
    columns_to_drop = [
        "Loss",
        "TimeStamp",
        "LT_Foot_TimeStamp",
        "RT_Foot_TimeStamp",
        'Chest_TimeStamp',
    ]
    dataset = get_dataset_to_pandas("walk1.csv", columns_to_drop)

    # Min-max scale the whole table, then split it into train and test parts.
    scaler, dataset = pre_scaler(dataset, "minmax")
    X_train, X_test = train_test_split(dataset)

    # Fit the one-class SVM on the training part only.
    clf = svm.OneClassSVM(nu=0.05, kernel="rbf", gamma="auto")
    clf.fit(X_train)

    # Count how many samples of each part are predicted as -1.
    y_pred_train = clf.predict(X_train)
    y_pred_test = clf.predict(X_test)
    n_error_train = np.count_nonzero(y_pred_train == -1)
    n_error_test = np.count_nonzero(y_pred_test == -1)
    print(n_error_train, ",", n_error_test)

    # Optional follow-ups, left disabled:
    #   distances = clf.decision_function(dataset)
    #   variable_save((clf, scaler), './one_class_svm')    # persist model + scaler
    #   (clf, scaler) = variable_load('./one_class_svm')   # restore them
    #   print(clf, '\n', scaler)
在訓練完成之後,可以通過 clf.decision_function 計算樣本到邊界的有符號距離來判斷是否異常,也可以通過 clf.predict 直接判斷樣本是否為異常點(返回 1 表示正常,-1 表示異常)。