DTW算法可以用来衡量两个时间序列的相似性,而且两个时间序列的长度可以不必相等。
DTW算法原理
如图1所示,图中矩阵$dij$表示时间序列$A$时刻$i$和时间序列$B$时刻$j$的距离,DTW算法就是要从$(1,1)$到$(m,n)$找到一条路径使得累计$dij$最小。
图1:DTW算法示意图,r表示warping window,红色圆点表示累计距离最小的路径。
如何找到这条路径呢,我们这里采用动态规划算法。假设我们要求到位置$(i,j)$的最小累计距离$D(i,j)$,那么它只能由$D(i-1,j)$,$D(i,j-1)$和$D(i-1,j-1)$这三个位置的最小累计距离中寻找,也就是$D(i,j)=dij+min[D(i-1,j),D(i,j-1),D(i-1,j-1)]$。如图2所示,最优路径只能从三个临近位置寻找。最终,$D(m,n)$越小,表示两个时间序列间的相似性越高。
图2:动态规划算法寻找最小累计距离路径的三种可能
DTW算法的Python实现
def dtw_distance(ts_a, ts_b, d=lambda x,y: abs(x-y), mww=10000): """Computes dtw distance between two time series Args: ts_a: time series a ts_b: time series b d: distance function mww: max warping window, int, optional (default = infinity) Returns: dtw distance """
# Create cost matrix via broadcasting with large int
ts_a, ts_b = np.array(ts_a), np.array(ts_b) M, N = len(ts_a), len(ts_b) cost = np.ones((M, N)) # Initialize the first row and column
cost[0, 0] = d(ts_a[0], ts_b[0]) for i in range(1, M): cost[i, 0] = cost[i-1, 0] + d(ts_a[i], ts_b[0]) for j in range(1, N): cost[0, j] = cost[0, j-1] + d(ts_a[0], ts_b[j]) # Populate rest of cost matrix within window
for i in range(1, M): for j in range(max(1, i - mww), min(N, i + mww)): choices = cost[i-1, j-1], cost[i, j-1], cost[i-1, j] cost[i, j] = min(choices) + d(ts_a[i], ts_b[j]) # Return DTW distance given window
return cost[-1, -1]