Python's multiprocessing module (Part 2)


Sharing state between processes

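Similarly, the Event class can communicate state information between processes. An event toggles between a set and an unset state. A process can wait for the event to change from unset to set, optionally with a timeout. If the timeout expires first, wait() simply returns and the event stays unset.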

    import multiprocessing
    import time

    def wait_for_event(e):
        """Block until the event is set."""
        print("wait for event:starting")
        e.wait()
        print("wait for event:e_is_set()->", e.is_set())

    def wait_for_event_timeout(e, t):
        """Wait at most t seconds, then carry on either way."""
        print("wait_for_event_timeout:starting")
        e.wait(t)
        print("wait_for_event_timeout:e.is_set()->", e.is_set())

    if __name__ == '__main__':
        e = multiprocessing.Event()
        w1 = multiprocessing.Process(
            name="block",
            target=wait_for_event,
            args=(e, )
        )
        w1.start()
        w2 = multiprocessing.Process(
            name="nonblock",
            target=wait_for_event_timeout,
            args=(e, 2)
        )
        w2.start()
        print("main:waiting before calling Event.set()")
        time.sleep(3)
        e.set()  # releases w1, which is still blocked in e.wait()
        print("main:event is set")
        w1.join()
        w2.join()
 28:     print("main:event is set")

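Result: wait(t) returns as soon as its timeout expires, even though the event has not been set, so the "nonblock" process reports False. The main process then changes the state directly with e.set(), which releases the "block" process waiting in e.wait().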

    main:waiting before calling Event.set()
    wait for event:starting
    wait_for_event_timeout:starting
    wait_for_event_timeout:e.is_set()-> False
    main:event is set
    wait for event:e_is_set()-> True
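The timeout behaviour can also be checked without any child processes. This is a minimal single-process sketch; `event_timeout_demo` is a hypothetical helper written only for this illustration. It shows three things: `wait(timeout)` returns `False` on timeout and does not set the event, `wait()` returns `True` once `set()` has been called, and `clear()` puts the event back into the unset state.

```python
import multiprocessing
import time


def event_timeout_demo(timeout=0.2):
    """Hypothetical helper: show what wait(timeout) does to an Event."""
    e = multiprocessing.Event()

    start = time.monotonic()
    first = e.wait(timeout)      # nobody calls set(), so this times out
    waited = time.monotonic() - start
    still_unset = e.is_set()     # the timeout did not set the event

    e.set()                      # set the event explicitly
    second = e.wait(timeout)     # already set: returns True immediately

    e.clear()                    # back to the unset state
    return first, still_unset, second, e.is_set(), waited


if __name__ == "__main__":
    first, unset, second, cleared, waited = event_timeout_demo()
    print("wait(timeout) ->", first, "after about", round(waited, 1), "s")
    print("is_set() after the timeout ->", unset)
    print("wait() after set() ->", second)
    print("is_set() after clear() ->", cleared)
```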

Controlling access to resources

When a resource has to be shared among several processes, a Lock can be used to prevent conflicting accesses.

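The API is as follows:

    lock = multiprocessing.Lock()  # create a lock object
    lock.acquire()                 # acquire the lock
    lock.release()                 # release the lock

    with lock:  # acquire the lock, run the block, then release it; do not nest (a Lock is not re-entrant)
        ...     # business logic…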
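You can check the "do not nest" warning in a single process. This sketch uses `try_nested`, a hypothetical helper for this illustration. It enters the lock with `with` and then attempts a second, non-blocking `acquire(False)` inside the block, which is what a nested `with lock:` would do. A plain `Lock` refuses, so a blocking nested acquire would deadlock. A `multiprocessing.RLock` is re-entrant and allows it. The second value in each result shows that leaving the `with` block released the lock.

```python
import multiprocessing


def try_nested(lock):
    """Hypothetical helper: (nested acquire ok?, free after the with-block?)."""
    with lock:                        # first acquisition
        nested = lock.acquire(False)  # non-blocking second acquisition
        if nested:
            lock.release()            # undo the nested acquisition
    # the with-statement has released the outer acquisition here
    free_again = lock.acquire(False)
    if free_again:
        lock.release()
    return nested, free_again


if __name__ == "__main__":
    print("Lock :", try_nested(multiprocessing.Lock()))   # (False, True)
    print("RLock:", try_nested(multiprocessing.RLock()))  # (True, True)
```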

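Locks are tricky to get right and can hurt performance, however, so we generally avoid sharing data and use message passing with queues (Queue) instead.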
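Here is a minimal message-passing sketch in the spirit of that advice. It assumes a simple squaring job stands in for real work, and `square_worker` and `run` are hypothetical names. The worker never touches shared data: it reads numbers from one `multiprocessing.Queue` and writes results to another. A `None` sentinel, a convention chosen for this sketch, tells it to stop.

```python
import multiprocessing

SENTINEL = None  # hypothetical "no more work" marker used by this sketch


def square_worker(inbox, outbox):
    """Read numbers from inbox and send their squares to outbox."""
    while True:
        item = inbox.get()
        if item is SENTINEL:
            break
        outbox.put(item * item)


def run(numbers):
    numbers = list(numbers)
    inbox = multiprocessing.Queue()
    outbox = multiprocessing.Queue()
    worker = multiprocessing.Process(target=square_worker, args=(inbox, outbox))
    worker.start()
    for n in numbers:
        inbox.put(n)
    inbox.put(SENTINEL)
    # Drain the results before join(): a process that still has queued
    # data waiting to be flushed may not exit, so joining first can hang.
    results = [outbox.get() for _ in numbers]
    worker.join()
    return results


if __name__ == "__main__":
    print(run(range(5)))  # [0, 1, 4, 9, 16]
```

The results are read before `join()` because of the multiprocessing documentation's guideline on joining processes that use queues.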

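Synchronizing operations

A Condition object can synchronize the parts of a workflow so that some parts run in parallel while others run sequentially, even when they live in separate processes.

Here is a simple example: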

    import multiprocessing
    import time

    def stage_1(cond):
        name = multiprocessing.current_process().name
        print("starting", name)
        with cond:
            print("{} done and ready for stage 2".format(name))
            # wake up all processes waiting on the condition
            cond.notify_all()

    def stage_2(cond):
        name = multiprocessing.current_process().name
        print("starting", name)
        with cond:
            cond.wait()  # releases the lock and blocks until notified
            print("{} running".format(name))

    if __name__ == '__main__':
        condition = multiprocessing.Condition()
        s1 = multiprocessing.Process(
            name="s1",
            target=stage_1,
            args=(condition, ),
        )
        s2_client = [
            multiprocessing.Process(
                name="stage_2[{}]".format(i),
                target=stage_2,
                args=(condition, ),
            )
            for i in range(1, 3)
        ]
        for c in s2_client:
            c.start()
            # give each stage_2 process time to reach cond.wait()
            # before stage 1 calls notify_all()
            time.sleep(1)
        s1.start()

        s1.join()
        for c in s2_client:
            c.join()

Result (the output may differ slightly from machine to machine):

    starting stage_2[1]
    starting stage_2[2]
    starting s1
    s1 done and ready for stage 2
    stage_2[1] running
    stage_2[2] running
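The example above relies on `time.sleep(1)` so that both stage_2 processes are already inside `cond.wait()` when stage 1 calls `notify_all()`. A notification sent before anyone waits is simply lost. One way to drop that timing assumption is `Condition.wait_for()`, which checks a predicate before sleeping and again after every wakeup. This sketch pairs it with a shared `multiprocessing.Value` flag named `done`, which is an addition not in the original.

```python
import multiprocessing


def stage_1(cond, done):
    with cond:
        done.value = 1       # record that stage 1 has finished...
        cond.notify_all()    # ...and wake whoever is already waiting


def stage_2(cond, done, timeout=None):
    name = multiprocessing.current_process().name
    with cond:
        # wait_for() returns at once if the predicate is already true,
        # so arriving after notify_all() is no longer a problem
        ready = cond.wait_for(lambda: done.value == 1, timeout)
    print(name, "running" if ready else "gave up waiting")
    return ready


if __name__ == "__main__":
    cond = multiprocessing.Condition()
    done = multiprocessing.Value("i", 0)  # hypothetical shared flag
    # start order no longer matters, so no sleep() is needed
    procs = [multiprocessing.Process(target=stage_1, args=(cond, done))]
    procs += [
        multiprocessing.Process(target=stage_2, args=(cond, done, 10),
                                name="stage_2[{}]".format(i))
        for i in range(1, 3)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```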

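Controlling concurrent access to resources

Sometimes several processes may use a resource at the same time, but the total has to be limited. For example, a network application might support a fixed number of concurrent downloads. A Semaphore can manage these connections. In the snippet below, 3 is the maximum number of processes allowed to access the resource at once.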

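    import multiprocessing
    import time

    def worker(s):
        # hypothetical worker: the original snippet does not define it
        with s:  # at most 3 processes can be inside this block at once
            print(multiprocessing.current_process().name, "downloading")
            time.sleep(0.5)  # stand-in for the real work

    if __name__ == '__main__':
        s = multiprocessing.Semaphore(3)
        jobs = [multiprocessing.Process(target=worker, name=str(i), args=(s,))
                for i in range(10)]
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()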
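This sketch shows the limit in action. Each worker records how many workers are inside the semaphore at once, using two shared `multiprocessing.Value` counters, `active` and `peak` (hypothetical names for this illustration). A short `time.sleep()` stands in for a download. With `Semaphore(3)`, the reported peak should never exceed 3.

```python
import multiprocessing
import time


def worker(sem, active, peak):
    """Hold one semaphore slot while tracking how many workers are inside."""
    with sem:
        with active.get_lock():
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.05)  # stand-in for real work, e.g. a download
        with active.get_lock():
            active.value -= 1


def peak_concurrency(n_jobs=10, limit=3):
    sem = multiprocessing.Semaphore(limit)
    active = multiprocessing.Value("i", 0)  # hypothetical counter
    peak = multiprocessing.Value("i", 0)    # hypothetical counter
    jobs = [multiprocessing.Process(target=worker, args=(sem, active, peak))
            for _ in range(n_jobs)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    return peak.value


if __name__ == "__main__":
    print("peak concurrency:", peak_concurrency())
```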

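Managing shared state

A manager created with multiprocessing.Manager() supports lists as well as dictionaries. The example below shares a dictionary.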

    import multiprocessing

    def worker(d, key, value):
        # d is a proxy: the assignment is forwarded to the manager process
        d[key] = value

    if __name__ == '__main__':
        mgr = multiprocessing.Manager()
        d = mgr.dict()
        jobs = [
            multiprocessing.Process(
                target=worker,
                args=(d, i, i*2),
            ) for i in range(10)
        ]
        for i in jobs:
            i.start()
        for j in jobs:
            j.join()
        print("D->", d)

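Result: because the dictionary was created through the manager, it is shared by all processes. The key order reflects the order in which the processes performed their assignments.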

    D-> {1: 2, 3: 6, 0: 0, 5: 10, 8: 16, 2: 4, 7: 14, 6: 12, 4: 8, 9: 18}
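A minimal sketch of the list case, assuming the same doubling job as above; `record` and `collect` are hypothetical helpers. Each process appends to a `mgr.list()` proxy. The contents are copied into an ordinary list with a slice before the manager shuts down at the end of the `with` block. The values arrive in whatever order the processes finish, hence the `sorted()`.

```python
import multiprocessing


def record(shared, i):
    """Hypothetical helper: append one result (shared may be a list proxy)."""
    shared.append(i * 2)


def collect(n):
    with multiprocessing.Manager() as mgr:
        shared = mgr.list()
        jobs = [multiprocessing.Process(target=record, args=(shared, i))
                for i in range(n)]
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()
        # shared[:] fetches a plain list copy from the manager process
        return sorted(shared[:])


if __name__ == "__main__":
    print("L->", collect(10))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```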

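Shared namespaces

Besides dictionaries and lists, a manager can also create a shared Namespace object:

    namespace = mgr.Namespace()

Here is a simple example: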

    import multiprocessing

    def producer(ns, event):
        ns.value = "this is a value"  # stored in the manager process
        event.set()                   # tell the consumer the value exists

    def consumer(ns, event):
        try:
            print("Before event:{}".format(ns.value))
        except Exception as err:
            # the attribute does not exist until the producer sets it
            print("Before event error:", str(err))
        event.wait()
        print("After event:", ns.value)

    if __name__ == '__main__':
        mgr = multiprocessing.Manager()
        namespace = mgr.Namespace()
        event = multiprocessing.Event()
        p = multiprocessing.Process(
            target=producer,
            args=(namespace, event)
        )
        c = multiprocessing.Process(
            target=consumer,
            args=(namespace, event),
        )
        c.start()
        p.start()
        c.join()
        p.join()

Result (the first line appears when the consumer reads ns.value before the producer has set it):

    Before event error: 'Namespace' object has no attribute 'value'
    After event: this is a value

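Updates to the contents of mutable values stored in a namespace are not propagated automatically. To make such an update visible, assign the modified value to the namespace attribute again.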
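This sketch illustrates the rule from the main process alone; the same applies inside worker processes, which also hold proxies. The helper names `append_lost` and `append_kept` are hypothetical. Reading `ns.items` through the proxy returns a copy of the list, so appending to it changes only that copy. Fetching, modifying and reassigning the attribute is what actually stores the update.

```python
import multiprocessing


def append_lost(ns, item):
    # hypothetical helper: ns.items is a *copy* fetched from the manager,
    # so this append only changes the local copy
    ns.items.append(item)


def append_kept(ns, item):
    # hypothetical helper: fetch, modify, then reassign so the manager
    # stores the new list
    items = ns.items
    items.append(item)
    ns.items = items


if __name__ == "__main__":
    with multiprocessing.Manager() as mgr:
        ns = mgr.Namespace()
        ns.items = []
        append_lost(ns, "a")
        print("after in-place append:", ns.items)  # []
        append_kept(ns, "b")
        print("after reassignment:", ns.items)     # ['b']
```

If several processes update the same attribute this way, the read-modify-write sequence is not atomic. In that case, guard it with a lock such as `mgr.Lock()`.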

Process pools

The Pool class manages a fixed number of worker processes.

    import multiprocessing

    def do_calculation(data):
        return data * 2

    def start_process():
        # initializer: runs once in each worker process as it starts
        print("starting", multiprocessing.current_process().name)

    if __name__ == '__main__':
        inputs = list(range(10))
        print("Input :", inputs)
        builtin_outputs = list(map(do_calculation, inputs))
        print("Built-in:", builtin_outputs)
        pool_size = multiprocessing.cpu_count() * 2
        pool = multiprocessing.Pool(
            processes=pool_size,
            initializer=start_process,
        )
        pool_outputs = pool.map(do_calculation, inputs)  # blocks until all results are in
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the worker processes to exit
        print("Pool:", pool_outputs)

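close() and join() synchronize the pool with the main process. close() tells the pool that no more tasks will be submitted, and join() waits until the worker processes have exited. Result: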

    Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    starting SpawnPoolWorker-1
    starting SpawnPoolWorker-2
    starting SpawnPoolWorker-4
    starting SpawnPoolWorker-5
    starting SpawnPoolWorker-6
    starting SpawnPoolWorker-3
    starting SpawnPoolWorker-8
    starting SpawnPoolWorker-7
    Pool: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

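The Pool class also accepts a maxtasksperchild argument. It tells the pool to restart a worker process after that worker has completed the given number of tasks, so that long-running workers do not keep accumulating system resources.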
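A minimal sketch of `maxtasksperchild`, assuming that a changing worker PID is a convenient way to observe restarts; `task` and `run` are hypothetical helpers. With two workers that each retire after two tasks, the eight tasks should typically be spread over more distinct PIDs than `processes=2`. `chunksize=1` is passed because the pool counts a whole chunk of inputs as a single task for this limit.

```python
import multiprocessing
import os


def task(x):
    """Hypothetical helper: return the worker's PID plus the result."""
    return os.getpid(), x * 2


def run(n_tasks=8):
    # Each worker exits after 2 tasks and the pool starts a fresh one.
    # chunksize=1 matters: the pool counts a whole chunk as one task.
    with multiprocessing.Pool(processes=2, maxtasksperchild=2) as pool:
        results = pool.map(task, range(n_tasks), chunksize=1)
    pids = {pid for pid, _ in results}
    values = [value for _, value in results]
    return pids, values


if __name__ == "__main__":
    pids, values = run()
    print("values:", values)
    print("distinct worker PIDs:", len(pids))
```

Leaving the `with` block calls `terminate()` on the pool. That is safe here only because `map()` has already returned every result.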

