arcpy模塊下的並行計算與大規模數據處理


一個多星期的時間,忍着胃痛一直在做GIS 540: Spatial Programming的課程項目,導致其他方面均毫無進展,可惜可惜。在這個過程當中臨時抱佛腳學習了很多Python相關的其他內容,並應用在這次的項目當中(雖然大部分的嘗試都失敗了,也有不少問題需要之后尋求解決的方法)。在此稍微總結下這幾天寫代碼的心得。

項目背景

這次的項目主要是基於Python 2.7版本下的arcpy模塊,調用其中的相關工具進行一系列的空間操作與數值計算,具體的內容則不便於在此透露。由於計算量過於龐大,因此嘗試着采用了multiprocessing模塊來進行多進程編程以加快計算速度。當然,主要也是為了便於調用工作站當中的多核資源。其他部分,os, sys, time模塊也均有使用,用以輔助相關的路徑獲取和計時。工作環境為Windows 10,采用Spyder作為編輯器,ArcGIS的版本為Desktop 10.5.1。

Python多進程計算

在項目當中所遇到的一個很大的問題就是基於Table的數據計算量太過於龐大,而Python的計算速度也是慘不忍睹。(其實是想着用其他語言來做的,無奈臨時抱佛腳嘛,哎)由於整個的計算過程當中有着非常多的重復的部分,故而考慮采用多進程、多線程的方法來提高計算速度。Python當中的多線程由於全局解釋鎖GIL(Global Intepreter Lock)的存在,其對於編程效率的提升並不是很明顯,因此這里僅僅涉及使用多進程的方法對原有的計算函數進行重構。有關於進程和線程的概念在這里便不多涉及(其實我自己也得復習下操作系統了,本科大三學的都忘得差不多了)。

另外補充一點,就是對於采用多進程多線程編程必要性的判斷。我記得有博客做過相應的分析,其結果是多進程對於計算密集型的任務效率提升較大,而多線程則是比較適用於I/O密集型的任務。

首先,抽象描述一個在我的項目當中較為典型的任務場景:
Table A 當中有一個Field稱為NumA,對於Table A當中每個對象的NumA均不相同。現在對於A當中的每一行(對應一個空間對象),從Table B當中選取在該行對象附近一定范圍內的所有行,並對Table B當中選中的行對應的Field B的值添加上A當中該對象的值。其抽象的代碼表述如下:

for i in Table A: Select rows in Table B by i for j in rows: j[NumB] += i[NumA] 

看似是很簡單的內容,但當Table A和Table B均有上萬行的時候,其計算所耗費的時間便難以想象(或者按照算法當中的理論,這里的時間復雜度是O(mn),m和n分別是單個外循環任務和內循環任務所對應的時間,而且因為主要是讀寫任務,沒法通過算法設計來對其進行改進,心酸)。而Select這個步驟因為其基於位置進行選擇的特性,在arcpy當中也是頗耗費時間的一個函數,這更加增大的計算時候的開銷(這將在之后進一步描述)。這里由於Select是針對於某一個表當中的各個不同的對象進行讀取操作,故而我們可以考慮為這一步驟開設多個進程以提高效率。

Python 2.7當中自帶的multiprocessing模塊可以提供已經封裝好的多進程操作對象與函數,包括Process(進程對象),Queue(進程序列),Pool(進程池)等內容。方便起見,這里我們直接使用進程池來完成多進程計算,其基本的計算框架可以按照以下的內容來進行填充:

import multiprocessing def CalcFunc(i, j, k): .... return .... if __name__ == '__main__': # 獲取CPU核數 n = multiprocessing.cpu_count() # 創建進程池 # 實際應用當中未必需要用滿CPU所有核,建議大家自己選擇,建設Pool時如果不加processes參數則默認為使用所有核 MyPool = multiprocessing.Pool(processes = n) # 設置固定的函數輸入變量 j = .... k = .... # 設置需要變化的函數輸入變量,將其封裝為List List = [....] # 設置List用來存儲每個函數運行所得到的結果 Result = [] # 向進程池內裝入所有需要並行執行的函數,並將結果添加到Result當中 for item in List: u = MyPool.apply_async(CalcFunc, (item, j, k, )) Result.append(u) # 關閉進程池,使得進程池無法再添加新的任務 MyPool.close() # 阻塞當前進程,並執行MyPool所定義的進程,待其執行完之后再回到當前進程 MyPool.join() # 獲取結果,此處僅打印結果 for i in Result: print i.get() 

(提及一個坑了自己的地方,有次寫測試代碼的時候圖方便便將某個函數的定義寫在了main函數當中,結果多進程運行的時候直接導致了Antimalware Service Executable進程高負荷運轉並使得代碼不再執行,關於產生這個錯誤的原因有待進一步探究,應該是和Windows Defender的保護機制有關系)

可以看到,這里使用了List來封裝每個單獨運行的函數所對應的參數,使用for循環分配參數並將任務一個個地添加到進程池當中。通過所創建的Pool對象當中的apply_async將所需要運行的內容非阻塞地添加到進程池當中,而后關閉進程池,並阻塞當前進程運行等待進程池當中的進程運行完畢。要注意的是通過apply_async所返回的是進程執行完畢所對應的ApplyResult對象,即使進程未能夠成功執行,該對象一樣能夠成功返回。對於ApplyResult對象,需要使用get()函數來獲取該進程當中函數運行所返回的結果。

在apply_async之外有多種辦法可以實現使用進程池進行多進程計算,包括apply、map和map_async。其中apply的用法與apply_async類似,其區別僅在於apply函數的執行為非阻塞式(對於進程池當中的所有子進程按照序列執行,這樣的話和單進程串行執行的區別不大,具體的區別可以參見 [1])而關於map和map_async的用法則參見以下的模板,僅需要改變向進程池添加任務的方式以及獲取結果的方式。

import multiprocessing .... if __name__ == '__main__': .... # 將每次所要輸入的參數封裝在一個List當中,再將這個List添加到ParameterList內 ParameterList = [] for i in List: ParameterList.append([i, j, k]) .... # 直接調用map或者map_async函數將所有的任務添加到進程池 Result = MyPool.map(CalcFunc, ParameterList) .... # 獲取結果不再需要調用get()函數 for i in Result: print i 

采用map或者map_async可以通過包含參數的List直接一次將所有需要執行的任務添加到進程池當中,而且map和map_async均和apply_async一樣允許並發,其區別僅在於Blocking,詳細的介紹可以參看[2]。

需要額外提及的兩點是對於Lock和callback的使用。Lock的使用是為了保證某個被進程所占據的資源/文件在該進程運行期間不被其他的進程所改寫。對於ArcGIS相關的文件,arcpy當中的函數在運行時會對於不支持共享方式訪問的數據資源設置文件獨占鎖(有關文件獨占鎖的詳細信息參見 [3]),但考慮到很多地理信息處理過程當中也會涉及到大量的其他文件類型的操作(比如csv或者kml文件),故而設置lock也是很有必要的。

添加lock的函數改寫方式可以參看以下模板代碼:

import multiprocessing # 改寫CalcFunc,使其在lock的狀態下運行 def CalcFunc(i, j, k, lock): with lock: .... return .... if __name__ == '__main__': .... # 在main函數當中定義lock對象 lock = multiprocessing.Manager().Lock() .... # 向進程池添加任務時時將lock也傳入其中 for item in List: u = MyPool.apply_async(CalcFunc, (item, j, k, lock, )) Result.append(u) .... 

而callback則是對於多進程運行的各個任務的結果的一個簡單的處理方式,比如你要將所有的運行結果添加到Result這個List當中,則可以將callback設置為Result.append。在這里map和apply的區別將直接體現:map所返回的結果序列即為添加所需要執行任務的序列,而apply則是每當一個進程執行完成的時候便返回結果,其所獲得的結果序列並不能夠對應添加執行任務的序列。設置callback的示例如下:

MyPool.apply_async(CalcFunc, (item, j, k, lock, ), callback = Result.append) MyPool.Map(CalcFunc, ParameterList, callback = Result.append) 

使用以上的進程池方法設置可以有效地縮短計算時間。對於項目當中的某個模塊,在工作站的開發環境當中進行了計算測試。在采用多進程后,總的計算時間由50分鍾縮短到了4分鍾,算是值得慶幸的一件事吧。(雖然這樣也掩蓋不了最終開發失敗的事實)

arcpy大規模數據處理

以上所述的多進程計算方法能夠在一定程度上提升了程序運行效率,但是對於大規模數據的讀寫與計算,其提升主要是在“計算”這一塊,而對於“讀寫”則提升效果有限,因為讀寫(I/O)的瓶頸主要不在CPU,而在於硬盤。為了能夠進一步地提升數據處理的效率,在使用多進程的同時還需要考慮一些其他的方法。

假設對於需要執行的任務,需要對於某一部分數據進行反復的讀取操作,那么可以考慮使用使用ArcGIS當中的“內存工作空間”來進行優化。“內存工作空間”是一個可用於寫入輸出要素類和表的基於操作系統內存的工作空間 [4]。由於該部分存儲於內存當中,故而對其進行讀寫操作的速度要顯著地高於對存儲於硬盤當中的文件進行讀寫操作的速度。當然要注意的一點是該部分的數據一定要在使用完畢后進行刪除,以免在程序的運行當中占據額外的內存。

以下是對於某個batch processing使用內存工作空間進行優化的示例。

import arcpy # 未使用內存工作空間的代碼 arcpy.Buffer_analysis('Raw Point.shp', 'Buffered Point.shp', '10 Miles') arcpy.Clip_analysis('Polyline1.shp', 'Buffered Point.shp', 'Cliped Polyline1.shp') arcpy.Clip_analysis('Polyline2.shp', 'Buffered Point.shp', 'Cliped Polyline2.shp') # 使用內存工作空間優化后的代碼,最后一步用於刪除內存工作空間當中的臨時文件 arcpy.Buffer_analysis('Raw Point.shp','in_memory/BufferedPoint', '10 Miles') arcpy.Clip_analysis('Polyline1.shp', 'in_memory/BufferedPoint', 'Cliped Polyline1.shp') arcpy.Clip_analysis('Polyline2.shp', 'in_memory/BufferedPoint', 'Cliped Polyline2.shp') arcpy.Delete_management('in_memory/BufferedPoint') 

在函數當中遇到這樣類似的過程,均可以嘗試對於需要反復讀取的文件進行上述的處理,將其轉移到內存工作空間。其他的一些方法也具有類似的思想,比如對於需要選擇一個文件當中特定的幾個feature的時候,相比於使用arcpy.Select_analysis生成一個新的文件,一般會更加傾向於使用arcpy.MakeFeatureLayer_management來生成一個圖層,在其中的where_clause寫入SQL語句以實現相同的選擇效果。后者基於原始文件生成一個臨時的子圖層,該圖層若不經保存,則在應用程序運行結束之后被清理掉。而事實上,使用圖層一般還有另外一層考慮:arcpy當中用於基於位置進行選擇的函數SelectLayerByLocation_management所針對的對象只能是Feature Layer。

然而ArcGIS當中一個很坑爹的問題在於其對於personal geodatabase的大小限制為2GB。這在一般情況下或許夠用,但是在遇上需要儲存龐大的臨時文件的時候,則顯得有些捉襟見肘。(在這里撞過一次坑,結果是導致了ERROR 000426: Out of memory)一個處理辦法是使用ArcGIS所提供的切分工具將所需要處理的要素細分為較小的要素 [5],然后對這些細分的要素分開進行處理。在此之外,其他的處理方法不外乎將部分要素使用內存空間進行儲存和及時刪除不必要的臨時文件。

或者再換一種思路,我們可以嘗試更換存儲數據的方式。以最初提到的任務場景為例,我們並不一定非要在任務序列當中將值寫入到最終的文件當中,而是可以在循環之外創建一個數組,數組的index對應最終文件的FID,數組當中的值對應所要寫入的值。(或者使用dictionary,可以做到更高的識別性)這樣,將寫入的步驟放在最后一起處理,不僅有利於多進程的執行,而且也能避免生成不必要的臨時圖層。

以下貼出一部分在項目當中所用的代碼和改進后的代碼,可以作為今后用於修改代碼的參考。
改進前:

import arcpy def CalcFunc(Route, Lines): CountLine = int(arcpy.GetCount_management(Lines).getOutput(0)) arcpy.MakeFeatureLayer_management(Routes, 'TempRouteLyr') for i in CountLine: arcpy.MakeFeatureLayer_management(Lines, 'TempLineLyr', '"FID" = {0}'.format(str(i))) sc = arcpy.da.SearchCursor('TempLineLyr', [ 'IMPACT' ] TempImpact = sc.next()[0] del sc Selection = arcpy.SelectLayerByLocation_management('TempRouteLyr', 'WITHIN_A_DISTANCE', 'TempLineLyr', '10 Miles', 'NEW_SELECTION') uc = arcpy.da.UpdateCursor(Selection, [ 'IMPACT_SUM' ]) for row in uc: row[0] += TempImpact uc.updateRow(row) del uc if __name__ == '__main__': arcpy.env.workspace = .... arcpy.Buffer_analysis('Plant.shp', 'PlantBuffer.shp', '78 Miles') arcpy.Clip_analysis('Routes.shp', 'PlantBuffer.shp', 'RoutesClip.shp') arcpy.Select_analysis('RoutesClip.shp', 'RoutesSelect.shp', '"CLASS" >= \'3\'') CalcFunc('RoutesSelect.shp', 'Lines.shp') 

改進后:

import arcpy, multiprocessing def CalcFunc(Routes, Lines, i): arcpy.MakeFeatureLayer_management(Routes, 'TempRouteLyr') arcpy.MakeFeatureLayer_management(Lines, 'TempLineLyr', '"FID" = {0}'.format(str(i))) sc = arcpy.da.SearchCursor('TempLineLyr', [ 'IMPACT' ] TempImpact = sc.next()[0] del sc Selection = arcpy.SelectLayerByLocation_management('TempRouteLyr', 'WITHIN_A_DISTANCE', 'TempLineLyr', '10 Miles', 'NEW_SELECTION') ImpactList = [ 0.0 for item in range(int(arcpy.GetCount_management(Routes).getOutput(0))) ] sc = arcpy.da.UpdateCursor(Selection, [ 'FID' ]) for row in sc: ImpactList[row[0]] = TempImpact del sc return ImpactList if __name__ == '__main__': arcpy.env.workspace = .... arcpy.Buffer_analysis('Plant.shp', 'in_memory/PlantBuffer', '78 Miles') arcpy.Clip_analysis('Routes.shp', 'in_memory/PlantBuffer', 'in_memory/RouteClip') arcpy.Select_analysis('in_memory/RouteClip', 'RoutesSelect.shp', '"CLASS" >= \'3\'') arcpy.Delete_management([ 'in_memory/PlantBuffer', 'in_memory/RouteClip' ]) MyPool = multiprocessing.Pool(processes = 4) CountLine = int(arcpy.GetCount_management('Lines.shp').getOutput(0)) TotalImpact = [ 0.0 for item in range(int(arcpy.GetCount_management(Routes).getOutput(0))) ] ResultsList = [] for i in range(CountLine): MyPool.apply_async(CalcFunc, ('RouteSelect.shp', 'Lines.shp', i) ResultsList.append(u) MyPool.close() MyPool.join() for resultitem in ResultsList: for idx, i in enumerate(resultitem.get()): TotalImpact[idx] += i uc = arcpy.da.UpdateCursor('RouteSelect.shp', [ 'FID', 'IMPACT_SUM' ]) for row in uc: row[1] = TotalImpact[row[0]] uc.updateRow(row) del uc 

借助多進程和內存工作空間,最耗費時間的SelectLayerByLocation_management得以並行執行,從而使得計算耗費的時間大大下降,當然費神重構代碼確實也是件煩心事...

一些其他的問題

雖然對於這次的計算使用了多進程等辦法來進行改進,但由於多進程當中依舊有一些問題存在,project最終提交的代碼依舊是單進程的,希望老師不要嫌棄代碼運行時間太長...

不知道有沒有有心人看到這些問題能夠一起討論一下

  • 如果使用PythonWin編輯器運行代碼,運行到多進程的這一部分就沒法繼續了

  • 若是在Spyder當中,多進程的代碼部分運行時間到大概六分鍾時,就會報錯,如下所示


     
     
  • 有時候在Spyder當中運行還會產生以下的錯誤


     
     

哎,撲街就撲街吧,也是沒辦法的事,對得起這幾天的努力就行。

第一次寫博客,不當之處,還望多多海涵。

本文為作者原創,若需轉載,請先聯系作者。
作者:廖漠琛
聯系方式:shuangqiyue@163.com



作者:Natsukage
鏈接:https://www.jianshu.com/p/983ebc387716
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM