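Passing State Between Processes
Similarly, the Event class can pass state information between processes. An event toggles between a set and an unset state. wait() also accepts an optional timeout: once it expires, wait() returns even if the event is still unset (the timeout itself does not set the event).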
import multiprocessing
import time


def wait_for_event(e):
    # Block until the event is set.
    print("wait for event:starting")
    e.wait()
    print("wait for event:e_is_set()->", e.is_set())


def wait_for_event_timeout(e, t):
    # Wait at most t seconds, then continue whether or not the event is set.
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set()->", e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name="block",
        target=wait_for_event,
        args=(e, )
    )
    w1.start()

    w2 = multiprocessing.Process(
        name="nonblock",
        target=wait_for_event_timeout,
        args=(e, 2)
    )
    w2.start()

    print("main:waiting before calling Event.set()")
    time.sleep(3)
    e.set()
    print("main:event is set")
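Output. wait(t) returns once the timeout expires, whether or not the event has been set. Calling e.set() sets the event directly and releases any process still blocked in wait().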
main:waiting before calling Event.set()
wait for event:starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set()-> False
main:event is set
wait for event:e_is_set()-> True
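The example above calls is_set() after the wait to find out what happened. The return value of wait() carries the same information, so a minimal sketch might branch on it directly. The helper names wait_with_timeout and report are made up for this illustration.

```python
import multiprocessing
import time


def wait_with_timeout(event, timeout):
    """Return True if the event was set in time, False if the wait timed out."""
    return event.wait(timeout)


def report(event, timeout):
    # Runs in a child process and prints how the wait ended.
    if wait_with_timeout(event, timeout):
        print("event was set before the timeout")
    else:
        print("timed out; event still unset")


if __name__ == "__main__":
    event = multiprocessing.Event()
    waiter = multiprocessing.Process(target=report, args=(event, 0.5))
    waiter.start()
    time.sleep(1)  # longer than the waiter's timeout
    event.set()
    waiter.join()
```

Because the main process sleeps longer than the waiter's timeout, the waiter should take the timed-out branch here.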
Controlling Access to Resources
When several processes share a resource, a Lock can be used to prevent conflicting access.
The API is as follows:
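lock = multiprocessing.Lock()   # create a lock object
lock.acquire()                  # acquire the lock
lock.release()                  # release the lock
with lock:                      # acquire the lock, run the block, then release it; do not nest, because Lock is not reentrant
    # business logic…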
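As a rough illustration of the calls listed above, the sketch below has four processes increment one shared counter under a lock. The function name add_many and the use of multiprocessing.Value as the shared resource are assumptions made for this example, not part of the original text.

```python
import multiprocessing


def add_many(counter, lock, n):
    """Increment the shared counter n times, holding the lock for each update."""
    for _ in range(n):
        with lock:  # acquire, update, release
            counter.value += 1


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    # lock=False gives a raw shared int with no built-in lock of its own,
    # so the explicit Lock above is what protects the updates.
    counter = multiprocessing.Value("i", 0, lock=False)
    workers = [
        multiprocessing.Process(target=add_many, args=(counter, lock, 1000))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print("counter:", counter.value)
```

With the lock in place the final count should be 4000. Without it, the read-modify-write in counter.value += 1 can interleave between processes and lose updates.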
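However, locking is tricky to get right and costs performance, so we generally avoid shared data and use message passing through a queue (Queue) instead.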
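One possible shape for the queue-based alternative is sketched below: the main process sends numbers to workers over one Queue and collects results from another, so no data is shared directly. The worker name square_worker and the None sentinel convention are choices made for this example.

```python
import multiprocessing


def square_worker(tasks, results):
    """Read numbers from tasks until the None sentinel, put squares on results."""
    while True:
        n = tasks.get()
        if n is None:
            break
        results.put((n, n * n))


if __name__ == "__main__":
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=square_worker, args=(tasks, results))
        for _ in range(2)
    ]
    for w in workers:
        w.start()
    for n in range(5):
        tasks.put(n)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    # Drain the results before joining so no worker blocks on a full pipe.
    squares = dict(results.get() for _ in range(5))
    for w in workers:
        w.join()
    print(sorted(squares.items()))
```

Each worker owns the values it is processing, so no lock appears in the user code; the queues handle the synchronization internally.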
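Synchronizing Operations
Condition objects can synchronize the parts of a workflow so that some run in parallel and others run sequentially, even when they live in separate processes.
Here is a simple example: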
import multiprocessing
import time


def stage_1(cond):
    name = multiprocessing.current_process().name
    print("starting", name)
    with cond:
        print("{} done and ready for stage 2".format(name))
        # Wake up every process waiting on the condition.
        cond.notify_all()


def stage_2(cond):
    name = multiprocessing.current_process().name
    print("starting", name)
    with cond:
        cond.wait()
        print("{} running".format(name))


if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(
        name="s1",
        target=stage_1,
        args=(condition, ),
    )
    s2_client = [
        multiprocessing.Process(
            name="stage_2[{}]".format(i),
            target=stage_2,
            args=(condition, ),
        ) for i in range(1, 3)
    ]
    for c in s2_client:
        c.start()
    # Give the stage 2 processes time to reach cond.wait() before notifying.
    time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_client:
        c.join()
Output (the ordering may differ slightly from machine to machine):
starting stage_2[1]
starting stage_2[2]
starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running
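The example above relies on time.sleep(1) so that both stage 2 processes are already waiting when stage 1 notifies them; a notification sent too early would be missed. One way to remove that timing dependence, sketched here under the assumption that a shared flag (named ready in this example) is acceptable, is to pair the condition with a predicate and call wait_for().

```python
import multiprocessing


def stage_1(cond, ready):
    """Mark stage 1 as finished and wake every waiter."""
    with cond:
        ready.value = 1
        cond.notify_all()


def stage_2(cond, ready):
    """Wait until stage 1 has finished, even if it finished before we got here."""
    with cond:
        # wait_for checks the predicate first, so a notification sent before
        # this process started waiting is not lost.
        cond.wait_for(lambda: ready.value == 1)
    print(multiprocessing.current_process().name, "running")


if __name__ == "__main__":
    cond = multiprocessing.Condition()
    # Raw shared int; every access happens while holding cond.
    ready = multiprocessing.Value("i", 0, lock=False)
    s1 = multiprocessing.Process(name="s1", target=stage_1, args=(cond, ready))
    clients = [
        multiprocessing.Process(
            name="stage_2[{}]".format(i), target=stage_2, args=(cond, ready)
        )
        for i in range(1, 3)
    ]
    s1.start()  # no sleep needed: the start order no longer matters
    for c in clients:
        c.start()
    s1.join()
    for c in clients:
        c.join()
```

Here stage 1 is deliberately started first to show that the stage 2 processes still proceed.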
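Controlling Concurrent Access to Resources
Sometimes it is useful to let several processes access a resource at the same time while still capping the total. For example, a network application might support a fixed number of concurrent downloads. A Semaphore can manage those connections. In the snippet below, 3 is the maximum number of processes allowed to access the resource at once.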
import multiprocessing

# worker is assumed to be defined elsewhere; it receives the semaphore s.
s = multiprocessing.Semaphore(3)
jobs = [
    multiprocessing.Process(
        target=worker,
        name=str(i),
        args=(s,)
    )
    for i in range(10)
]
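The fragment above does not show worker or how the jobs are started. A complete sketch might look like the following; the body of worker, the two shared counters (active and peak) and the 0.1 second sleep are assumptions added only to make the limit observable.

```python
import multiprocessing
import time


def worker(s, active, peak):
    """Hold the semaphore briefly and record how many workers overlap."""
    with s:  # blocks while 3 other workers already hold the semaphore
        with active.get_lock():
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.1)  # stand-in for the real download
        with active.get_lock():
            active.value -= 1


if __name__ == "__main__":
    s = multiprocessing.Semaphore(3)
    active = multiprocessing.Value("i", 0)
    # peak is only touched while holding active's lock, so it needs none itself.
    peak = multiprocessing.Value("i", 0, lock=False)
    jobs = [
        multiprocessing.Process(
            target=worker,
            name=str(i),
            args=(s, active, peak),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print("peak concurrent workers:", peak.value)
```

The reported peak should never exceed 3, however the ten processes are scheduled.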
Managing Shared State
A manager created with multiprocessing.Manager() supports lists as well as dictionaries.
import multiprocessing


def worker(d, key, value):
    # d is a proxy for the managed dict; the write is forwarded to the manager process.
    d[key] = value


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i*2),
        ) for i in range(10)
    ]
    for i in jobs:
        i.start()
    for j in jobs:
        j.join()
    print("D->", d)
Output. Because the dictionary was created through the manager, it is shared by all of the processes.
D-> {1: 2, 3: 6, 0: 0, 5: 10, 8: 16, 2: 4, 7: 14, 6: 12, 4: 8, 9: 18}
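The same pattern should carry over to a managed list. The sketch below mirrors the dictionary example with mgr.list(); the function name record is made up for this illustration.

```python
import multiprocessing


def record(shared, item):
    """Append one item to the shared list."""
    shared.append(item)


if __name__ == "__main__":
    with multiprocessing.Manager() as mgr:
        shared = mgr.list()
        jobs = [
            multiprocessing.Process(target=record, args=(shared, i * 2))
            for i in range(5)
        ]
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()
        # Arrival order depends on scheduling, so sort before printing.
        print("L->", sorted(shared))
```

Using the manager as a context manager shuts its server process down when the block exits, which is why the list is read inside the with block.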
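Shared Namespaces
In addition to dictionaries and lists, a manager can create a shared Namespace:
namespace = mgr.Namespace()
Here is a simple example: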
import multiprocessing


def producer(ns, event):
    ns.value = "this is a value"
    event.set()


def consumer(ns, event):
    try:
        print("Before event:{}".format(ns.value))
    except Exception as err:
        print("Before event error:", str(err))
    event.wait()
    print("After event:", ns.value)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event)
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )
    c.start()
    p.start()
    c.join()
    p.join()
Output:
Before event error: 'Namespace' object has no attribute 'value'
After event: this is a value
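Updates to the contents of mutable values stored in the namespace are not propagated automatically. To publish such a change, assign the value to the namespace object again.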
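To make the difference concrete, the following sketch runs two hypothetical helpers against a namespace attribute named tags (a name chosen for this example). The first mutates the list in place; the second fetches it, changes it, and assigns it back.

```python
import multiprocessing


def append_in_place(ns, item):
    """Mutates a local copy only; a manager Namespace will not see the change."""
    ns.tags.append(item)


def append_and_reassign(ns, item):
    """Fetch a copy, change it, then assign it back so the manager stores it."""
    tags = ns.tags
    tags.append(item)
    ns.tags = tags


if __name__ == "__main__":
    with multiprocessing.Manager() as mgr:
        ns = mgr.Namespace()
        ns.tags = []
        for target in (append_in_place, append_and_reassign):
            p = multiprocessing.Process(target=target, args=(ns, target.__name__))
            p.start()
            p.join()
            print(target.__name__, "->", ns.tags)
```

After the first helper the list read back from the manager should still be empty; only the second helper's item should appear.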
Process Pools
The Pool class manages a fixed number of worker processes.
import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    # Runs once in each worker process as it starts.
    print("starting", multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print("Input :", inputs)

    builtin_outputs = list(map(do_calculation, inputs))
    print("Built-in:", builtin_outputs)

    pool_size = multiprocessing.cpu_count()*2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to exit
    print("Pool:", pool_outputs)
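close() and join() synchronize the main process with the pool's tasks: close() stops new tasks from being submitted, and join() waits for the workers to finish and exit. Output: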
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
starting SpawnPoolWorker-1
starting SpawnPoolWorker-2
starting SpawnPoolWorker-4
starting SpawnPoolWorker-5
starting SpawnPoolWorker-6
starting SpawnPoolWorker-3
starting SpawnPoolWorker-8
starting SpawnPoolWorker-7
Pool: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
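Pool also accepts a maxtasksperchild parameter, which tells the pool to replace a worker process with a fresh one after it has completed that many tasks. This keeps long-running workers from consuming ever more system resources.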
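A small sketch of maxtasksperchild follows. The pool size of 2, the limit of 2 tasks per worker and the PID reporting are assumptions chosen to make the worker replacement visible; they are not taken from the example above.

```python
import multiprocessing
import os


def do_calculation(data):
    """Return the result together with the PID of the worker that computed it."""
    return data * 2, os.getpid()


if __name__ == "__main__":
    # Assumed settings for illustration: 2 workers, each replaced after 2 tasks.
    with multiprocessing.Pool(processes=2, maxtasksperchild=2) as pool:
        # chunksize=1 makes every input count as one task.
        results = pool.map(do_calculation, range(8), chunksize=1)
    outputs = [value for value, _ in results]
    pids = {pid for _, pid in results}
    print("Pool:", outputs)
    print("distinct worker PIDs:", len(pids))
```

With eight tasks and a limit of two per worker, more than two distinct PIDs should normally show up, which indicates that the pool started replacement workers along the way.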