數據集中的異常數據通常被成為異常點、離群點或孤立點等,典型特征是這些數據的特征或規則與大多數數據不一致,呈現出“異常”的特點,而檢測這些數據的方法被稱為異常檢測。
異常數據根據原始數據集的不同可以分為離群點檢測和新奇檢測:
- 離群點檢測(Outlier Detection)
大多數情況我們定義的異常數據都屬於離群點檢測,對這些數據訓練完之后再在新的數據集中尋找異常點。
- 新奇檢測(Novelty Detection)
所謂新奇檢測是識別新的或未知數據模式和規律的檢測方法,這些規律和只是在已有機器學習系統的訓練集中沒有被發掘出來。新奇檢測的前提是已知訓練數據集是“純凈”的,未被真正的“噪音”數據或真實的“離群點”污染,然后針對這些數據訓練完成之后再對新的數據做訓練以尋找新奇數據的模式。
新奇檢測主要應用於新的模式、主題、趨勢的探索和識別,包括信號處理、計算機視覺、模式識別、智能機器人等技術方向,應用領域例如潛在疾病的探索、新物種的發現、新傳播主題的獲取等。
新奇檢測和異常檢測有關,一開始的新奇點往往都以一種離群的方式出現在數據中,這種離群方式一般會被認為是離群點,因此二者的檢測和識別模式非常類似。但是,當經過一段時間之后,新奇數據一旦被證實為正常模式,例如將新的疾病識別為一種普通疾病,那么新奇模式將被合並到正常模式之中,就不再屬於異常點的范疇。
異常檢測的適用場景:
- 常用於異常訂單識別、風險客戶預警、黃牛識別、貸款風險識別、欺詐檢測、技術入侵、制造業產品異常檢測,數據中心機器異常檢測等針對個體的分析場景
- 類別嚴重不平衡;
- 無標簽輸出,或負樣本成本太高;
- 一般歸一化之前做,或者只取四分位數段的數據做縮放。
注意點:
- 如果訓練樣本中異常樣本的比例比較高,違背了異常檢測的基本假設,可能最終的效果會受到影響;
- 異常檢測根具體的應用場景緊密相關,算法檢測出的“異常”不一定是我們實際想要的,比如,在識別虛假交易時,異常的交易未必就是虛假的交易。所以,在特征選擇時,可能需要過濾不太相關的特征,以免識別出一些不太相關的“異常”。
常見的異常檢測方法:
- 基於統計:該方法的基本步驟是對數據點進行建模,再以假定的模型(如泊松分布、正太分布等)根據點的分布來確定是否異常。一般通過數據變異指標來發現異常數據。常用變異指標有極差、四分位數間距、均差、標准差、變異系數等。但是,基於統計的方法檢測出來的異常點產生機制可能不唯一,而且它在很大程度上依賴於待挖掘的數據集是否滿足某種概率分布模型,另外模型的參數、離群點的數目等都非常重要,確定這些因素通常都比較困難。因此,實際情況中算法的應用性和可移植性較差。
- 基於聚類:K-means(如果到集群質心的距離高於閾值或者最近集群的大小低於閾值,則將數據點定義為異常)
- 基於距離:knn(具有大k-最近鄰距離的數據點被定義為異常)
- 基於密度:LOF(local outlier factor)(將密度大大低於鄰居的樣本視為異常值),BIRCH,DBSCAN(如果數據點的局部區域內的數據點的數量低於閾值,則將其定義為異常)
- 專門異常點檢測:隔離森林(較大高度平均值為異常值),one-class SVM(與超球體中心的距離大於r為新奇)
- 基於偏差:PCA/自編碼器(具有高重建誤差的數據點被定義為異常),隱馬爾可夫模型(HMM)
iForest 小結:
- 論文提到采樣大小超過256效果就提升不大了,並且越大還會造成計算時間上的浪費。100棵樹,采樣大小256。
- iForest具有線性時間復雜度,因為是ensemble的方法,所以可以用在含有海量數據的數據集上面,通常樹的數量越多,算法越穩定。由於每棵樹都是相互獨立生成的,因此可以部署在大規模分布式系統上來加速運算。
- iForest不適用於特別高維的數據,推薦降維后使用。由於每次切數據空間都是隨機選取一個維度,建完樹后仍然有大量的維度信息沒有被使用,導致算法可靠性降低。高維空間還可能存在大量噪音維度或者無關維度(irrelevant attributes),影響樹的構建。對這類數據,建議使用子空間異常檢測(Subspace Anomaly Detection)技術。此外,切割平面默認是axis-parallel的,也可以隨機生成各種角度的切割平面。
- IForest僅對Global Anomaly敏感,即全局稀疏點敏感,不擅長處理局部的相對稀疏點(Local Anomaly)。
- 而One Class SVM對於中小型數據分析,尤其是訓練樣本不是特別海量的時候用起來經常會比IForest順手,因此比較適合做原型分析。
- iForest推動了重心估計(Mass Estimation)理論,目前在分類聚類和異常檢測中都取得顯著效果。
構造 iForest 的步驟如下:
- 1. 從訓練數據中隨機選擇 n 個點樣本作為subsample,放入樹的根節點。
- 2. 隨機指定一個維度(attribute),在當前節點數據中隨機產生一個切割點p——切割點產生於當前節點數據中指定維度的最大值和最小值之間。
- 3. 以此切割點生成了一個超平面,然后將當前節點數據空間划分為2個子空間:把指定維度里面小於p的數據放在當前節點的左孩子,把大於等於p的數據放在當前節點的右孩子。
- 4. 在孩子節點中遞歸步驟2和3,不斷構造新的孩子節點,知道孩子節點中只有一個數據(無法再繼續切割)或者孩子節點已達限定高度。
獲得 t個iTree之后,iForest訓練就結束,然后我們可以用生成的iForest來評估測試數據了。對於一個訓練數據X,我們令其遍歷每一顆iTree,然后計算X 最終落在每個樹第幾層(X在樹的高度)。然后我們可以得到X在每棵樹的高度平均值,即 the average path length over t iTrees
import numpy as np import matplotlib.pyplot as plt from sklearn.ensemble import IsolationForest rng = np.random.RandomState(42) # Generate train data X = 0.3 * rng.randn(100, 2) #rng.uniform(0,1,(100,2)) X_train = np.r_[X + 2, X - 2] #行拼接 X = 0.3 * rng.randn(20, 2) X_test = np.r_[X + 2, X - 2] X_outliers = rng.uniform(low=-4, high=4, size=(20, 2)) # fit the model clf = IsolationForest(behaviour='new', max_samples=100, random_state=rng, contamination='auto') clf.fit(X_train) y_pred_train = clf.predict(X_train) y_pred_test = clf.predict(X_outliers) xx, yy = np.meshgrid(np.linspace(-5, 5, 50), np.linspace(-5, 5, 50)) Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()]) Z = Z.reshape(xx.shape) plt.title("IsolationForest") plt.contourf(xx, yy, Z) #cmap=plt.cm.Blues_r b1 = plt.scatter(X_train[:, 0], X_train[:, 1], c='white', s=20, edgecolor='k') b2 = plt.scatter(X_test[:, 0], X_test[:, 1], c='green', s=20, edgecolor='k') c = plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='red', s=20, edgecolor='k') plt.axis('tight') plt.xlim((-5, 5)) plt.ylim((-5, 5)) plt.legend([b1, b2, c], ["training observations", "new regular observations", "new abnormal observations"], loc="upper left") plt.show()
OneClassSVM
One Class Learning 比較經典的算法是One-Class-SVM,這個算法的思路非常簡單,就是尋找一個超平面將樣本中的正例圈出來,預測就是用這個超平面做決策,在圈內的樣本就認為是正樣本。由於核函數計算比較耗時,在海量數據的場景用的並不多;
嚴格來說,OneCLassSVM不是一種outlier detection,而是一種novelty detection方法:它的訓練集不應該摻雜異常點,因為模型可能會去匹配這些異常點。但在數據維度很高,或者對相關數據分布沒有任何假設的情況下,OneClassSVM也可以作為一種很好的outlier detection方法。
假設產生的超球體參數為中心 o 和對應的超球體半徑 r >0,超球體體積V(r) 被最小化,中心 o 是支持行了的線性組合;跟傳統SVM方法相似,可以要求所有訓練數據點xi到中心的距離嚴格小於r。但是同時構造一個懲罰系數為 C 的松弛變量 ζi ,優化問題入下所示:
- 當訓練集未被異常值污染時,該估計器最適合於新穎性檢測;
- 高維中的離群檢測,或者對上層數據的分布沒有任何假設。
import numpy as np import matplotlib.pyplot as plt import matplotlib.font_manager from sklearn import svm # Generate train data X = 0.3 * np.random.randn(100, 2) X_train = np.r_[X + 2, X - 2] X_test = np.r_[X + 2, X-2] X_outliers = np.random.uniform(low=0.1, high=4, size=(20, 2)) #data = np.loadtxt('https://raw.githubusercontent.com/ffzs/dataset/master/outlier.txt', delimiter=' ') #X_train = data[:900, :] #X_test = data[-100:, :] # 模型擬合 '''class sklearn.svm.OneClassSVM(kernel='rbf', degree=3, gamma=0.0, coef0=0.0, tol=0.001, nu=0.5, shrinking=True, cache_size=200, verbose=False, max_iter=-1, random_state=None)''' clf = svm.OneClassSVM(nu=0.1, kernel='rbf', gamma=0.1) clf.fit(X_train) y_pred_train = clf.predict(X_train) y_pred_test = clf.predict(X_test) y_pred_outliers = clf.predict(X_outliers) print ("novelty detection result:", y_pred_test) n_error_train = y_pred_train[y_pred_train == -1].size n_error_test = y_pred_test[y_pred_test == -1].size n_error_outlier = y_pred_outliers[y_pred_outliers == 1].size # plot the line , the points, and the nearest vectors to the plane # 在平面中繪制點、線和距離平面最近的向量 #rand_mat = np.random.uniform(-1.0,1.0,(10000,5)) #np.random.randn(10000, 5) #for i in range(X_train.shape[1]): # rand_mat[:,i] = (rand_mat[:,i]-X_train[:,i].mean()) * X_train[:,i].std() #rand_mat = pd.DataFrame(rand_mat, columns=list('abcde')) #rand_mat = rand_mat.sort_values(by = ['a','b']) #rand_mat = rand_mat.values #Z = clf.decision_function(rand_mat) #xx = rand_mat[:,0].reshape((100,100)) #yy = rand_mat[:,1].reshape((100,100)) #xx, yy = np.meshgrid(np.linspace(X_train[:,0].min(), X_train[:,0].max(), 100), # np.linspace(X_train[:,1].min(), X_train[:,1].max(), 100)) #Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel(), X_train[np.random.randint(900,size=10000),2:]]) xx, yy = np.meshgrid(np.linspace(-5, 5, 500), np.linspace(-5, 5, 500)) Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()]) Z = Z.reshape(xx.shape) plt.title("Novelty Detection") plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap=plt.cm.PuBu) a = plt.contour(xx, yy, Z, levels=[0, Z.max()], colors='palevioletred') s = 40 b1 = plt.scatter(X_train[:, 0], X_train[:, 1], c='white', s=s, edgecolors='k') b2 = plt.scatter(X_test[:, 0], X_test[:, 1], c='blueviolet', s=s, edgecolors='k') c = plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='gold', s=s, edgecolors='k') plt.axis('tight') plt.xlim((-5, 5)) plt.ylim((-5, 5)) plt.legend([a.collections[0], b1, b2, c], ["learned frontier", 'training observations', "new regular observations", "new abnormal observations"], loc="upper left", prop=matplotlib.font_manager.FontProperties(size=11)) plt.xlabel( "error train: %d/200; errors novel regular: %d/40; errors novel abnormal:%d/40"%( n_error_train, n_error_test, n_error_outlier) ) plt.show()
實際數據測試oneClassSVM
import numpy as np import matplotlib.pyplot as plt from numpy import genfromtxt from sklearn import svm #plt.style.use('fivethirtyeight') def read_dataset(filePath, delimiter=','): return genfromtxt(filePath, delimiter=delimiter) # use the same dataset #tr_data = read_dataset('tr_data.csv') tr_data = np.loadtxt('https://raw.githubusercontent.com/ffzs/dataset/master/outlier.txt', delimiter=' ') ''' OneClassSVM(cache_size=200, coef0=0.0, degree=3, gamma=0.1, kernel='rbf', max_iter=-1, nu=0.05, random_state=None, shrinking=True, tol=0.001, verbose=False) ''' clf = svm.OneClassSVM(nu=0.05, kernel='rbf', gamma=0.1) clf.fit(tr_data[:,:2]) pred = clf.predict(tr_data[:,:2]) # inliers are labeled 1 , outliers are labeled -1 normal = tr_data[pred == 1] abnormal = tr_data[pred == -1] plt.plot(normal[:, 0], normal[:, 1], 'bx') plt.plot(abnormal[:, 0], abnormal[:, 1], 'ro')
圖片異常值檢測OneClassSVM
import os import cv2 from PIL import Image def get_files(file_dir): for file in os.listdir(file_dir + '/AA475_B25'): A.append(file_dir + '/AA475_B25/' + file) length_A = len(os.listdir(file_dir + '/AA475_B25')) for file in range(length_A): img = Image.open(A[file]) new_img = img.resize((128, 128)) new_img = new_img.convert("L") matrix_img = np.asarray(new_img) AA.append(matrix_img.flatten()) images1 = np.matrix(AA) return images1
多種異常檢測算法的比較
import time import numpy as np import matplotlib import matplotlib.pyplot as plt from sklearn import svm from sklearn.datasets import make_blobs, make_moons from sklearn.covariance import EllipticEnvelope from sklearn.ensemble import IsolationForest from sklearn.neighbors import LocalOutlierFactor matplotlib.rcParams['contour.negative_linestyle'] = 'solid' # Example settings n_samples = 300 outliers_fraction = 0.15 n_outliers = int(outliers_fraction * n_samples) n_inliers = n_samples - n_outliers # define outlier/ anomaly detection methods to be compared anomaly_algorithms = [ ("Robust covariance", EllipticEnvelope(contamination=outliers_fraction)), ("One-Class SVM", svm.OneClassSVM(nu=outliers_fraction, kernel='rbf',gamma=0.1)), ("Isolation Forest", IsolationForest(behaviour='new', contamination=outliers_fraction, random_state=42)), ("Local Outlier Factor", LocalOutlierFactor(n_neighbors=35, contamination=outliers_fraction)) ] # define datasets blobs_params = dict(random_state=0, n_samples=n_inliers, n_features=2) datasets = [ make_blobs(centers=[[0, 0], [0, 0]], cluster_std=0.5, **blobs_params)[0], make_blobs(centers=[[2, 2], [-2, -2]], cluster_std=[0.5, 0.5], **blobs_params)[0], make_blobs(centers=[[2, 2], [-2, -2]], cluster_std=[1.5, 0.3], **blobs_params)[0], 4. * (make_moons(n_samples=n_samples, noise=0.05, random_state=0)[0] - np.array([0.5, 0.25])), 14. * (np.random.RandomState(42).rand(n_samples, 2) - 0.5) ] # Compare given classifiers under given settings xx, yy = np.meshgrid(np.linspace(-7, 7, 150), np.linspace(-7, 7, 150)) plt.figure(figsize=(len(anomaly_algorithms) * 2 + 3, 12.5)) plt.subplots_adjust(left=0.02, right=0.98, bottom=0.001, top=0.96, wspace=0.05, hspace=0.01) plot_num = 1 rng = np.random.RandomState(42) for i_dataset, X in enumerate(datasets): # add outliers X = np.concatenate([X, rng.uniform(low=-6, high=6, size=(n_outliers, 2))], axis=0) for name, algorithm in anomaly_algorithms: print(name , algorithm) t0 = time.time() algorithm.fit(X) t1 = time.time() plt.subplot(len(datasets), len(anomaly_algorithms), plot_num) if i_dataset == 0: plt.title(name, size=18) # fit the data and tag outliers if name == 'Local Outlier Factor': y_pred = algorithm.fit_predict(X) else: y_pred = algorithm.fit(X).predict(X) # plot the levels lines and the points if name != "Local Outlier Factor": Z = algorithm.predict(np.c_[xx.ravel(), yy.ravel()]) Z = Z.reshape(xx.shape) plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='black') colors = np.array(["#377eb8", '#ff7f00']) plt.scatter(X[:, 0], X[:, 1], s=10, color=colors[(y_pred + 1) // 2]) plt.xlim(-7, 7) plt.ylim(-7, 7) plt.xticks(()) plt.yticks(()) plt.text(0.99, 0.01, ('%.2fs' % (t1 - t0)).lstrip('0'), transform=plt.gca().transAxes, size=15, horizontalalignment='right') plot_num += 1 plt.show()
DBSCAN、BIRCH、LOF
# -*- coding: utf-8 -*- """ Created on Tue Jul 30 11:31:02 2019 @author: epsoft """ import time import numpy as np import pandas as pd from sqlalchemy import create_engine from sklearn.cluster import DBSCAN, Birch from sklearn.neighbors import LocalOutlierFactor from sklearn import metrics import matplotlib import matplotlib.pyplot as plt matplotlib.rcParams['font.sans-serif'] = ['SimHei'] matplotlib.rcParams['font.family']='sans-serif' matplotlib.rcParams['axes.unicode_minus'] = False # ############################################################################## # 獲取數據 #數據庫連接參數信息 config = { 'username':'root', 'passwd':'123', 'host':'127.0.0.1', 'port':'1521', 'sid':'ORCL' } engine = 'oracle://{username}:{passwd}@{host}:{port}/{sid}'.format(**config) #dbname -- 各版本語法不同 db = create_engine(engine, encoding='utf-8') sql = ''' select hicode, max(hiname) as hiname, count(*) as counts, sum(decode(sumfd_20,null,0,1))/count(*), --avg(sumfd_20), sum(decode(sumfd_21,null,0,1))/count(*), --avg(sumfd_21), sum(decode(sumfd_22,null,0,1))/count(*), --avg(sumfd_22), sum(decode(sumfd_23,null,0,1))/count(*), --avg(sumfd_23), sum(decode(sumfd_24,null,0,1))/count(*) --avg(sumfd_24) from LU_CONSUMABLE_MATERIAL t group by hicode having count(*) > 100 ''' data = pd.read_sql_query(sql, db) X = data.iloc[:,3:8].values # ############################################################################## # 參數設置 aa = [] for i in range(X.shape[0]-1): for j in range(i+1,X.shape[0]): aa.append(np.power(X[i]-X[j], 2).sum()) plt.hist(aa, bins=10, density=1, edgecolor ='k', facecolor='g', alpha=0.75) plt.show() # 調參 t0 = time.time() optimum_parameter = [0,0,0] for r in np.linspace(0.01, 0.1, 10): for min_samples in range(3,10): db = DBSCAN(eps=r, min_samples=min_samples).fit(X) score = metrics.silhouette_score(X, db.labels_) print('(%0.2f, %d) 輪廓系數: %0.3f'%(r, min_samples, score)) if score > optimum_parameter[2]: optimum_parameter=[r, min_samples, score] print('最佳參數為:eps=%0.2f, min_samples=%d, 輪廓系數=%0.3f'%(optimum_parameter[0], optimum_parameter[1], optimum_parameter[2])) print('調參耗時:', time.time()-t0) # ############################################################################## # 調用密度聚類 DBSCAN db = DBSCAN(eps=0.06, min_samples=5).fit(X) # print(db.labels_) # db.labels_為所有樣本的聚類索引,沒有聚類索引為-1 # print(db.core_sample_indices_) # 所有核心樣本的索引 # 獲取聚類個數。(聚類結果中-1表示沒有聚類為離散點) n_clusters_ = len(set(db.labels_)) - (1 if -1 in db.labels_ else 0) # 異常樣本標簽 data.loc[db.labels_ == -1, 'hiname'] # 調用 Birch算法 branching_factor = 5 bi = Birch(threshold=0.06, branching_factor=branching_factor, n_clusters = None) bi_outlier = bi.fit_predict(X) from collections import Counter aa = [i for i,c in Counter(bi_outlier).items() if c<branching_factor] outlier_flag = np.array([i in aa for i in bi_outlier]) data.loc[outlier_flag, 'hiname'] plt.scatter(X[:, 0], X[:, 2], c=bi_outlier) plt.show() # 調用 LOF 算法 lof = LocalOutlierFactor(n_neighbors=5, contamination=0.04) #contamination表示異常比例 lof_outlier = lof.fit_predict(X) X_scores = lof.negative_outlier_factor_ radius = (X_scores.max() - X_scores) / (X_scores.max() - X_scores.min()) data.loc[lof_outlier == -1, 'hiname'] plt.scatter(X[:, 0], X[:, 2], c=lof_outlier) #plt.plot(X[:, 0], X[:, 2], 'o') plt.show() plt.scatter(X[:, 0], X[:, 2], color='k', s=3., label='Data points') #plt.scatter(X[lof_outlier == -1, 0], X[lof_outlier == -1, 2], color='r', s=3., label='Outlier points') plt.scatter(X[lof_outlier > -1, 0], X[lof_outlier > -1, 2], s=1000 * radius[lof_outlier > -1], edgecolors='b', facecolors='none', label='inlier scores') plt.scatter(X[lof_outlier == -1, 0], X[lof_outlier == -1, 2], s=1000 * radius[lof_outlier == -1], edgecolors='r', facecolors='none', label='Outlier scores') plt.axis('tight') plt.xlim((min(X[:, 0])-0.05, max(X[:, 0])+0.05)) plt.ylim((min(X[:, 2])-0.05, max(X[:, 2])+0.05)) plt.title("Local Outlier Factor (LOF)") #plt.xlabel("prediction errors: %d" % (n_errors)) legend = plt.legend(loc='upper right') legend.legendHandles[0]._sizes = [10] legend.legendHandles[1]._sizes = [20] legend.legendHandles[2]._sizes = [20] plt.show() # ############################################################################## # 模型評估 print('估計的聚類個數為: %d' % n_clusters_) #print("同質性: %0.3f" % metrics.homogeneity_score(labels_true, db.labels_)) # 每個群集只包含單個類的成員。 #print("完整性: %0.3f" % metrics.completeness_score(labels_true, db.labels_)) # 給定類的所有成員都分配給同一個群集。 #print("V-measure: %0.3f" % metrics.v_measure_score(labels_true, db.labels_)) # 同質性和完整性的調和平均 #print("調整蘭德指數: %0.3f" % metrics.adjusted_rand_score(labels_true, db.labels_)) #print("調整互信息: %0.3f" % metrics.adjusted_mutual_info_score(labels_true, db.labels_)) #print("Fowlkes-Mallows: %0.3f" % metrics.fowlkes_mallows_score(labels_true, db.labels_)) print("輪廓系數: %0.3f" % metrics.silhouette_score(X, db.labels_, metric='euclidean')) print("Calinski-Harabasz分數: %0.3f" % metrics.calinski_harabaz_score(X, db.labels_)) # ############################################################################## # 聚類結果可視化 from itertools import cycle import matplotlib.pyplot as plt plt.close('all') plt.figure(figsize=(12,4)) plt.clf() unique_labels = set(db.labels_) core_samples_mask = np.zeros_like(db.labels_, dtype=bool) # 設置一個樣本個數長度的全false向量 core_samples_mask[db.core_sample_indices_] = True #將核心樣本部分設置為true # 使用黑色標注離散點 plt.subplot(121) colors = [plt.cm.Spectral(each) for each in np.linspace(0, 1, len(unique_labels))] for k, col in zip(unique_labels, colors): if k == -1: # 聚類結果為-1的樣本為離散點 # 使用黑色繪制離散點 col = [0, 0, 0, 1] class_member_mask = (db.labels_ == k) # 將所有屬於該聚類的樣本位置置為true xy = X[class_member_mask & core_samples_mask] # 將所有屬於該類的核心樣本取出,使用大圖標繪制 plt.plot(xy[:, 0], xy[:, 2], 'o', markerfacecolor=tuple(col),markeredgecolor='k', markersize=14) xy = X[class_member_mask & ~core_samples_mask] # 將所有屬於該類的非核心樣本取出,使用小圖標繪制 plt.plot(xy[:, 0], xy[:, 2], 'o', markerfacecolor=tuple(col),markeredgecolor='k', markersize=6) plt.title('對醫院醫療耗材的異常值檢測最佳聚類數: %d' % n_clusters_) plt.xlabel(r'CQ類材料使用頻率(%)') plt.ylabel(r'單價200元以上CL類使用頻率(%)') #plt.show() plt.subplot(122) colors = cycle('bgrcmybgrcmybgrcmybgrcmy') for k, col in zip(unique_labels, colors): class_member_mask = db.labels_ == k if k == -1: plt.plot(X[class_member_mask, 0], X[class_member_mask, 2], 'k' + '.') else: cluster_center = X[class_member_mask & core_samples_mask].mean(axis=0) plt.plot(X[class_member_mask, 0], X[class_member_mask, 2], col + '.') plt.plot(cluster_center[0], cluster_center[2], 'o', markerfacecolor=col, markeredgecolor='k', markersize=14) for x in X[class_member_mask]: plt.plot([cluster_center[0], x[0]], [cluster_center[2], x[2]], col) plt.title('Estimated number of clusters: %d' % n_clusters_) plt.xlabel(r'CQ類材料使用頻率(%)') plt.ylabel(r'單價200元以上CL類使用頻率(%)') plt.show()
模型評估格式化
def bench_k_means(estimator, name, data): t0 = time() estimator.fit(data) print('%-9s\t%.2fs\t%i\t%.3f\t%.3f\t%.3f\t%.3f\t%.3f\t%.3f' % (name, (time() - t0), estimator.inertia_, metrics.homogeneity_score(labels_true, estimator.labels_), metrics.completeness_score(labels_true, estimator.labels_), metrics.v_measure_score(labels_true, estimator.labels_), metrics.adjusted_rand_score(labels_true, estimator.labels_), metrics.adjusted_mutual_info_score(labels_true, estimator.labels_, average_method='arithmetic'), metrics.silhouette_score(data, estimator.labels_, metric='euclidean', sample_size=sample_size))) print(82 * '_') print('init\t\ttime\tinertia\thomo\tcompl\tv-meas\tARI\tAMI\tsilhouette') bench_k_means(KMeans(init='k-means++', n_clusters=n_digits, n_init=10), name="k-means++", data=data) bench_k_means(KMeans(init='random', n_clusters=n_digits, n_init=10), name="random", data=data) pca = PCA(n_components=n_digits).fit(data) bench_k_means(KMeans(init=pca.components_, n_clusters=n_digits, n_init=1), name="PCA-based", data=data) print(82 * '_')
自編碼器
如果我們只有正樣本數據,沒有負樣本數據,或者說只關注學習正樣本的規律,那么利用正樣本訓練一個自編碼器,編碼器就相當於單分類的模型,對全量數據進行預測時,通過比較輸入層和輸出層的相似度就可以判斷記錄是否屬於正樣本。由於自編碼采用神經網絡實現,可以用GPU來進行加速計算,因此比較適合海量數據的場景。
多元高斯分布
參考鏈接:
Python機器學習筆記 異常點檢測算法——Isolation Forest
TensorFlow中的變分自動編碼器 VAE的tensorflow實現
機器學習(八):AnomalyDetection異常檢測_Python