Python並行編程(十二):進程同步


1、基本概念

      多個進程可以協同工作來完成一項任務,通常需要共享數據。所以在多進程之間保持數據的一致性就很重要,需要共享數據協同的進程必須以適當的策略來讀寫數據。同步原語和線程的庫類似。

      - Lock:一個Lock對象有兩個方法acquire和release來控制共享數據的讀寫權限。

      - Event:一個進程發事件的信號,另一個進程等待事件的信號。Event對象有兩個方法set和clear來管理自己內部的變量。

      - Condition:此對象用來同步部分工作流程,在並行的進程中,有兩個基本的方法,wait()用來等待進程,notify_all用來通知所有等待此條件的進程。

      - Semaphore:用來共享資源,比如:支持固定數據的共享連接。

      - RLock:遞歸鎖對象,其用途和方法同Threading模塊一樣。

      - Barrier:將程序分成幾個階段,適用於有些進程必須在某些特性進程之后執行,處於Barrier之后的代碼不能同處於Barrier之前的代碼並行。

2、測試用例

      使用barrier函數來同步兩個進程

import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime

def test_with_barrier(synchronizer, serializer):
    name = multiprocessing.current_process().name
    synchronizer.wait()
    now = time()
    with serializer:
        print("process %s ----> %s" %(name, datetime.fromtimestamp(now)))

def test_without_barrier():
    name = multiprocessing.current_process().name
    now = time()
    print("process %s ----> %s" %(name, datetime.fromtimestamp(now)))

if __name__ == "__main__":
    # create a barrier and lock. 
    synchronizer = Barrier(2)
    serializer = Lock()
    # create four processes
    Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
    Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
    Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
    Process(name='p4 - test_without_barrier', target=test_without_barrier).start()

      運行結果如下:

      

      test_with_barrier函數調用了barrier的wait()方法,當兩個進程都調用wait()方法時,他們會一起繼續執行。

3、進程之間管理狀態

      Python的多進程模塊提供了在所有的用戶間管理共享信息的管理者(Manager),一個管理者對象控制着持有Python對象的服務進程,並允許其他進程操作共享對象。

      管理者特性:

      - 它控制着管理共享對象的服務進程

      - 它確保當某一進程修改了共享對象之后,所有的進程拿到的共享對象都得到了更新。

      代碼示例:

import multiprocessing

def worker(dictionary, key, item):
    dictionary[key] = item
    print("key = %d value = %d" %(key, item))

if __name__ == "__main__":
    mgr = multiprocessing.Manager()
    dictionary = mgr.dict()
    jobs = [multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print("Results:",dictionary)

      運行結果:

      

      上述代碼創建了一個管理者字典dictionary,在n個job之間共享,每個job都會更新字典的某一個index,所有的job完成之后,最后打印該字典,所有數據均存在。

      

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM