前言
用python實現了一個沒有庫依賴的“純” py-based PrefixSpan算法。
首先對韓老提出的這個數據挖掘算法不清楚的可以看下這個博客,講解非常細致。我的實現也是基本照着這個思路。
再簡單提一下這個算法做了一件什么事。
假設有多個時間序列串:
串序號 | 序列串 |
---|---|
0 | 1, 4, 2, 3 |
1 | 0, 1, 2, 3 |
2 | 1, 2, 1, 3, 4 |
3 | 2, 1, 2, 4, 4 |
4 | 1, 1, 1, 2, 3 |
查看上面的5條記錄串,可以發現 (1,2,3) 這個子序列頻繁出現,那么這很有可能就是所有串中潛在的一種序列模式。舉個病毒攻擊的例子來說,所有設備在一天內遭受到的攻擊序列有一個公共子序列(攻擊類型,攻擊發起者ip),那么這種子序列很有可能是同一個黑客組織發起的一次大規模的攻擊,PrefixSpan就是用來高效地檢測出這種潛在的序列模式。
1. 代碼概覽
整個算法實現大概120來行代碼,關鍵函數就4個,結構如下:
|__PrefixSpan(...) # PrefixSpan
|__createC1(...) # 生成初始前綴集合
|__rmLowSup(...) # 刪除初始前綴集合中的低support集
|__psGen(...) # 生成新的候選前綴集
|__genNewPostfixDic(..) # 根據候選集生成新的后綴集合
2. 實現細節
假設我們的數據集長這樣(對應上述表格):
D = [
[1,4,2,3],
[0, 1, 2, 3],
[1, 2, 1, 3, 4],
[2, 1, 2, 4, 4],
[1, 1, 1, 2, 3],
]
其中每條數據表示一個序列。
算法流程大致如下:
# 生成初始前綴集合和初始后綴集合
L1, postfixDic = createC1(D, minSup)
# 定義結果集 L,放入初始后綴集和
L = [], k = 2
L.append(L1)
# 前綴不斷增長1,生成新的前綴,當新的前綴集合大小=0的時候,循環退出
while len(L[k-2]) > 0:
# 生成新的候選前綴集合(長度比之前的大1)
Lk = psGen()
# 根據前綴更新后綴集和
posfixDic = genNewPostfixDic()
# 加入到結果集中
L.append(Lk)
k+=1
2.1 創建初始前綴集合
首先來看下createC1
的代碼清單:
def createC1(D, minSup):
'''生成第一個候選序列,即長度為1的集合序列
'''
C1 = []
postfixDic={}
lenD = len(D)
for i in range(lenD):
for idx, item in enumerate(D[i]):
if tuple([item]) not in C1:
postfixDic[tuple([item])]={}
C1.append(tuple([item]))
if i not in postfixDic[tuple([item])].keys():
postfixDic[tuple([item])][i]=idx
L1, postfixDic = rmLowSup(D, C1, postfixDic, minSup)
return L1, postfixDic
參數:
- D:數據集
- minSup: PrefixSpan算法的關鍵參數min_support
返回值:
- L1:剔除低support集后的候選前綴集合
- postfixDic: 對應候選集合的后綴集
前綴集合C1
初始前綴集合包含只含單個元素的集合,在調用rmLowSup
方法前,上述代碼的初始前綴集合C1
的結果為:[(0,),(1,),(2),(3,),(4,)]
(其中每個前綴用tuple的形式,主要是為了能夠hash);
后綴集合postfixDic
postfixDic
是前綴集合C1
的后綴,它是一個Python字典,每個元素表示當前前綴在數據集中某一條序列中最早出現的結尾位置(這樣處理,后續訪問后綴的時候,就不需要從頭開始遍歷了),例如運行完上述代碼后:
postfixDic[(1,)]={0:0, 1:1, 2:0, 3:1, 4:0}
回顧數據集D,可以發現1在每一行都出現了,且在第一行(下標為0)出現的結尾為0,第二行位置為1... (位置從0開始)
依次類推:
postfixDic[(1,2,3)]={0:3, 1:3, 2:3, 4:4}
表示前綴 (1,2,3)
在第 0,1,2,4 行都出現了,在第一行的結尾為3,第二行為3...
同時我們可以發現調用 len(postfixDic[prefix])
就可以知道前綴prefix
在多少序列中出現了,據此可以刪除低support 前綴
刪除低support前綴
rmLowSup
函數清單如下:
def rmLowSup(D,Cx, postfixDic,minSup):
'''
根據當前候選集合刪除低support的候選集
'''
Lx = Cx
for iset in Cx:
if len(postfixDic[iset])/len(D) < minSup:
Lx.remove(iset)
postfixDic.pop(iset)
return Lx, postfixDic
根據后綴集和postfixDic
的說明,前綴prefix
的支持度為: len(postfixDic[prefix])/len(D)
, 例如上述前綴(1,2,3)
的支持度為 4/5=0.8,低於閾值minSup
的前綴和其相應在postfixDic
中的key將被剔除。
2.2 生成新的候選前綴集合
psGen
代碼清單如下:
def psGen(D, Lk, postfixDic, minSup, minConf):
'''生成長度+1的新的候選集合
'''
retList = []
lenD = len(D)
# 訪問每一個前綴
for Ck in Lk:
item_count = {} # 統計item在多少行的后綴里出現
# 訪問出現前綴的每一行
for i in postfixDic[Ck].keys():
# 從前綴開始訪問每個字符
item_exsit={}
for j in range(postfixDic[Ck][i]+1, len(D[i])):
if D[i][j] not in item_count.keys():
item_count[D[i][j]]=0
if D[i][j] not in item_exsit:
item_count[D[i][j]]+=1
item_exsit[D[i][j]]=True
c_items = []
# 根據minSup和minConf篩選候選字符
for item in item_count.keys():
if item_count[item]/lenD >= minSup and item_count[item]/len(postfixDic[Ck])>=minConf:
c_items.append(item)
# 將篩選后的字符組成新的候選前綴,加入候選前綴集合
for c_item in c_items:
retList.append(Ck+tuple([c_item]))
return retList
對於當前前綴集(長度相等)中的每個前綴,通過后綴集合postfixDic
能挖掘到其可能的下一個字符。例如前綴 (1,2)
的 postfixDic[(1,2)]={0:2, 1:2, 2:1, 3:2, 4:3}
, 表示在第0,1,2,3,4行都存在前綴(1,2)
, 通過其在每行的前綴結尾位置,例如第0行的結尾位置,可以在[postfixDic[(1,2)][0], len(D[0]))
范圍內查找是否有符合條件的新元素,即第0行的 [2, 4) 范圍內搜索。
具體方法是統計后綴中不同元素分別在當前行是否出現,再統計它們出現的行數,查找過程如下表所示(對應函數清單的前半部分):
查找行/元素候選 | 0 | 1 | 2 | 3 | 4 |
---|---|---|---|---|---|
0 | 0 | 0 | 0 | 1 | 0 |
1 | 0 | 0 | 0 | 1 | 0 |
2 | 0 | 1 | 0 | 1 | 1 |
3 | 0 | 0 | 1 | 0 | 1 |
4 | 0 | 0 | 0 | 1 | 0 |
總計 | 0 | 1 | 1 | 4 | 2 |
可以看到候選元素3,4分別出現了4次和2次,則表示候選前綴 (1,2,3)
和(1,2,4)
在5行序列中的4行和2行中出現,可以很快計算得到它們的support值為0.8和0.5。
傳統的PrefixSpan只在這里用min_support的策略過濾候選前綴集,而代碼里同時用了min_confidence 參數,這里就不細講了。
2.3 更新后綴集和postfixDic
同樣來看下代碼清單:
def genNewPostfixDic(D,Lk, prePost):
'''根據候選集生成相應的PrefixSpan后綴
參數:
D:數據集
Lk: 選出的集合
prePost: 上一個前綴集合的后綴
基於假設:
(1,2)的后綴只能出現在 (1,)的后綴列表里
e.g.
postifixDic[(1,2)]={1:2,2:3} 前綴 (1,2) 在第一行的結尾是2,第二行的結尾是3
'''
postfixDic = {}
for Ck in Lk:
# (1,2)的后綴只能出現在 (1,)的后綴列表里
postfixDic[Ck]={}
tgt = Ck[-1]
prePostList = prePost[Ck[:-1]]
for r_i in prePostList.keys():
for c_i in range(prePostList[r_i]+1, len(D[r_i])):
if D[r_i][c_i]==tgt:
postfixDic[Ck][r_i] = c_i
break
return postfixDic
現在我們要根據新的候選前綴集合更新后綴集和postfixDic
,為此我們需要舊的前綴集合 postfixDic
作為輔助,可以大大減小時間復雜度。
例如我們更新前綴 (1,2,3)
的后綴,我們不需要再從 D的第0行開始遍歷所有的序列。
因為(1,2,3)
必然來自前綴 (1,2)
,因此只要遍歷出現前綴 (1,2)
的行進行查找即可,這也是我們需要舊的前綴集合的原因。
接下來就簡單了,只要在這些行,找到新的元素的位置即可。例如對於前綴 (1,2)
,其后綴postfixDic[(1,2)]={0: 2, 1: 2, 2: 1, 3: 2, 4: 3}
,所以在0,1,2,3,4行的這些位置+1開始尋找是否存在3這個元素。上述代碼做的就是這個。
以此類推我們就可以得到所有符合條件的子序列。