DTW算法可以用來衡量兩個時間序列的相似性,而且兩個時間序列的長度可以不必相等。
DTW算法原理
如圖1所示,圖中矩陣$dij$表示時間序列$A$時刻$i$和時間序列$B$時刻$j$的距離,DTW算法就是要從$(1,1)$到$(m,n)$找到一條路徑使得累計$dij$最小。
圖1:DTW算法示意圖,r表示warping window,紅色圓點表示累計距離最小的路徑。
如何找到這條路徑呢,我們這里采用動態規划算法。假設我們要求到位置$(i,j)$的最小累計距離$D(i,j)$,那么它只能由$D(i-1,j)$,$D(i,j-1)$和$D(i-1,j-1)$這三個位置的最小累計距離中尋找,也就是$D(i,j)=dij+min[D(i-1,j),D(i,j-1),D(i-1,j-1)]$。如圖2所示,最優路徑只能從三個臨近位置尋找。最終,$D(m,n)$越小,表示兩個時間序列間的相似性越高。
圖2:動態規划算法尋找最小累計距離路徑的三種可能
DTW算法的Python實現
def dtw_distance(ts_a, ts_b, d=lambda x,y: abs(x-y), mww=10000): """Computes dtw distance between two time series Args: ts_a: time series a ts_b: time series b d: distance function mww: max warping window, int, optional (default = infinity) Returns: dtw distance """
# Create cost matrix via broadcasting with large int
ts_a, ts_b = np.array(ts_a), np.array(ts_b) M, N = len(ts_a), len(ts_b) cost = np.ones((M, N)) # Initialize the first row and column
cost[0, 0] = d(ts_a[0], ts_b[0]) for i in range(1, M): cost[i, 0] = cost[i-1, 0] + d(ts_a[i], ts_b[0]) for j in range(1, N): cost[0, j] = cost[0, j-1] + d(ts_a[0], ts_b[j]) # Populate rest of cost matrix within window
for i in range(1, M): for j in range(max(1, i - mww), min(N, i + mww)): choices = cost[i-1, j-1], cost[i, j-1], cost[i-1, j] cost[i, j] = min(choices) + d(ts_a[i], ts_b[j]) # Return DTW distance given window
return cost[-1, -1]