1、基本概念
多個進程可以協同工作來完成一項任務,通常需要共享數據。所以在多進程之間保持數據的一致性就很重要,需要共享數據協同的進程必須以適當的策略來讀寫數據。同步原語和線程的庫類似。
- Lock:一個Lock對象有兩個方法acquire和release來控制共享數據的讀寫權限。
- Event:一個進程發事件的信號,另一個進程等待事件的信號。Event對象有兩個方法set和clear來管理自己內部的變量。
- Condition:此對象用來同步部分工作流程,在並行的進程中,有兩個基本的方法,wait()用來等待進程,notify_all用來通知所有等待此條件的進程。
- Semaphore:用來共享資源,比如:支持固定數據的共享連接。
- RLock:遞歸鎖對象,其用途和方法同Threading模塊一樣。
- Barrier:將程序分成幾個階段,適用於有些進程必須在某些特性進程之后執行,處於Barrier之后的代碼不能同處於Barrier之前的代碼並行。
2、測試用例
使用barrier函數來同步兩個進程
import multiprocessing from multiprocessing import Barrier, Lock, Process from time import time from datetime import datetime def test_with_barrier(synchronizer, serializer): name = multiprocessing.current_process().name synchronizer.wait() now = time() with serializer: print("process %s ----> %s" %(name, datetime.fromtimestamp(now))) def test_without_barrier(): name = multiprocessing.current_process().name now = time() print("process %s ----> %s" %(name, datetime.fromtimestamp(now))) if __name__ == "__main__": # create a barrier and lock. synchronizer = Barrier(2) serializer = Lock() # create four processes Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start() Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start() Process(name='p3 - test_without_barrier', target=test_without_barrier).start() Process(name='p4 - test_without_barrier', target=test_without_barrier).start()
運行結果如下:
test_with_barrier函數調用了barrier的wait()方法,當兩個進程都調用wait()方法時,他們會一起繼續執行。
3、進程之間管理狀態
Python的多進程模塊提供了在所有的用戶間管理共享信息的管理者(Manager),一個管理者對象控制着持有Python對象的服務進程,並允許其他進程操作共享對象。
管理者特性:
- 它控制着管理共享對象的服務進程
- 它確保當某一進程修改了共享對象之后,所有的進程拿到的共享對象都得到了更新。
代碼示例:
import multiprocessing def worker(dictionary, key, item): dictionary[key] = item print("key = %d value = %d" %(key, item)) if __name__ == "__main__": mgr = multiprocessing.Manager() dictionary = mgr.dict() jobs = [multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10)] for j in jobs: j.start() for j in jobs: j.join() print("Results:",dictionary)
運行結果:
上述代碼創建了一個管理者字典dictionary,在n個job之間共享,每個job都會更新字典的某一個index,所有的job完成之后,最后打印該字典,所有數據均存在。