一、簡介
在對地理信息數據處理時,常常會遇到對線進行平滑和簡化的操作。線的平滑可以使用擬合或插值來完成。多段線(Polyline)簡化算法可以幫助我們減少Polyline的點數,從而降低輸入規模。對多段線簡化算法,通常的做法是在一定的近似精度下,刪除一些點或者邊。
二、線的平滑
插值和擬合有所不同。插值就是根據原有數據進行填充,最后生成的曲線一定過原有點。擬合是通過原有數據,調整曲線系數,使得曲線與已知點集的差別(最小二乘)最小,最后生成的曲線不一定經過原有點。
1 插值
實現所需的庫
numpy、scipy、matplotlib
方法
- nearest:最鄰近插值法
- zero:階梯插值
- slinear:線性插值
- quadratic、cubic:2、3階B樣條曲線插值
代碼實現
# -*- coding: utf-8 -*- # 調用模塊 # 調用數組模塊 import numpy as np # 實現插值的模塊 from scipy import interpolate # 畫圖的模塊 import matplotlib.pyplot as plt # 生成隨機數的模塊 import random # random.randint(0, 10) 生成0-10范圍內的一個整型數 # y是一個數組里面有10個隨機數,表示y軸的值 y = np.array([random.randint(0, 10) for _ in range(10)]) # x是一個數組,表示x軸的值 x = np.array([num for num in range(10)]) # 插值法之后的x軸值,表示從0到9間距為0.5的18個數 xnew = np.arange(0, 9, 0.5) """ kind方法: nearest、zero、slinear、quadratic、cubic 實現函數func """ func = interpolate.interp1d(x, y, kind='cubic') # 利用xnew和func函數生成ynew,xnew的數量等於ynew數量 ynew = func(xnew) # 畫圖部分 # 原圖 plt.plot(x, y, 'ro-') # 擬合之后的平滑曲線圖 plt.plot(xnew, ynew) plt.show()
注意事項
- x, y為原來的數據(少量)
- xnew為一個數組,條件:x⊆⊆xnew
- 如:x的最小值為-2.931,最大值為10.312;則xnew的左邊界要小於-2.931,右邊界要大於10.312。當然也最好注意一下間距,最好小於x中的精度
- func為函數,里面的參數x、y、kind,x,y就是原數據的x,y,kind為需要指定的方法。
- ynew需要通過xnew數組和func函數來生成
- 理論上xnew數組內的值越多,生成的曲線越平滑
2 擬合
SciPy函數庫在NumPy庫的基礎上增加了眾多的數學、科學以及工程計算中常用的庫函數。例如線性代數、常微分方程數值求解、信號處理、圖像處理、稀疏矩陣等等。提供了基於數組是算法級應用 矩陣運算,線性代數 最優化方法,聚類 空間運算,快速傅里葉變換。
import scipy as sp import matplotlib.pyplot as plt data = sp.genfromtxt('data/web_traffic.tsv',delimiter="\t") # print(data.shape)# 讀取數組長度 x= data[:,0]#訓練數據集 y= data[:,1] #輸出數據 sp.sum(sp.isnan(y))# 顯示無效值 x = x[~sp.isnan(y)] # 對數組取反 只選擇合法項 y = y[~sp.isnan(y)] #最小二乘法函數 def error(f, x, y): return sp.sum((f(x) - y) ** 2) fp1, res, rank, sv, rcond = sp.polyfit(x, y, 1, full=True) #生成一階曲線 #根據數據生成N階曲線 f1=sp.poly1d(fp1) f2p =sp.polyfit(x,y,2) # 2階曲線 f2=sp.poly1d(f2p) 通過matplotlib畫出數據的擬合曲線 plt.scatter(x,y) plt.xlabel(u"時間") plt.ylabel(u"點擊/小時") plt.xticks( [w * 7 * 24 for w in range(10)], [u'周 %i' % w for w in range(10)]) plt.autoscale(tight=True) fx=sp.linspace(0,x[-1],1000) plt.plot(fx,f3(fx),linewidth=4) plt.legend(["d=%i" %f1.order], loc="upper left") plt.grid plt.show()
三、線的簡化
實踐中,根據不同場景的需要,有下面幾種簡化算法通常比較有用
- 臨近點簡化(Radial Distance)
- 垂直距離簡化(Perpendicular Distance)
- Douglas-Peucker算法
1 臨近點簡化
該算法是一個簡單的O(n)復雜度的暴力算法。對某個頂點,接下來連續的頂點與該點(key)的距離如果都在給定的誤差范圍內,那么將移除這些點。過程如下圖:
我們通常保留Polyline的起始點和終止點作為結果的一部分。首先將起始點標記為Key,然后沿着Polyline前進,與Key相鄰的連續頂點到Key的距離如果小於給定誤差就移除,遇到的第一個超過誤差的點標記為下一個Key;從這個新的key開始重復上面的過程,知道到達之后一個頂點。
2 垂直距離簡化
臨近點算法將點-點距離作為誤差判據,而垂直距離簡化則是將點-線段的距離作為誤差判據。對於每個頂點Vi,需要計算它與線段[Vi-1, Vi+1]的垂直距離,距離比給定誤差小的點都將被移除。過程如下圖所示:
如上圖,首先,對前三個頂點進行處理,計算第二個頂點的垂直距離,將這個距離與容差進行比較,大於設定的誤差,所以第二個頂點作為Key保留(簡化結果的一部分)。然后算法開始處理接下來的三個頂點。這一次,計算的距離低於公差,因此中間頂點被刪除。如此重復,直到處理完所有頂點。
Note:對於每個被移除的頂點Vi,下一個可能被移除的候選頂點是Vi+2。這意味着原始多段線的點數最多只能減少50%。為了獲得更高的頂點減少率,需要多次遍歷。
3 Douglas-Peucker算法
Douglas-Peucker算法使用點-邊距離作為誤差衡量標准。該算法從連接原始Polyline的第一個和最后一個頂點的邊開始,計算所有中間頂點到邊的距離,距離該邊最遠的頂點,如果其距離大於指定的公差,將被標記為Key並添加到簡化結果中。這個過程將對當前簡化中的每條邊進行遞歸,直到原始Polyline的所有頂點與當前考察的邊的距離都在允許誤差范圍內。該過程如下圖所示:
初始時,簡化結果只有一條邊(起點-終點)。第一步中,將第四個頂點標記為一個Key,並相應地加入到簡化結果中;第二步,處理當前結果中的第一條邊,到該邊的最大頂點距離低於容差,因此不添加任何點;在第三步中,當前簡化的第二個邊找到了一個Key(點到邊的距離大於容差)。這條邊在這個Key處被分割,這個新的Key添加到簡化結果中。這個過程一直繼續下去,直到找不到新的Key為止。注意,在每個步驟中,只處理當前簡化結果的一條邊。
這個算法的最壞時間復雜度是O(nm), 平均時間復雜度 O(n log m),其中m是簡化后的Polyline的點數。因此,該算法是Output-sensitive的。當m很小時,該算法會很快。
DP算法的實現(沒親自實驗,先總結再說):
- 樣例代碼1
#-*- coding:utf-8 -*- """ 道格拉斯算法的實現 程序需要安裝shapely模塊 """ import math from shapely import wkt,geometry import matplotlib.pyplot as plt class Point: """點類""" x=0.0 y=0.0 index=0 #點在線上的索引 def __init__(self,x,y,index): self.x=x self.y=y self.index=index class Douglas: """道格拉斯算法類""" points=[] D=1 #容差 def readPoint(self): """生成點要素""" g=wkt.loads("LINESTRING(1 4,2 3,4 2,6 6,7 7,8 6,9 5,10 10)") coords=g.coords for i in range(len(coords)): self.points.append(Point(coords[i][0],coords[i][1],i)) def compress(self,p1,p2): """具體的抽稀算法""" swichvalue=False #一般式直線方程系數 A*x+B*y+C=0,利用點斜式,分母可以省略約區 #A=(p1.y-p2.y)/math.sqrt(math.pow(p1.y-p2.y,2)+math.pow(p1.x-p2.x,2)) A=(p1.y-p2.y) #B=(p2.x-p1.x)/math.sqrt(math.pow(p1.y-p2.y,2)+math.pow(p1.x-p2.x,2)) B=(p2.x-p1.x) #C=(p1.x*p2.y-p2.x*p1.y)/math.sqrt(math.pow(p1.y-p2.y,2)+math.pow(p1.x-p2.x,2)) C=(p1.x*p2.y-p2.x*p1.y) m=self.points.index(p1) n=self.points.index(p2) distance=[] middle=None if(n==m+1): return #計算中間點到直線的距離 for i in range(m+1,n): d=abs(A*self.points[i].x+B*self.points[i].y+C)/math.sqrt(math.pow(A,2)+math.pow(B,2)) distance.append(d) dmax=max(distance) if dmax>self.D: swichvalue=True else: swichvalue=False if(not swichvalue): for i in range(m+1,n): del self.points[i] else: for i in range(m+1,n): if(abs(A*self.points[i].x+B*self.points[i].y+C)/math.sqrt(math.pow(A,2)+math.pow(B,2))==dmax): middle=self.points[i] self.compress(p1,middle) self.compress(middle,p2) def printPoint(self): """打印數據點""" for p in self.points: print "%d,%f,%f"%(p.index,p.x,p.y) def main(): """測試""" #p=Point(20,20,1) #print '%d,%d,%d'%(p.x,p.x,p.index) d=Douglas() d.readPoint() #d.printPoint() #結果圖形的繪制,抽稀之前繪制 fig=plt.figure() a1=fig.add_subplot(121) dx=[] dy=[] for i in range(len(d.points)): dx.append(d.points[i].x) dy.append(d.points[i].y) a1.plot(dx,dy,color='g',linestyle='-',marker='+') d.compress(d.points[0],d.points[len(d.points)-1]) #抽稀之后繪制 dx1=[] dy1=[] a2=fig.add_subplot(122) for p in d.points: dx1.append(p.x) dy1.append(p.y) a2.plot(dx1,dy1,color='r',linestyle='-',marker='+') #print "========================\n" #d.printPoint() plt.show() if __name__=='__main__': main()
結果:
- 樣例代碼2
遞歸方式實現的Douglas–Peucker算法用於GPS軌跡壓縮,參考的原文找不到了,把它封裝成一個類。
import math class Point(object): def __init__(self, id, x, y): self.id = id self.x = x self.y = y class DPCompress(object): def __init__(self, pointList, tolerance): self.Compressed = list() self.pointList = pointList self.tolerance = tolerance self.runDP(pointList, tolerance) def calc_height(self, point1, point2, point): """ 計算point到[point1, point2所在直線]的距離 點到直線距離: A = point2.y - point1.y; B = point1.x - point2.x; C = point2.x * point1.y - point1.x * point2.y Dist = abs((A * point3.X + B * point3.Y + C) / sqrt(A * A + B * B)) """ # tops2 = abs(point1.x * point2.y + point2.x * point.y # + point.x * point1.y - point2.x * point1.y - point.x * # point2.y - point1.x * point.y) tops = abs(point1.x * point.y + point2.x * point1.y + point.x * point2.y - point1.x * point2.y - point2.x * point.y - point.x * point1.y ) bottom = math.sqrt( math.pow(point2.y - point1.y, 2) + math.pow(point2.x - point1.x, 2) ) height = 100 * tops / bottom return height def DouglasPeucker(self, pointList, firsPoint, lastPoint, tolerance): """ 計算通過的內容 DP算法 :param pointList: 點列表 :param firsPoint: 第一個點 :param lastPoint: 最后一個點 :param tolerance: 容差 :return: """ maxDistance = 0.0 indexFarthest = 0 for i in range(firsPoint, lastPoint): distance = self.calc_height(pointList[firsPoint], pointList[lastPoint], pointList[i]) if (distance > maxDistance): maxDistance = distance indexFarthest = i # print('max_dis=', maxDistance) if maxDistance > tolerance and indexFarthest != 0: self.Compressed.append(pointList[indexFarthest]) self.DouglasPeucker(pointList, firsPoint, indexFarthest, tolerance) self.DouglasPeucker(pointList, indexFarthest, lastPoint, tolerance) def runDP(self, pointList, tolerance): """ 主要運行結果 :param pointList: Point 列表 :param tolerance: 值越小,壓縮后剩余的越多 :return: """ if pointList == None or pointList.__len__() < 3: return pointList firspoint = 0 lastPoint = len(pointList) - 1 self.Compressed.append(pointList[firspoint]) self.Compressed.append(pointList[lastPoint]) while (pointList[firspoint] == pointList[lastPoint]): lastPoint -= 1 self.DouglasPeucker(pointList, firspoint, lastPoint, tolerance) def getCompressed(self): self.Compressed.sort(key=lambda point: int(point.id)) return self.Compressed
該類的使用方法:
import pandas as pd import numpy as np import collections from DPCompress import Point,DPCompress def load_data(file_path): columns=['rid','car_id','lon','lat'] df = pd.read_csv(file_path, header=None, names=columns) df_data = df.loc[df['lon']!=np.nan].reset_index().drop(['index'],axis=1) car_to_points = collections.defaultdict(list) for i, row in df_data.iterrows(): row_id = row['rid'] car_id = row['car_id'] lon = float(row['lon']) lat = float(row['lat']) pt = Point(row_id, round(lon, 6), round(lat, 6)) # 構造Point對象 car_to_points[car_id].append(pt) return car_to_points if __name__ == '__main__': data_file='data/trajectory.csv' output_file='data/compressed.csv' car_points = load_data(data_file) # print(car_points.keys()) with open(output_file, 'w', encoding='utf-8') as fwriter: for car, PointList in car_points.items(): points = [] dp = DPCompress(PointList, 3.5) points = dp.getCompressed() for p in points: line = "{},{},{}".format(car, p.x, p.y) fwriter.write(line) fwriter.write("\n") print(car, len(PointList), '-->', len(points))
- 樣例代碼3
Douglas-Peucker 算法是矢量數據壓縮經典算法,算法的基本思想如下:
假設組成曲線的頂點集合為 P1、P2、…Pn,假設 P1、Pn為曲線的起始點和終止點,將其虛連成一條直線, 計算曲線內點 Pi(i=2,3,…,n-1)到直線 P1Pn的距離 Di,通過比較距離的大小得到距離最大對應的點 Pk, 判斷 Dk的值與預先給定的閾值之間的大小關系。若小於閾值,則舍去曲線上的全部中間頂點;反之,若大於閾值,則保留點 Pk,並以該點為界限,將首尾兩點分別與該點虛連成一條直線,形成兩條新的直線段 P1Pk 和 PkPn,再重復上述的步驟確定下一批壓縮后的保留點。並以此類推,直到兩端點之間的曲線上的頂點到兩端點虛連成的直線的最大距離小於閾值為止。
遞歸實現思路:
1、計算出直線的方程。
2、遍歷各點到直線的距離,保存最大距離與其點的索引,判斷其與閾值的關系
a.若小於閾值:則該直線作為壓縮后的數據。
b.若大於閾值,則該點將直線分為兩段,AC CB
3、重復2的操作。
遞歸偽代碼:
def rdp(points, epsilon): dmax = 0.0 index = 0 for i in range(1, len(points) - 1): d = point_line_distance(points[i], points[0], points[-1]) if d > dmax: index = i dmax = d if dmax >= epsilon: results = rdp(points[:index+1], epsilon)[:-1] + rdp(points[index:], epsilon) else: results = [points[0], points[-1]] return results
非遞歸思路:
首先不使用遞歸,但是要做到將每個點都遍歷一遍,選出最大距離的點,要對兩端再進行遍歷。參考文章作者提供了思路,采用“出棧入棧的操作”來實現非遞歸算法。
依舊以上面的圖作為例子來講解
Step1:
建立兩個空棧,將首尾兩個端點分別放入AB兩個棧中。
Step2:
從直線中找到最大距離的點dmax(判斷其與閾值的關系,若小於閾值,則直線為壓縮后的數據),其索引為point3,將其放入棧B中。
此時,直線的端點從18 變為了 13
我們再次執行Step2的操作,所找到的最大距離就是13直線之間的最大距離了。
Step3:
若13之間的點的最大距離小於閾值,則B的頂端,出棧,將其放入A棧中。
之后在進行Step2的操作,所求的就是直線38之間的點。
直到最后,將B棧中的所有點都壓入A中,算法結束。A棧中的點即為最終的點。
python偽代碼
def RDP(Points,threshold): RDPFinout = [] if(len(Points)<=1): RDPFinout = Points return RDPFinout # 讀的數組的長度 IndexsFin = len(Points) # 創建兩個數組記錄索引 A = Stack() B = Stack() Maxindex = IndexsFin - 1 # 將首位兩個點坐標入棧 A.push(0) B.push(Maxindex) ############# while B棧不為空: maxDistanceIndex = findMaxdistanceIndex(Points, A.top(), B.top()) maxDistance = findMaxdistance(Points, A.top(), B.top()) if(maxDistance > threshold): #大於閾值 B.push(maxDistanceIndex) else: #小於閾值 A.push(B.pop()) ############### while A棧不為空: #輸出結果,存放到數組中 RDPFinout.append(Points[A.pop()]) return RDPFinout
原圖:
壓縮后:
閉合圖形如何壓縮?
先刪除尾點,壓縮完成后恢復