利用Python對時間序列進行分類與聚類
原文地址
我在最近的工作中遇到了一個問題,問題是我需要根據銀行賬戶在一定時間內的使用信息對該賬戶在未來的一段時間是否會被銷戶進行預測。這是一個雙元值的分類問題,只有兩種可能,即會被銷戶和不會被銷戶。針對這個問題一般來說有兩種解決策略。
- 提取時間序列的統計學特征值,例如最大值,最小值,均值等。然后利目前常用的算法根據提取的特征進行分類,例如Naive Bayes, SVMs 等。
- k-NN方法。針對想要預測的時間序列,在訓練集中找一個跟它最相似的另外一個序列,然后利用找到的序列的輸出值作為原序列的預測值。
下面我會使用這兩種算法,運行並對比結果,然后找到最合適的算法。
找到相似數據
針對這個問題,一般會使用歐氏距離尋找相似,但這種方法存在很多問題。如下文給出的示例。如圖所示,有三個時間序列:

很明顯,序列ts1和ts2的相似度更高,ts3和其他兩個相比的差異性更大。為了驗證結果,可以通過編程求得這三個序列之間的歐氏距離d(ts1, ts2)和d(ts1,ts3)。下面編寫一個計算歐氏距離的函數:
def euclid_dist(t1,t2): return sqrt(sum((t1-t2)**2))
通過計算,結果是d(ts1,ts2)=26.9,d(ts1,ts3)=23.2。很明顯,與我們直覺判斷相悖。這就是利用歐氏距離標准來判定相似性存在的問題。為了解決這個問題,引入另一個方法——DTW。
動態時間規整(Dynamic Time Warping, DTW)
DTW可以針對兩個時間序列找到最優的非線定位(non-linear alignment)。定位之間的歐氏距離不太容易受到時間軸方向上的失真所造成的負面相似性測量的影響。但是我們也必須為這種方法付出代價,即DTW是所有用到的時間序列的數據數量的二次方。
DTW的工作方式如下。我們可以引入兩個時間序列Q和C,這兩個時間序列都擁有n個數據點,Q=q_1, q_2, \cdots, q_n和C=c_1, c_2, \cdots, c_n。首先我們用這兩個時間序列去構造一個n\times n的矩陣,這個矩陣中的第i,j^{th}項代表的是數據點q_i和c_j之間的歐氏距離。我們需要通過這個矩陣找到一個變量,通過該變量可以使所有的歐氏距離和最小。這個變量可以決定兩個時間序列之間的最優非線性定位。需要注意的是,對於其中一個時間序列上的數據點,它是有可能映射到另一條時間序列上的多個數據點。
我們可以把這個變量記作W,W = w_1,w_2,\cdots,w_n。其中每一個W代表的都是Q中的第i個點與C中的第j個點之間的距離,即w_k=(q_i-c_j)^2。
因為我們需要找到這個變量的最小值,即W^*=argmin_W(\sqrt{\sum_{k=1}^{K}w_k}),為了找到這個值我們需要通過動態的方法,特別是接下來的這個遞歸函數。\gamma(i,j)=d(q_i,c_j)+min(\gamma(i-1,j-1),\gamma(i-1,j),\gamma(i,j-1))。該算法可以通過以下的Python代碼實現。
def DTWDistance(s1, s2): DTW={} for i in range(len(s1)): DTW[(i, -1)] = float('inf') for i in range(len(s2)): DTW[(-1, i)] = float('inf') DTW[(-1, -1)] = 0 for i in range(len(s1)): for j in range(len(s2)): dist= (s1[i]-s2[j])**2 DTW[(i, j)] = dist + min(DTW[(i-1, j)],DTW[(i, j-1)], DTW[(i-1, j-1)]) return sqrt(DTW[len(s1)-1, len(s2)-1])
通過DTW計算,DTWDistance(ts1,ts2)=17.9,DTWDDistance(ts1,ts3)=21.5。正如我們所看到的,該結果與只利用歐氏距離算法得到的結果截然不同。現在,這個結果就符合我們的主觀腿斷了,即ts2相比於ts3與ts1更為相似。
提高DTW算法計算速度
DTW的復雜度O(nm)是與兩個時間序列的數據數量正相關的。如果兩個時間序列都含有大量數據,那么這種方法的計算時間會非常長。因此我們需要通過一些方法去提高計算速度。第一種方法是強制執行局部性約束。這種方法假設當i和j相距太遠,則q_i和c_j不需要匹配。這個閾值則由一個給定的窗口大小w決定。這種方法可以提高窗口內循環的速度。詳細代碼如下:
def DTWDistance(s1, s2, w): DTW={} w = max(w, abs(len(s1)-len(s2))) for i in range(-1,len(s1)): for j in range(-1,len(s2)): DTW[(i, j)] = float('inf') DTW[(-1, -1)] = 0 for i in range(len(s1)): for j in range(max(0, i-w), min(len(s2), i+w)): dist= (s1[i]-s2[j])**2 DTW[(i, j)] = dist + min(DTW[(i-1, j)],DTW[(i, j-1)], DTW[(i-1, j-1)]) return sqrt(DTW[len(s1)-1, len(s2)-1])
另一種方法是使用LB Keogh下界方法DTW的邊界。
LBKeogh(Q,C)=\sum_{i=1}^{n}(c_i-U_i)^2I(c_i>U_i)+(c_i-L_i)^2I(c_i < U_i)
U_i和L_i是時間序列Q的上下邊界,U_i=max(q_{i-r}:q_{i+r}),L_i=min(q_{i-r}:q_{i+r})。其中r是可達到的邊界,I(\cdot)是指示函數。該方法可以通過以下代碼實現。
def LB_Keogh(s1,s2,r): LB_sum=0 for ind,i in enumerate(s1): lower_bound=min(s2[(ind-r if ind-r>=0 else 0):(ind+r)]) upper_bound=max(s2[(ind-r if ind-r>=0 else 0):(ind+r)]) if i>upper_bound: LB_sum=LB_sum+(i-upper_bound)**2 elif i<lower_bound: LB_sum=LB_sum+(i-lower_bound)**2 return sqrt(LB_sum)
LB Keogh小界方法是線性的,而DTW則是復雜的二次方形式,這使得它在處理大量時間序列的時候非常有利。
分類與聚類
現在我們有了可靠的方法去判斷兩個時間序列是否相似,截下來便可以使用k-NN算法進行分類。根據經驗,最優解一般出現在k=1的時候。下面就利用DTW歐氏距離的1-NN算法。在該算法中,train是時間序列示例的訓練集,其中時間序列所屬的類被附加到時間序列的末尾。test是相應的測試集,它所屬於的類別就是我們想要預測的結果。在該算法中,對於測試集中的每一個時間序列,每一遍搜索必須遍歷訓練集中的所有點,從而可以找到最多的相似點。考慮到DTW算法是二次方的,計算過程會耗費非常長時間。我們可以通過LB Keogh下界方法來提高分類算法的計算速度。計算機運行LB Keogh的速度會比運行DTW的速度快很多。另外,當LBKeogh(Q,C)\leq DTW(Q,C)時,我們可以消除那些比當前最相似的時間序列不可能更相似的時間序列。這樣,我們就可以消除很多不必要的DTW計算過程。
from sklearn.metrics import classification_report def knn(train,test,w): preds=[] for ind,i in enumerate(test): min_dist=float('inf') closest_seq=[] #print ind for j in train: if LB_Keogh(i[:-1],j[:-1],5)<min_dist: dist=DTWDistance(i[:-1],j[:-1],w) if dist<min_dist: min_dist=dist closest_seq=j preds.append(closest_seq[-1]) return classification_report(test[:,-1],preds)
下面測試一批數據。設置窗口大小為4。另外,盡管這里使用了LB Keogh下界方法和局部性約束,計算過程仍然需要幾分鍾。
train = np.genfromtxt('datasets/train.csv', delimiter='\t') test = np.genfromtxt('datasets/test.csv', delimiter='\t') print knn(train,test,4)
運行結果如下

這種方法也可用於k-mean聚類。在這種算法中,簇的數量設置為apriori,相似的時間序列會被放在一起。
import random def k_means_clust(data,num_clust,num_iter,w=5): centroids=random.sample(data,num_clust) counter=0 for n in range(num_iter): counter+=1 print counter assignments={} #assign data points to clusters for ind,i in enumerate(data): min_dist=float('inf') closest_clust=None for c_ind,j in enumerate(centroids): if LB_Keogh(i,j,5)<min_dist: cur_dist=DTWDistance(i,j,w) if cur_dist<min_dist: min_dist=cur_dist closest_clust=c_ind if closest_clust in assignments: assignments[closest_clust].append(ind) else: assignments[closest_clust]=[] #recalculate centroids of clusters for key in assignments: clust_sum=0 for k in assignments[key]: clust_sum=clust_sum+data[k] centroids[key]=[m/len(assignments[key]) for m in clust_sum] return centroids
再用這種算法測試一下數據:
train = np.genfromtxt('datasets/train.csv', delimiter='\t') test = np.genfromtxt('datasets/test.csv', delimiter='\t') data=np.vstack((train[:,:-1],test[:,:-1])) import matplotlib.pylab as plt centroids=k_means_clust(data,4,10,4) for i in centroids: plt.plot(i) plt.show()
結果如圖

代碼
所有用到的代碼都可以在my gitHub repo找到。