基礎方法
簡單的使用方法:
1: import multiprocessing2: def worker():3: print("working!")
4: if __name__ == '__main__':
5: jobs = []6: for i in range(5):
7: p = multiprocessing.Process(target=worker)8: jobs.append(p)9: p.start()
另外一種使用方法是導入第三方文件的目標函數:
work.py
1: def worker():2: print("worker")
3: return
multiprocess_main.py
1: import multiprocessing2: import worker3: if __name__ == '__main__':
4: jobs = []5: for i in range(5):
6: p = multiprocessing.Process(7: target=worker.worker,8: )9: jobs.append(p)10: p.start()
要傳遞參數的話,這個參數必須能pickle序列化。
1: import multiprocessing2: def worker(i):3: print("worker%s" % i)
4: if __name__ == '__main__':
5: jobs = []6: for i in range(5):
7: p = multiprocessing.Process(target=worker, args=(i,))8: jobs.append(p)9: p.start()
打印結果:
1: worker02: worker13: worker24: worker35: worker4
確定當前的進程
默認的name
1: name = multiprocessing.current_process().name
或者實例化時傳入name參數
1: t = multiprocessing.Process(2: name="t1",
3: target=work,4: )
守護進程
類似於守護線程,守護進程會在主程序退出之前自動終止,以免留下孤兒進程。
t在這里是某個進程對象。
1: t.daemon = True
同樣的要等待一個進程完成,可以使用join()方法
1: t.join()
默認地join()會無限阻塞。可以傳入一個超時參數(浮點數,秒數)。在這個時限內即使沒有完成,join()也會停止阻塞,返回
1: t.join(sec)
終止進程
對一個進程對象調用terminate()會結束子進程。
1: t.terminate()
進程退出時生成的狀態碼可以通過exitcode屬性訪問。下表是退出碼:
1: ==0 未生成任何錯誤2: > 0 進程有一個錯誤,並以該錯誤碼退出3: < 0 進程以一個-1*exitcode信號結束
exitcode是屬性不是方法
1: t.exitcode
日志
使用multiprocess.log_to_stderr()屬性基於logging模塊來建一個日志記錄的對象,里面的參數傳入日志的級別。要注意設置的位置。
1: if __name=='__main__':2: multiprocess.log_to_stderr(logging.DEBUG)3: p = multiprocess.Process(target=worker)4: p.start()
5: p.join()
也可以直接拿到這個日志對象來處理。
1: logger = multiprocess.get_logger()2: logger.setLevel(logging.INFO)
派生進程
最簡單的方法是使用Process並傳入一個目標函數,但也可以使用一個定制子類。但子類應該覆蓋run()方法以完成工作
1: import multiprocessing
2: class Worker(multiprocessing.Process):
3: def run(self):
4: print("In{}".format(self.name))5: return
6: if __name__ == '__main__':7: jobs = []8: for i in range(5):9: p = Worker()10: jobs.append(p)11: p.start()
12: for j in jobs:13: j.join()
進程間的通信
用一個multiprocessing.Queue([maxsize])來回傳遞消息,能夠pickle串行化的任何對象都可以通過Queue傳遞。
q.put方法用以插入數據到隊列中。
q.get方法可以從隊列讀取並且刪除一個元素。
簡單的使用方法:
1: import multiprocessing
2: q = multiprocessing.Queue(3)3:4: q.put(1)5: q.put(2)6: q.put(3)7: # 滿了,返回True8: print(q.full())
9:10: print(q.get())
11: print(q.get())
12: print(q.get())
13: # 空了,返回True14: print(q.empty())
生產者-消費者模型
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
1: import multiprocessing
2: import time,random,os3: def consumer(q, name):4: while True:5: res = q.get()6: time.sleep(random.randint(1, 3))
7: print("%s 吃了 %s" % (name, res))8: def producer(q,name,food):9: for i in range(3):10: time.sleep(random.randint(1, 3))
11: res = "%s%s" % (food, i)
12: q.put(res)13: print("%s 生產了 %s" % (name, res))14: if __name__ == '__main__':15: q = multiprocessing.Queue()16: # 生產者17: p1 = multiprocessing.Process(target=producer, args=(q, "egon", "包子"))18: # 消費者19: c1 = multiprocessing.Process(target=consumer, args=(q, "alex"))
20: p1.start()
21: c1.start()
22: print("主")
但是上述代碼在三個包子被消費完后會卡死,所以需要發送結束信號,而JoinableQueue這種隊列提供了這種機制。
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
1: from multiprocessing import Process,JoinableQueue
2: import time,random3: def consumer(q,name):4: while True:5: res=q.get()6: time.sleep(random.randint(1,3))
7: print('%s 吃 %s' %(name,res))8: q.task_done() #發送信號給q.join(),說明已經從隊列中取走一個數據並處理完畢了
9: def producer(q,name,food):10: for i in range(3):11: time.sleep(random.randint(1,3))
12: res='%s%s' %(food,i)13: q.put(res)14: print('%s 生產了 %s' %(name,res))15: q.join() #等到消費者把自己放入隊列中的所有的數據都取走之后,生產者才結束
16: if __name__ == '__main__':17: q=JoinableQueue() #使用JoinableQueue()18:19: #生產者們20: p1=Process(target=producer,args=(q,'egon1','包子'))21: p2=Process(target=producer,args=(q,'egon2','骨頭'))22: p3=Process(target=producer,args=(q,'egon3','泔水'))23:24: #消費者們25: c1=Process(target=consumer,args=(q,'alex1'))26: c2=Process(target=consumer,args=(q,'alex2'))27: c1.daemon=True28: c2.daemon=True29:30: #開始31: p1.start()
32: p2.start()
33: p3.start()
34: c1.start()
35: c2.start()
36:37: p1.join()
38: p2.join()
39: p3.join()
40: #1、主進程等生產者p1、p2、p3結束41: #2、而p1、p2、p3是在消費者把所有數據都取干凈之后才會結束42: #3、所以一旦p1、p2、p3結束了,證明消費者也沒必要存在了,應該隨着主進程一塊死掉,因而需要將消費者們設置成守護進程43: print('主process')
結果:
1: egon2 生產了 骨頭02: egon1 生產了 包子03: egon3 生產了 泔水04: alex1 吃 骨頭05: egon3 生產了 泔水16: egon2 生產了 骨頭17: alex2 吃 包子08: egon1 生產了 包子19: alex1 吃 泔水010: egon2 生產了 骨頭211: egon1 生產了 包子212: egon3 生產了 泔水213: alex1 吃 骨頭114: alex2 吃 泔水115: alex2 吃 骨頭216: alex1 吃 包子117: alex1 吃 泔水218: alex2 吃 包子219: 主process