python3之multiprocess模塊(上)


基礎方法

簡單的使用方法:

  1: import multiprocessing
  2: def worker():
  3:     print("working!")
  4: if __name__ == '__main__':
  5:     jobs = []
  6:     for i in range(5):
  7:         p = multiprocessing.Process(target=worker)
  8:         jobs.append(p)
  9:         p.start()

另外一種使用方法是導入第三方文件的目標函數:

work.py

  1: def worker():
  2:     print("worker")
  3:     return

multiprocess_main.py

  1: import multiprocessing
  2: import worker
  3: if __name__ == '__main__':
  4:     jobs = []
  5:     for i in range(5):
  6:         p = multiprocessing.Process(
  7:             target=worker.worker,
  8:         )
  9:         jobs.append(p)
 10:         p.start()

傳遞參數的話,這個參數必須能pickle序列化。

  1: import multiprocessing
  2: def worker(i):
  3:     print("worker%s" % i)
  4: if __name__ == '__main__':
  5:     jobs = []
  6:     for i in range(5):
  7:         p = multiprocessing.Process(target=worker, args=(i,))
  8:         jobs.append(p)
  9:         p.start()

打印結果:

  1: worker0
  2: worker1
  3: worker2
  4: worker3
  5: worker4

確定當前的進程

默認的name

  1: name = multiprocessing.current_process().name

或者實例化時傳入name參數

  1: t = multiprocessing.Process(
  2:     name="t1",
  3:     target=work,
  4: )

守護進程

類似於守護線程,守護進程會在主程序退出之前自動終止,以免留下孤兒進程。

t在這里是某個進程對象。

  1: t.daemon = True

同樣的要等待一個進程完成,可以使用join()方法

  1: t.join()

默認地join()會無限阻塞。可以傳入一個超時參數(浮點數,秒數)。在這個時限內即使沒有完成,join()也會停止阻塞,返回

  1: t.join(sec)

終止進程

對一個進程對象調用terminate()會結束子進程。

  1: t.terminate()

進程退出時生成的狀態碼可以通過exitcode屬性訪問。下表是退出碼:

  1: ==0 未生成任何錯誤
  2: > 0 進程有一個錯誤,並以該錯誤碼退出
  3: < 0 進程以一個-1*exitcode信號結束

exitcode是屬性不是方法

  1: t.exitcode

日志

使用multiprocess.log_to_stderr()屬性基於logging模塊來建一個日志記錄的對象,里面的參數傳入日志的級別。要注意設置的位置。

  1: if __name=='__main__':
  2:     multiprocess.log_to_stderr(logging.DEBUG)
  3:     p = multiprocess.Process(target=worker)
  4:     p.start()
  5:     p.join()

也可以直接拿到這個日志對象來處理。

  1: logger = multiprocess.get_logger()
  2: logger.setLevel(logging.INFO)

派生進程

最簡單的方法是使用Process並傳入一個目標函數,但也可以使用一個定制子類。但子類應該覆蓋run()方法以完成工作

  1: import multiprocessing
  2: class Worker(multiprocessing.Process):
  3:     def run(self):
  4:         print("In{}".format(self.name))
  5:         return
  6: if __name__ == '__main__':
  7:     jobs = []
  8:     for i in range(5):
  9:         p = Worker()
 10:         jobs.append(p)
 11:         p.start()
 12:     for j in jobs:
 13:         j.join()

進程間的通信

用一個multiprocessing.Queue([maxsize])來回傳遞消息,能夠pickle串行化的任何對象都可以通過Queue傳遞。

q.put方法用以插入數據到隊列中。

q.get方法可以從隊列讀取並且刪除一個元素。

簡單的使用方法:

  1: import multiprocessing
  2: q = multiprocessing.Queue(3)
  3: 
  4: q.put(1)
  5: q.put(2)
  6: q.put(3)
  7: # 滿了,返回True
  8: print(q.full())
  9: 
 10: print(q.get())
 11: print(q.get())
 12: print(q.get())
 13: # 空了,返回True
 14: print(q.empty())

生產者-消費者模型

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

  1: import multiprocessing
  2: import time,random,os
  3: def consumer(q, name):
  4:     while True:
  5:         res = q.get()
  6:         time.sleep(random.randint(1, 3))
  7:         print("%s 吃了 %s" % (name, res))
  8: def producer(q,name,food):
  9:     for i in range(3):
 10:         time.sleep(random.randint(1, 3))
 11:         res = "%s%s" % (food, i)
 12:         q.put(res)
 13:         print("%s 生產了 %s" % (name, res))
 14: if __name__ == '__main__':
 15:     q = multiprocessing.Queue()
 16:     # 生產者
 17:     p1 = multiprocessing.Process(target=producer, args=(q, "egon", "包子"))
 18:     # 消費者
 19:     c1 = multiprocessing.Process(target=consumer, args=(q, "alex"))
 20:     p1.start()
 21:     c1.start()
 22:     print("")

但是上述代碼在三個包子被消費完后會卡死,所以需要發送結束信號,而JoinableQueue這種隊列提供了這種機制。

JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:

q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止

  1: from multiprocessing import Process,JoinableQueue
  2: import time,random
  3: def consumer(q,name):
  4:     while True:
  5:         res=q.get()
  6:         time.sleep(random.randint(1,3))
  7:         print('%s 吃 %s' %(name,res))
  8:         q.task_done() #發送信號給q.join(),說明已經從隊列中取走一個數據並處理完畢了
  9: def producer(q,name,food):
 10:     for i in range(3):
 11:         time.sleep(random.randint(1,3))
 12:         res='%s%s' %(food,i)
 13:         q.put(res)
 14:         print('%s 生產了 %s' %(name,res))
 15:     q.join() #等到消費者把自己放入隊列中的所有的數據都取走之后,生產者才結束
 16: if __name__ == '__main__':
 17:     q=JoinableQueue() #使用JoinableQueue()
 18: 
 19:     #生產者們
 20:     p1=Process(target=producer,args=(q,'egon1','包子'))
 21:     p2=Process(target=producer,args=(q,'egon2','骨頭'))
 22:     p3=Process(target=producer,args=(q,'egon3','泔水'))
 23: 
 24:     #消費者們
 25:     c1=Process(target=consumer,args=(q,'alex1'))
 26:     c2=Process(target=consumer,args=(q,'alex2'))
 27:     c1.daemon=True
 28:     c2.daemon=True
 29: 
 30:     #開始
 31:     p1.start()
 32:     p2.start()
 33:     p3.start()
 34:     c1.start()
 35:     c2.start()
 36: 
 37:     p1.join()
 38:     p2.join()
 39:     p3.join()
 40:     #1、主進程等生產者p1、p2、p3結束
 41:     #2、而p1、p2、p3是在消費者把所有數據都取干凈之后才會結束
 42:     #3、所以一旦p1、p2、p3結束了,證明消費者也沒必要存在了,應該隨着主進程一塊死掉,因而需要將消費者們設置成守護進程
 43:     print('主process')

結果:

  1: egon2 生產了 骨頭0
  2: egon1 生產了 包子0
  3: egon3 生產了 泔水0
  4: alex1 吃 骨頭0
  5: egon3 生產了 泔水1
  6: egon2 生產了 骨頭1
  7: alex2 吃 包子0
  8: egon1 生產了 包子1
  9: alex1 吃 泔水0
 10: egon2 生產了 骨頭2
 11: egon1 生產了 包子2
 12: egon3 生產了 泔水2
 13: alex1 吃 骨頭1
 14: alex2 吃 泔水1
 15: alex2 吃 骨頭2
 16: alex1 吃 包子1
 17: alex1 吃 泔水2
 18: alex2 吃 包子2
 19: 主process


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM