- novelty detection:當訓練數據中沒有離群點,我們的目標是用訓練好的模型去檢測另外發現的新樣本
- outlier dection:當訓練數據中包含離群點,模型訓練時要匹配訓練數據的中心樣本,忽視訓練樣本中的其他異常點。
一、outlier dection
1.孤立森林(Isolation Forest)
iForest適用於連續數據(Continuous numerical data)的異常檢測,將異常定義為“容易被孤立的離群點(more likely to be separated)可以理解為分布稀疏且離密度高的群體較遠的點。用統計學來解釋,在數據空間里面,分布稀疏的區域表示數據發生在此區域的概率很低,因此可以認為落在這些區域里的數據是異常的。通常用於網絡安全中的攻擊檢測和流量異常等分析,金融機構則用於挖掘出欺詐行為。對於找出的異常數據,然后要么直接清除異常數據,如數據清理中的去噪數據,要么深入分析異常數據,比如分析攻擊,欺詐的行為特征。
(1)算法
- 論文鏈接
- 非參數、無監督
- 大體思想:類似切蛋糕,切一次生成兩個子空間,之后再繼續用隨機超平面來切割每個子空間,循環,直到每個子空間里只有一個數據點為止。可以發現,密度很高的簇需要切很多次才能結束,而那些密度很低的點很容易很早停到一個子空間里。(參考這里)
- 算法的流程:參考
- 輸入:子采樣的樣本大小Ψ,樹的個數t,默認Ψ=256,t=100,height=8
-
步驟1:從訓練數據中隨機選擇Ψ個點樣本點作為subsample,放入樹的根節點。
-
步驟2:隨機指定一個維度(attribute),在當前節點數據中隨機產生一個切割點p——切割點產生於當前節點數據中指定維度的最大值和最小值之間。
-
步驟3:以此切割點生成了一個超平面,然后將當前節點數據空間划分為2個子空間:把指定維度里小於p的數據放在當前節點的左孩子,把大於等於p的數據放在當前節點的右孩子。
- 步驟4:在孩子節點中遞歸步驟2和3,不斷構造新的孩子節點,直到孩子節點中只有一個數據(無法再繼續切割) 或孩子節點已到達限定高度
- 獲得t個iTree之后,iForest 訓練就結束,然后我們可以用生成的iForest來評估測試數據了。對於一個訓練數據x,我們令其遍歷每一棵iTree,然后計算x最終落在每個樹第幾層(x在樹的高度)。然后我們可以得出x在每棵樹的高度平均值,即 the average path length over t iTrees。最后,將h(x)帶入,計算每條待測數據的異常分數(Anomaly Score),其計算公式為:
,其中
是二叉搜索樹的平均路徑長度,用來對結果進行歸一化處理, 其中的H(k)可以通過公式
來估計,
是歐拉常數,其值為0.5772156649。$h(x)$ 為路徑長度,$E(h(x))$ 為森林中所有iTree樹的平均路徑長度。獲得每個測試數據的average path length后,我們可以設置一個閾值(邊界值),average path length 低於此閾值的測試數據即為異常。論文中對樹的高度做了歸一化,並得出一個0到1的數值:
- 如果分數越接近1,其是異常點的可能性越高;
- 如果分數都比0.5要小,那么基本可以確定為正常數據;
- 如果所有分數都在0.5附近,那么數據不包含明顯的異常樣本。
- 步驟總結:
- 訓練:從訓練集中進行采樣,並構建iTree樹;
- 測試:對iForest森林中的每顆iTree樹進行測試,記錄path length,然后根據異常分數計算公式,計算每條測試數據的anomaly score。
- 適用場景:適用於連續特征,線性時間復雜度,適用於在線異常檢測
(2)實現
- 源碼文件
- 參數:
- n_estimators:默認=100,配置iTree樹的數量
- max_samples:默認=256,配置采樣大小
- max_features:默認=全部特征,對高維數據,可以只選取部分特征
- 參考示例代碼
舉例:
from sklearn.ensemble import IsolationForest iForest = IsolationForest(n_estimators=500,random_state=75,behaviour='new') iForest.fit(source_test[to_columns][:2000],) source_test['is_abnormal'] = iForest.predict(source_test[to_columns]) # 記錄異常樣本在原數據集中的索引 drop_rows = [] for row in source_test.itertuples(index=True, name='Pandas'): if getattr(row,'is_abnormal') == -1: drop_rows.append(row._asdict()) print(len(drop_rows)) # 查看要刪除的行的具體信息 drop_rows_dataframe = pd.DataFrame(drop_rows) drop_rows_dataframe[['signalStrength','signalQuality','is_abnormal','label']] # 查看異常與正常樣本的數量 source_test['is_abnormal'].value_counts()
(3)總結
- iForest具有線性時間復雜度,因為是ensemble的方法,所以可以用在含有海量數據的數據集上面,通常樹的數量越多,算法越穩定。由於每棵樹都是相互獨立生成的,因此可以部署在大規模分布式系統上來加速運算。
- iForest不適用於特別高維的數據。由於每次切數據空間都是隨機選取一個維度,建完樹后仍然有大量的維度信息沒有被使用,導致算法可靠性降低。高維空間還可能存在大量噪音維度或者無關維度(irrelevant attributes),影響樹的構建。對這類數據,建議使用子空間異常檢測(Subspace Anomaly Detection)技術。此外,切割平面默認是axis-parallel的,也可以隨機生成各種角度的切割平面。
- IForest僅對Global Anomaly敏感,即全局稀疏點敏感,不擅長處理局部的相對稀疏點(Local Anomaly)。
- iForest推動了重心估計(Mass Estimation)理論,目前在分類聚類和異常檢測中都取得顯著效果。
2.Local Outlier Factor
Local Outlier Factor(LOF)是基於密度的經典算法(Breuning et. al. 2000)。在 LOF 之前的異常檢測算法大多是基於統計方法的,或者是借用了一些聚類算法用於異常點的識別(比如 ,DBSCAN,OPTICS)。但是,基於統計的異常檢測算法通常需要假設數據服從特定的概率分布,這個假設往往是不成立的。而聚類的方法通常只能給出 0/1 的判斷(即:是不是異常點),不能量化每個數據點的異常程度。相比較而言,基於密度的LOF算法要更簡單、直觀。它不需要對數據的分布做太多要求,還能量化每個數據點的異常程度(outlierness)。
二、novelty detection
1.One-Class SVM
它的訓練集不應該摻雜異常點,因為模型可能會去匹配這些異常點。但在數據維度很高,或者對相關數據分布沒有任何假設的情況下,OneClassSVM也可以作為一種很好的outlier detection方法。在one-class classification中,僅僅只有一類的信息是可以用於訓練,其他類別的(總稱outlier)信息是缺失的,也就是區分兩個類別的邊界線是通過僅有的一類數據的信息學習得到的。
(1)算法
- 無監督學習
- 思想:SVDD,期望最小化超球體的體積,從而最小化異常點數據的影響。
- 適用於小樣本、高緯度、非線性問題
假設產生的超球體參數為中心$o$和對應的超球體半徑$r >0$,超球體體積$V(r)$被最小化;跟傳統SVM方法相似,可以要求所有訓練數據點$x_i$到中心的距離嚴格小於$r$。但是同時構造一個懲罰系數為$C$的松弛變量$ζi$,優化問題如下所示:
采用拉格朗日對偶求解之后,可以判斷新的數據點$z$是否在內,如果$z$到中心的距離小於或者等於半徑$r$,則不是異常點,如果在超球體以外,則是異常點。在Sklearn中,我們可以采用SVM包里面的OneClassSVM來做異常點檢測。OneClassSVM也支持核函數,所以普通SVM里面的調參思路在這里也使用。
SVDD的優化目標:求一個中心為a,半徑為R的最小球面
約束條件:
滿足這個條件就是說要把training set中的數據點都包在球面里。
(2)實現
class sklearn.svm.OneClassSVM(kernel=’rbf’, degree=3, gamma=’auto’, coef0=0.0, tol=0.001, nu=0.5, shrinking=True, cache_size=200, verbose=False, max_iter=-1, random_state=None)
參數:
- kernel:核函數(一般使用高斯核)
- nu:設定訓練誤差(0, 1],表示異常點比例,默認值為0.5
舉例:
import numpy as np import matplotlib.pyplot as plt import matplotlib.font_manager from sklearn import svm xx, yy = np.meshgrid(np.linspace(-5, 5, 500), np.linspace(-5, 5, 500)) # Generate train data X = 0.3 * np.random.randn(100, 2) X_train = np.r_[X + 2.1, X - 2.1] X_test = np.r_[X + 2, X - 2] # Generate some abnormal novel observations X_outliers = np.random.uniform(low=0.1, high=4, size=(20, 2)) # fit the model clf = svm.OneClassSVM(nu=0.1, kernel='rbf', gamma=0.1) clf.fit(X_train) y_pred_train = clf.predict(X_train) y_pred_test = clf.predict(X_test) y_pred_outliers = clf.predict(X_outliers) n_error_train = y_pred_train[y_pred_train == -1].size n_error_test = y_pred_test[y_pred_test == -1].size n_error_outlier = y_pred_outliers[y_pred_outliers == 1].size # plot the line , the points, and the nearest vectors to the plane Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()]) Z = Z.reshape(xx.shape) plt.title("Novelty Detection") # 填充等高線圖 plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap=plt.cm.PuBu) # 繪制等高線 a = plt.contour(xx, yy, Z, levels=[0, Z.max()], colors='palevioletred') plt.clabel(a, inline_spacing=3, fmt='%.2f', fontsize=10) s = 40 b1 = plt.scatter(X_train[:, 0], X_train[:, 1], c='green', s=s, edgecolors='k') b2 = plt.scatter(X_test[:, 0], X_test[:, 1], c='blueviolet', s=s, edgecolors='k') c = plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='gold', s=s, edgecolors='k') plt.axis('tight') plt.xlim((-5, 5)) plt.ylim((-5, 5)) plt.legend([a.collections[0], b1, b2, c], ["learned frontier", 'training observations', "new regular observations", "new abnormal observations"], loc="upper left", prop=matplotlib.font_manager.FontProperties(size=11)) plt.xlabel("error train: %d/200; errors novel regular: %d/40; errors novel abnormal:%d/40" % (n_error_train, n_error_test, n_error_outlier)) plt.show()
參考文獻:
【1】異常檢測學習資源
【4】Python機器學習筆記——One Class SVM