時間序列相似性度量方法
時間序列相似性度量常用方法為歐氏距離ED(Euclidean distance)和動態時間規整DTW(Dynamic Time Warping)。總體被分為兩類: 鎖步度量(lock-step measures) 和彈性度量(elastic measures) 。鎖步度量是時間序列進行 “一對一”的比 較; 彈性度量允許時間序列進行 “一對多”的比較。
歐氏距離屬於鎖步度量。

在時間序列中,我們通常需要比較兩端音頻的差異。而兩段音頻的長度大部分是不相等的。在語音處理領域上表現為不同人的語速不同。即時同一個人不同一時刻發同一個音,也不可能具有完全相同的時間長度。而且每個人對同一個單詞的不同音素的發音速度也是不同的,有的人會把"A"這個音拖得稍長,或者"i"稍短。在這種復雜的情況下,利用傳統的歐幾里得距離是無法求得有效的兩個時間序列之間的相似性的(即距離)。
如下圖,實線波形的a點會對應於虛線波形的b’點,這樣傳統的通過比較距離來計算相似性很明顯不靠譜。因為很明顯,實線的a點對應虛線的b點才是正確的。

考慮序列長短不一,或時間步不對齊的時候,特別是在峰值的時候,歐氏距離是無法有效計算兩個時間序列的距離。DTW算法將其中一個序列進行線性放縮、進行某種“扭曲”操作,。可以存在一對多mapping的情況,在時間序列探索過程中效果更好。適用於復雜時間序列,屬於彈性度量

dtw算法介紹
假設我們有兩個時間序列Q和C,他們的長度分別是n和m:(實際時間序列匹配運用中,一個序列為參考模板,一個序列為測試模板)
Q = q1, q2,…,qi,…, qn ;
C = c1, c2,…, cj,…, cm ;
為了對齊這兩個序列,我們需要構造一個n x m的矩陣網格,矩陣元素(i, j)表示qi和cj兩個點的距離d(qi, cj)(也就是序列Q的每一個點和C的每一個點之間的相似度,距離越小則相似度越高。這里先不管順序),一般采用歐式距離,d(qi, cj)= (qi-cj)2(也可以理解為失真度)。每一個矩陣元素(i, j)表示點qi和cj的對齊。DP算法可以歸結為尋找一條通過此網格中若干格點的路徑,路徑通過的格點即為兩個序列進行計算的對齊的點。
那么這條路徑我們怎么找到呢?那條路徑才是最好的呢?也就是剛才那個問題,怎么樣的warping才是最好的。
首先,這條路徑不是隨意選擇的,需要滿足以下幾個約束:
1)邊界條件:w1=(1, 1)和wK=(m, n)。任何一種語音的發音快慢都有可能變化,但是其各部分的先后次序不可能改變,因此所選的路徑必定是從左下角出發,在右上角結束。
2)連續性:如果wk-1= (a’, b’),那么對於路徑的下一個點wk=(a, b)需要滿足 (a-a’) <=1和 (b-b’) <=1。也就是不可能跨過某個點去匹配,只能和自己相鄰的點對齊。這樣可以保證Q和C中的每個坐標都在W中出現。
3)單調性:如果wk-1= (a’, b’),那么對於路徑的下一個點wk=(a, b)需要滿足0<=(a-a’)和0<= (b-b’)。這限制W上面的點必須是隨着時間單調進行的。以保證圖B中的虛線不會相交。
結合連續性和單調性約束,每一個格點的路徑就只有三個方向了。例如如果路徑已經通過了格點(i, j),那么下一個通過的格點只可能是下列三種情況之一:(i+1, j),(i, j+1)或者(i+1, j+1)。
滿足上面這些約束條件的路徑可以有指數個,最佳路徑是使得沿路徑的積累距離達到最小值這條路徑。這條路徑可以通過動態規划(dynamic programming)算法得到。具體搜索或者求解過程的直觀例子解釋可以參考:http://www.cnblogs.com/tornadomeet/archive/2012/03/23/2413363.html
普通dtw方法引入和使用
第一步:dtw包安裝,以Anaconda為例
從開始菜單打開 Anaconda prompt 管理工具
輸入dwt安裝腳本
pip install dtw
第二步:dtw包導入
from dtw import dtw
第三步:dtw函數使用
首先,需要定義距離函數,其中,x為參照序列數據,y為測試序列數據
distance=lambda x,y :np.abs(x-y)
接下來,調用dtw方法
d,cost_matrix,acc_cost_matrix,path=dtw(x,y,distance)
dtw函數輸入參數說明
x:參照序列數據
y:測試序列數據
distance:距離函數
dtw函數輸出參數說明
d:累計距離
cost_matrix:兩個序列之間的兩兩距離
acc_cost_matrix:dp矩陣
path:最佳路徑,包含兩個數組,path[0] 參照序列下標值,path[1] 測試序列下標值
軌跡線打印出圖
plt.imshow(cost_matrix.T,origin='lower',cmap='gray')
plt.plot(path[0],path[1]) plt.show()
dtw-python包的安裝與引用
dtw庫的使用限制太多,不夠靈活,且作圖不夠方便,主要體現運算量大、首尾必須匹配、序列間對應個數無法限定等。dtw-python包是R語言中dtw實現的python版本,基本的API是對應的,它的優勢在於能夠自定義點的匹配模式,約束條件,和滑動窗口。同時提供方便的作圖和快速的計算(C語言的內核),官方文檔點擊這里。
dtw-python 包安裝腳本
pip install dtw-python
dtw-python 包引用
from dtw import dtw,dtwPlot,stepPattern,warp,warpArea,window #不建議直接 from dtw import * 方式,避免沖突
由於在dtw包選用過程中,先安裝了dtw包,后安裝了dtw-python包,在dtw-python引用過程中,出現dtw包引用失敗情況,建議兩者保留其一。可使用包卸載腳本對dtw包進行卸載,並在Anaconda安裝目錄下 \Anaconda3\Lib 目錄下清除掉 以 ~ 開頭命名的文件夾
pip uninstall dtw
dtw函數的參數說明
ds=dtw(x, y=None, dist_method='euclidean', step_pattern='symmetric2', window_type=None, window_args={}, keep_internals=False, distance_only=False, open_end=False, open_begin=False)
輸入參數說明:
dist_method 定義你用的距離方法,默認為’euclidean’,即歐幾里得距離。
keep_internals=True保存內部的信息包含距離矩陣之類的,一般我們把它設為True。
step_pattern定義了點之間的匹配模式,有好幾種,具體查看官網(StepPattern — The dtw-python package 1.3.0 documentation (dynamictimewarping.github.io))。
window_type表示全局條件約束,也有幾種模式,同查看官網(dtw — The dtw-python package 1.3.0 documentation (dynamictimewarping.github.io))。"none", "itakura", "sakoechiba", "slantedband",其中sakoechiba方式需要在window_args 參數中傳遞window_size參數,方式如下:window_args= {"window_size": 1}
distance_only如果設置為True,會加速計算,只計算距離,忽略匹配函數。
open_end和open_begin設為True表示無拘束的匹配,即完全的部分匹配,默認是全局匹配,就是嚴格對應頭和尾。
輸出參數說明:
ds.distance:累計距離
ds.index1:參照序列下標值
ds.index2:測試序列下標值
dtw輸出結果打印
ds.plot(type="twoway",offset=-2)
一般有twoway和threeway兩種模式,下面詳解,offset=-2可以理解為兩根線之間的分離程度,為了方便看清可以設的大一些。下圖左側為twoway方式的輸出結果,右側為threeway方式的輸出結果。


dtw函數使用注意事項
1.數據中不能出現空置和無效值,使用上一記錄替代缺失數據時,同一時段內缺失數據不能過多,否則會造成變化趨勢匹配結果偏差較大
2.最大值和最小值可能不會做到正確匹配
dtw使用實例:上下游河流斷面污染物濃度變化趨勢相似度識別
背景:同一條河流上兩個斷面的污染物濃度在變化趨勢上呈現一定的相似性。但由於河流流速的不定,上游斷面濃度的波峰波谷在下游斷面上出現的時間間隔不同,可以通過dtw方法實現兩斷面間相似變化趨勢的時間段,以及上下游斷面間污染物濃度變化的相互關系
第一步:添加python包引用
import numpy as np import pandas as pd import matplotlib.pylab as plt import math import datetime import time #用於時間秒轉換成時間 from dtw import dtw,dtwPlot,stepPattern,warp,warpArea,window #不建議直接 from dtw import * 方式,避免沖突 import sys import pymssql import xlwt from pandas._libs.tslibs.timestamps import Timestamp
第二步:通過csv方式讀取兩個斷面污染物的濃度監測數值
beginTime='2019/1/2 12:00' endTime='2019/2/18 12:00' #從csv文件中取出數據 df=pd.read_csv('wq3.csv',encoding='utf-8', index_col='datatime') #從csv文件中讀取時間序列數據,index_col列定義為索引對象 df.index=pd.to_datetime(df.index) df_xy_org=df['xyvalue'][beginTime:endTime] #指定時間序列中下游的監測數據列 df_sy_org=df['syvalue'] #指定時間序列中上游的監測數據列 #具體計算中需要排除掉缺數的情況,特別是長時間的缺數,會影響到最終的匹配結果。通過使用上一次監測數據方法不合適 for i in range(0,len(df_xy_org)): if not df_xy_org[i] or math.isnan(df_xy_org[i]): #判斷數值是否為空 df_xy_org[i] = df_xy_org[i-1] for i in range(0,len(df_sy_org)): if not df_sy_org[i] or math.isnan(df_sy_org[i]): #判斷數值是否為空 df_sy_org[i] = df_sy_org[i-1]
第三步:為了更有效的找到波峰和波谷,對污染物濃度做一階差分處理
#比對數據做一階差分處理 df_xytime=np.empty(len(df_xy_org)-1,dtype=datetime.datetime) df_xyvalue=np.empty(len(df_xy_org)-1) for i in range(1,len(df_xy_org)-1): df_xytime[i-1]=df_xy_org.index[i].to_pydatetime() df_xyvalue[i-1]=(float(df_xy_org.values[i])-float(df_xy_org.values[i-1])) df_xy=pd.Series(df_xyvalue,index=df_xytime) df_sytime=np.empty(len(df_sy_org)-1,dtype=datetime.datetime) df_syvalue=np.empty(len(df_sy_org)-1) for i in range(1,len(df_sy_org)-1): df_sytime[i-1]=df_sy_org.index[i].to_pydatetime() df_syvalue[i-1]=(float(df_sy_org.values[i])-float(df_sy_org.values[i-1])) df_sy=pd.Series(df_syvalue,index=df_sytime)
第四步:定義距離函數
#定義距離計算方法 dist=lambda x,y:np.abs(np.power(100,x)-np.power(100,y))
第五步:下游斷面監測數據作為參照序列,對上游斷面監測數據中選取【參照序列長度正負2的長度】的序列,循環驗證最相似的時間序列
#dwt計算,比對dist生成的結果中,累計距離最小的項作為最終的輸出結果 lunci=20 bestBegin=datetime.datetime.strptime(beginTime, '%Y/%m/%d %H:%M') bestEnd=datetime.datetime.strptime(endTime, '%Y/%m/%d %H:%M') bestCost=sys.float_info.max bestds='undefined' #生成待比對斷面的監測數據,長度上邊可以比下游斷面的監測數據長度多或者少兩條記錄(-2,-1,0,1,2),要求首尾必須匹配方式 diff=np.array([-2,-1,0,1,2],dtype=np.int) for lendiff in diff: bestLunci=-1 for ii in range(lunci): delta_begin=datetime.timedelta(hours=(-4)*(lunci-ii)) delte_end=datetime.timedelta(hours=(-4)*(lunci-ii-int(lendiff))) try: tempbegin=datetime.datetime.strptime(beginTime, '%Y/%m/%d %H:%M')+delta_begin tempend=datetime.datetime.strptime(endTime, '%Y/%m/%d %H:%M')+delte_end tempdf_sy=df_sy[tempbegin:tempend] ds=dtw(df_xy,tempdf_sy,dist_method=dist,keep_internals=True,step_pattern='asymmetric',window_type='sakoechiba',open_begin=False,open_end=False,window_args= {"window_size": 1}) #參數傳遞方法 print('運行輪次:'+str(ii)+'時 ds.distance='+str(ds.distance)) if bestCost>ds.distance: bestCost=ds.distance bestds=ds bestBegin=tempbegin bestEnd=tempend bestLunci=ii bestds.plot(type="twoway") print(bestdsT.distance) print('運行輪次:'+str(ii)+'成功!') except Exception as e: print('運行輪次:'+str(ii)+'出錯!') print(e)
第六步:找到原始斷面污染物濃度值,並找出上下游不同時刻點的對應關系情況,輸出csv文件
#記錄上下游污染物濃度數據到一個數組中 records=pd.DataFrame(columns=['syvalue','xyvalue']) #把dtw運行結果存入csv文件中 #創建文件 file = xlwt.Workbook() #寫入sheet名稱,2017-05-01T06_00_00 格式 name_sheet='上下游水質關系' table = file.add_sheet(name_sheet) title=['df_xytime','df_sytime','df_xyvalue','df_syvalue','timediff'] bestdf_sy=df_sy[bestBegin:bestEnd] #table.write(行_0開始, 列_0開始, 值) #寫入表頭,第一行 for j in range(len(title)): table.write(0, j, title[j]) #index_i+1 行,index_j列 for index_i,r in enumerate(bestds.index1): try: df_xytime=datetime.datetime.strptime(beginTime, '%Y/%m/%d %H:%M')+datetime.timedelta(hours=4*1) +datetime.timedelta(hours=4*index_i) #beginTime先加一個小時的原因是,前期執行了一個一階差分,時間上往后延遲了一個周期,而bestBeginTime不存在這種問題 df_sytime=bestBegin+datetime.timedelta(hours=4*int(bestds.index2[index_i])) df_xyvalue=df_xy_org[df_xytime] df_syvalue=df_sy_org[df_sytime] timediff=(df_xytime-df_sytime).days*24+(df_xytime-df_sytime).seconds/3600 record=pd.DataFrame([[df_syvalue,df_xyvalue]],columns=['syvalue','xyvalue'])#注意插入的數據包含有兩級中括號 records=records.append(record,ignore_index=True) table.write(index_i+1,0,df_xytime.strftime('%Y-%m-%d %H:%M:%S')) table.write(index_i+1,1,df_sytime.strftime('%Y-%m-%d %H:%M:%S')) table.write(index_i+1,2,df_xyvalue) table.write(index_i+1,3,df_syvalue) table.write(index_i+1,4,timediff) except: continue try: #保存文件 name_file=name_sheet+'.csv' file.save(name_file) except Exception as e: print (e) #找到原始數據,繪制關系對應圖 print('records')
測試數據: https://files.cnblogs.com/files/HoEn/dtw%E6%B5%8B%E8%AF%95%E6%95%B0%E6%8D%AE.zip?t=1676453291
參考資料:
語音信號處理之(一)動態時間規整(DTW)_zouxy09的專欄-CSDN博客_動態時間規整
時間序列匹配之dtw的python實現(二)_輕舟已過萬重山的博客-CSDN博客_dtw python 用法
