最近遇到多進程共享數據的問題,到網上查了有幾篇博客寫的蠻好的,記錄下來方便以后查看。
一、Python multiprocessing 跨進程對象共享
在mp庫當中,跨進程對象共享有三種方式,第一種僅適用於原生機器類型,即python.ctypes當中的類型,這種在mp庫的文檔當中稱為shared memory 方式,即通過共享內存共享對象;另外一種稱之為server process , 即有一個服務器進程負責維護所有的對象,而其他進程連接到該進程,通過代理對象操作服務器進程當中的對象;最后一種在mp文檔當中沒有單獨提出,但是在其 中多次提到,而且是mp庫當中最重要的一種共享方式,稱為inheritance ,即繼承,對象在 父進程當中創建,然后在父進程是通過multiprocessing.Process創建子進程之后,子進程自動繼承了父進程當中的對象,並且子進程對這 些對象的操作都是反映到了同一個對象。
這三者共享方式各有特色,在這里進行一些簡單的比較。
首先是共享方式所應對的對象類型,看這個表:
共享方式 | 支持的類型 |
Shared memory | ctypes當中的類型,通過RawValue,RawArray等包裝類提供 |
Inheritance | 系統內核對象,以及基於這些對象實現的對象。包括Pipe, Queue, JoinableQueue, 同步對象(Semaphore, Lock, RLock, Condition, Event等等) |
Server process | 所有對象,可能需要自己手工提供代理對象(Proxy) |
這個表總結了三種不同的共享方式所支持的類型,下面一個個展開討論。
其中最單純簡單的就是shared memory這種方式,只有ctypes當中的數據類型可以通過這種方式共享。由於mp庫本身缺少命名的機制,即在一個進程當中創建的對象,無法在另外一 個進程當中通過名字來引用,因此,這種共享方式依賴於繼承,對象應該由父進程創建,然后由子進程引用。關於這種機制的例子,可以參見Python文檔 當中的例子 Synchronization types like locks, conditions and queues,參考其中的test_sharedvalues函數。
然后是繼承方式。首先關於繼承方式需要有說明,繼承本質上並不是一種對象共享的機制,對象共享只是其副作用。子進程從父進程繼承來的對象並不一定是 共享的。繼承本質上是父進程fork出的子進程自動繼承父進程的內存狀態和對象描述符。因此,實際上子進程復制 了一份 父進程的對象,只不過,當這個對象包裝了一些系統內核對象的描述符的時候,拷貝這個對象(及其包裝的描述符)實現了對象的共享。因此,在上面的表當中,只 有系統內核對象,和基於這些對象實現的對象,才能夠通過繼承來共享。通過繼承共享的對象在linux平台上沒有任何限制,但是在Windows上面由於沒 有fork的實現,因此有一些額外的限制條件 ,因此,在Windows上面,繼承方式是幾乎無法用的。
最后就是Server Process這種方式。這種方式可以支持的類型比另外兩種都多,因為其模型是這樣的:
在這個模型當中,有一個manager進程,負責管理實際的對象。真正的對象也是在manager進程的內存空間當中。所有需要訪問該對象的進程都 需要先連接到該管理進程,然后獲取到對象的一個代理對象(Proxy object),通常情況下,這個代理對象提供了實際對象的公共函 數 的代理,將函數參數進行pickle,然后通過連接傳送到管理進程當中,管理進程將參數unpickle之后,轉發給相應的實際對象 的函數,返回值(或者異常)同樣經過管理進程pickle之后,通過連接傳回到客戶進程,再由proxy對象進行unpickle,返回給調用者或者拋出 異常。
很明顯,這個模型是一個典型的RPC(遠程過程調用)的模型。因為每個客戶進程實際上都是在訪問manager進程當中的對象,因此完全可以通過這 個實現對象共享。
manager和proxy之間的連接可以是基於socket的網絡連接,也可以是unix pipe。如果是使用基於socket的連接方式,在使用proxy之前,需要調用manager對象的connect函數與遠程的manager進程建 立連接。由於manager進程會打開端口接收該連接,因此必要的身份驗證是需要的,否則任何人都可以連上manager弄亂你的共享對象。mp庫通過 authkey的方式來進行身份驗證。
在實現當中,manager進程通過multiprocessing.Manager類或者BaseManager的子類實現。 BaseManager提供了函數register注冊一個函數來獲取共享對象的proxy。這個函數會被客戶進程調用,然后在manager進程當中執 行。這個函數可以返回一個共享的對象(對所有的調用返回同一個對象),或者可以為每一個調用創建一個新的對象,通過前者就可以實現多個進程共享一個對象。 關於這個的用法可以參考Python文檔 當中的例子“Demonstration of how to create and use customized managers and proxies”。
典型的導出一個共享對象的代碼是:
- ObjectType object_
- class ObjectManager(multiprocessing.managers.BaseManager): pass
- ObjectManager.register("object", lambda: object_)
注意上面介紹proxy對象的時候,我提到的“公共函數”四個字。每個proxy對象只會導出實際對象的公共函數。這里面有兩個含義,一個是“公 共”,即所有非下划線開頭的成員,另一個是“函數”,即所有callable的成員。這就帶來一些限制,一是無法導出屬性,二是無法導出一些公共的特殊函 數,例如__get__, __next__等等。對於這個mp庫有一套處理,即自定義proxy對象。首先是BaseManager的register可以提供一個 proxy_type作為第三個參數,這個參數指定了哪些成員需要被導出。詳細的使用方法可以參見文檔當中的第一個例子。
另外manager還有一些細節的問題需要注意。由於Proxy對象不是線程安全的,因此如果需要在一個多線程程序當中使用proxy,mp庫會為 每個線程創建一個proxy對象,而每個proxy對象都會對server process創建一個連接,而manager那邊對於每個連接都創建一個單獨的線程來為其服務。這樣帶來的問題就是,如果客戶進程有很多線程,很容易會 導致manager進程的fd數目達到ulimit的限制,即使沒有達到限制,也會因為manager進程當中有太多線程而嚴重影響manager的性 能。解決方案可以是一個進程內cache,只有一個單獨的線程可以創建proxy對象訪問共享對象,其余線程只能訪問該進程當中的cache。
一旦manager因為達到ulimit限制或者其他異常,manager會直接退出,遺憾的是,這時候已經建立的proxy會試圖重新連接 manager – 但是它已經不存在了。這個會導致客戶進程hang在對proxy的函數調用上,這個時候,目前除了殺掉進程沒有找到別的辦法。
另外proxy使用socket的方式比較tricky,因此和內置的socket庫有很多沖突,比如 socket.setdefaulttimeout(Python Issue 6056 )。在setdefaulttimeout調用了之后,進程當中所有通過socket模塊建立的socket都是被設置為unblock模式的,但是mp 庫並不知道這一點,而且它總是假設socket都是block模式的,於是,一旦調用了setdefaulttimeout,所有對於proxy的函數調 用都會拋出OSError,錯誤代碼為11,錯誤原因是非常有誤導性的“Resource temporarily unavailable”,實際上就是EAGAIN。這個錯誤可以通過我提供的一個patch 來補救(這個patch當中還包含其他的一些修復,所以請自行查看並修改該patch)。
由於以上的一些原因,server process模式作為一個對象的共享模式,能夠提供最為靈活的共享方式,但是也有最多的問題。這個在使用過程當中就靠自己去衡量了。目前我們的系統對於 數據可靠性方面要求不高,丟失數據是可以接受的,但是也只用這種模式來維護統計值,不敢用來維護更多的東西。
二、Python多進程寫入同一文件
最近用python的正則表達式處理了一些文本數據,需要把結果寫到文件里面,但是由於文件比較大,所以運行起來花費的時間很長。但是打開任務管理器發現CPU只占用了25%,上網找了一下原因發現是由於一個叫GIL的存在,使得Python在同一時間只能運行一個線程,所以只占用了一個CPU,由於我的電腦是4核的,所以CPU利用率就是25%了。
既然多線程沒有什么用處,那就可以使用多進程來處理,畢竟多進程是可以不受GIL影響的。Python提供了一個multiprocessing的多進程庫,但是多進程也有一些問題,比如,如果進程都需要寫入同一個文件,那么就會出現多個進程爭用資源的問題,如果不解決,那就會使文件的內容順序雜亂。這就需要涉及到鎖了,但是加鎖一般會造成程序的執行速度下降,而且如果進程在多處需要向文件輸出,也不好把這些代碼整個都鎖起來,如果都鎖起來,那跟單進程還有什么區別。有一個解決辦法就是把向文件的輸出都整合到一塊去,在這一塊集中加個鎖,這樣問題就不大了。不過還有一種更加優雅的解決方式:使用multiprocessing庫的回調函數功能。
具體思路跟把文件輸出集中在一起也差不多,就是把進程需要寫入文件的內容作為返回值返回給惠和的回調函數,使用回調函數向文件中寫入內容。這樣做在windows下面還有一個好處,在windows環境下,python的多進程沒有像linux環境下的多進程一樣,linux環境下的multiprocessing庫是基於fork函數,父進程fork了一個子進程之后會把自己的資源,比如文件句柄都傳遞給子進程。但是在windows環境下沒有fork函數,所以如果你在父進程里打開了一個文件,在子進程中寫入,會出現ValueError: I/O operation on closed file
這樣的錯誤,而且在windows環境下最好加入if __name__ == '__main__'
這樣的判斷,以避免一些可能出現的RuntimeError或者死鎖。
下面是代碼:
from multiprocessing import Pool import time def mycallback(x): list1.append(x) def sayHi(num): return num if __name__ == '__main__': pool = Pool(4) list1 = [] for i in range(4): pool.apply_async(sayHi, (i,), callback=mycallback) # print(x) pool.close() pool.join() print(list1)
三、Python 進程之間共享數據(全局變量)
進程之間共享數據(數值型):
import multiprocessing def func(num): num.value=10.78 #子進程改變數值的值,主進程跟着改變 if __name__=="__main__": num=multiprocessing.Value("d",10.0) # d表示數值,主進程與子進程共享這個value。(主進程與子進程都是用的同一個value) print(num.value) p=multiprocessing.Process(target=func,args=(num,)) p.start() p.join() print(num.value)
進程之間共享數據(數組型):
進程之間共享數據(dict,list):
import multiprocessing def func(mydict,mylist): mydict["index1"]="aaaaaa" #子進程改變dict,主進程跟着改變 mydict["index2"]="bbbbbb" mylist.append(11) #子進程改變List,主進程跟着改變 mylist.append(22) mylist.append(33) if __name__=="__main__": with multiprocessing.Manager() as MG: #重命名 mydict=multiprocessing.Manager().dict() #主進程與子進程共享這個字典 mylist=multiprocessing.Manager().list(range(5)) #主進程與子進程共享這個List p=multiprocessing.Process(target=func,args=(mydict,mylist)) p.start() p.join() print(mylist) print(mydict)
四、參考鏈接:
1、http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process/
2、http://blog.csdn.net/Q_AN1314/article/details/51923022
3、http://blog.csdn.net/houyanhua1/article/details/78244288