https://www.cnblogs.com/Eva-J/articles/8253549.html 參考鏈接
multiprocess模塊
進程的生命周期:
1.主進程
2.子進程
開啟子進程的主進程:
主進程自己的代碼如果長,等待自己的代碼執行結束。
子進程的執行時間長,主進程會在主進程代碼執行完畢后等待子進程執行完畢后 主進程才結束。
開啟一個進程
from multiprocessing import Process import time def func(): print('我是一個子進程') if __name__ == '__main__': # p是一個進程對象 還沒有啟動進程 p = Process(target=func) # 主進程 # 啟動一個子進程. 操作系統創建新進程執行新進程中的代碼 p.start() # 主進程 # 一般都是異步開啟子進程,主進程先執行 print('riven') print('mark') print('mimi')
傳參和查看進程號
Os.getpid :查看當前進程的進程號。
Os.getppid :查看當前進程的父進程號。
from multiprocessing import Process import time import os # 傳值給子進程 def func(args, kwargs): print(args) print(kwargs) # 查看當前進程進程號. print(os.getpid()) # 查看當前進程父進程號. print(os.getppid()) if __name__ == '__main__': # args = 傳入的參數 p = Process(target=func, args=("我的乖乖", "我太難了")) # 主進程 # 啟動一個子進程. 操作系統創建新進程執行新進程中的代碼 p.start() # 主進程 # 一般都是異步開啟子進程,主進程先執行 print('riven') print('mark') print('mimi') # 查看當前進程父進程號. print(os.getppid()) # 查看當前進程進程號. print(os.getpid())
Join
加了join 將先執行子進程 再執行主進程。
from multiprocessing import Process import time import os # 傳值給子進程 def func(args, kwargs): print(args) print(kwargs) if __name__ == '__main__': # args = 傳入的參數 p = Process(target=func, args=("我的乖乖", "我太難了")) # 主進程 # 啟動一個子進程. 操作系統創建新進程執行新進程中的代碼. # 感知一個子程序的結束,將異步程序改為同步. p.start() # 子進程 p.join() print('先執行子進程 再執行主進程')
執行多個子進程(兩種方法)
1.基於函數
from multiprocessing import Process import time import os # 傳值給子進程 def func(args): print('#' * args) if __name__ == '__main__': # 啟動多個子進程 re = [] for i in range(20): p = Process(target=func, args=(i,)) # 主進程 tt = re.append(p) # 1.啟動一個子進程. 操作系統創建新進程執行新進程中的代碼. # 2.感知一個子程序的結束,將異步程序改為同步. p.start() # 主進程 # join = 先執行子進程 再執行主進程 p.join() print('先執行子進程 再執行主進程')
2.基於類
from multiprocessing import Process import time import os # 子進程 class Myprocess(Process): # 添加init屬性 def __init__(self, arg1, arg2): super().__init__() self.arg1 = arg1 self.arg2 = arg2 # 必須執行一個run方法 def run(self): # 查看當前進程的進程號 print(self.pid) # 查看當前進程的名稱 print(self.name) print(self.arg1) print(self.arg2) if __name__ == '__main__': # 啟動多個子進程 for i in range(20): p = Myprocess('這是一個好的開始', '代碼改變世界') # 1.啟動一個子進程. 操作系統創建新進程執行新進程中的代碼. # 2.感知一個子程序的結束,將異步程序改為同步. p.start() p = Myprocess('good idea', '我想你了') p.start() # 主進程 # join = 先執行子進程 再執行主進程 p.join() print('先執行子進程 再執行主進程')
進程與進程之間的變量問題
from multiprocessing import Process import time import os # 子進程 class Myprocess(Process): # 添加init屬性 def __init__(self, arg1, arg2): super().__init__() self.arg1 = arg1 self.arg2 = arg2 # 必須執行一個run方法 def run(self): global n n = 0 if __name__ == '__main__': # 啟動多個子進程 for i in range(20): p = Myprocess('這是一個好的開始', '代碼改變世界') # 1.啟動一個子進程. 操作系統創建新進程執行新進程中的代碼. # 2.感知一個子程序的結束,將異步程序改為同步. p.start() # 主進程 # join = 先執行子進程 再執行主進程 p.join() print(n) # PS:在每個進程中定義的變量,只能在本進程中使用
進程之間實現聊天
服務端
# 進程之間實現聊天 import socket from multiprocessing import Process # 子進程 def server(conn): # 接受數據 ret = conn.recv(1024).decode('utf-8') print(ret) conn.send(b'Hello') if __name__ == '__main__': # 創建一個socket sk = socket.socket() # 創建一個域名和端口 sk.bind(('127.0.0.1', 8070)) # 監聽客戶端的連接 sk.listen() # 接受客戶端的數據 conn, addr = sk.accept() # while 1: # 這個循環沒有一直在啟動進程,因為socket會亢住等待客戶端連接 p = Process(target=server, args=(conn,)) p.start()
客戶端
import socket from multiprocessing import Process def client(sk, msg): sk.send(bytes(msg, encoding='utf-8')) ret = sk.recv(1024) print(ret) if __name__ == '__main__': sk = socket.socket() sk.connect(('127.0.0.1', 8070)) # while 1: msg = input() p = Process(target=client, args=(sk, msg)) p.start()
守護進程
p.terminate() :在主程序內結束一個子進程。 p.is_alive() :檢驗一個進程是否還活着的狀態。 p.name)() :這個進程的名字。 p.pid() :這個進程的進程號。
from multiprocessing import Process import time def fun1(): while 1: # 給主進程 反饋信息,證明自己在運行。 time.sleep(0.5) print('我還活着呢') if __name__ == '__main__': p = Process(target=fun1) # 設置子進程為守護進程 p.daemon = True p.start() # 結束一個子進程 p.terminate() i = 0 while i < 10 : print('我是主進程') time.sleep(1) # 檢驗一個主進程 是否還活着 i = i + 1 # 守護進程會隨着主進程的代碼執行完畢而結束
LOCK鎖
Lock:一次只能執行一個子程序,而且只能等執行完之后才能執行下一個。
不加鎖會造成數據不安全的操作。
import json from multiprocessing import Process from multiprocessing import Lock import time # 修改數據庫必須加鎖 def show(i): with open('ticket') as f: str = f.read() obj = json.loads(str) print('%s號查看了 余票: %s' % (i, obj['ticket'])) def buy_ticket(i, lock): # 拿鑰匙進門 lock.acquire() with open('ticket') as f: str = f.read() obj = json.loads(str) time.sleep(0.1) if obj['ticket'] > 0: obj['ticket'] -= 1 print('\033[34m%s 買到票了 \033[0m ' % i) else: print('\033[34m%s 沒買到票 \033[0m' % i) with open('ticket', 'w') as f1: f1.write(json.dumps(obj)) # 還鑰匙 lock.release() if __name__ == '__main__': for i in range(10): p = Process(target=show, args=(i,)) p.start() lock = Lock() for i in range(10): p = Process(target=buy_ticket, args=(i, lock)) p.start()
信號量(Semaphore)
Semaphore: 用鎖的原理實現的,內置了一個記數器。在同一時間只能有指定數量的進程執行某一段被控制住的代碼。
# 限定進程訪問次數,指定一次進2個人 等他們出來后其他人才能進去
Sem.acquire(): 獲取鑰匙。
Sem.release(): 還鑰匙。
import time from multiprocessing import Semaphore, Process import random def ktv(i, sem): # 獲取鑰匙 sem.acquire() print('%s 走進ktv' % i) time.sleep(random.randint(1, 5)) print('%s走出ktv' % i) # 歸還鑰匙 sem.release() if __name__ == '__main__': # 限定進程訪問次數,指定一次進2個人 等他們出來后其他人才能進去 sem = Semaphore(2) for i in range(20): p = Process(target=ktv, args=(i, sem)) p.start()
事件(Event)
Set 和 cleat
分別用來修改一個事件的狀態 Ture或者 False
Is__set
用來查看一個事件的狀態
Wait
是依據事件的狀態來決定自己是否阻塞 False阻塞 True 不阻塞
# 事件(Event) from multiprocessing import Event # 一個信號可以使所有的進程都進入阻塞狀態 # 也可以控制所有的進程解除阻塞 # 一個世界被創建之后,默認是阻塞狀態 # 創建一個事件 e = Event() # 查看一個事件的狀態,默認被設置成阻塞false print(e.is_set()) # 將這個事件的狀態改為Ture e.set() # 是依據 e.is_set() 的值 來決定是否阻塞 e.wait() # 查看一個事件的狀態,默認被設置成阻塞false print(e.is_set()) # 將這個事件的狀態改為False print(e.clear()) # 是依據 e.is_set() 的值 來決定是否阻塞 e.wait()
紅綠燈效應
from multiprocessing import Process, Event import time import random def car(i, e): if e.is_set(): print('車%s在等待' % i) e.wait() else: print('\033[33m車%s通過了\033[0m'%i) def lint(e): while 1: # 判斷這個事件是否為Ture if e.is_set(): print('\033[31m 紅燈亮了 \033[0m') time.sleep(2) # 將這個事件的狀態改為False e.clear() else: print('\033[32m 綠燈亮了 \033[0m') time.sleep(2) # 將這個事件的狀態改為Ture e.set() if __name__ == '__main__': e = Event() p = Process(target=lint, args=(e,)) p.start() for i in range(20): p1 = Process(target=car, args=(i, e)) p1.start() time.sleep(random.randint(1, 2))
進程間的通信 ----隊列和管道
隊列
q = Queue(5) #創建共享的進程隊列,如果省略此參數,則無大小限制。
q.put(1) #將1放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止.
q.full() #用於判斷隊列是否已經滿了。
q.get() #返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。
q.get_nowait() #和get方法一樣 但是q如果為空的話會報錯。
q.empty() #用於判斷隊列是否已經為空。
Full empty #不完全准確。
from multiprocessing import Queue # 創建共享的進程隊列,如果省略此參數,則無大小限制. q = Queue(5) # 將1 放入隊列。 如果隊列已滿,此方法將阻塞至有空間可用為止 q.put(1) q.put(2) q.put(3) q.put(4) q.put(5) # 判斷隊列是否已經滿了。 print(q.full()) # 返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。 print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) # get_nowait 和get方法一樣不過 get_nowait 如果發現沒有取到值就會報錯 try: print(q.get_nowait()) except Exception: print('隊列已經空了') # 用於判斷隊列是否為空 print(q.empty())
子進程是與主進程之間通信的
from multiprocessing import Queue from multiprocessing import Process def func(e): # 放入隊列。如果隊列已經滿,此方法將阻塞至有空間可用為止 e.put('我是Mark I am Strong man') if __name__ == '__main__': e = Queue() q = Process(target=func,args=(e,)) q.start() # 在隊列中 獲取子進程放進來的值 print(e.get())
子進程是可以與子進程之間通信的
例子: 生產者消費者模型
# 生產者 def producer(name, food, q): for i in range(20): time.sleep(random.random()) f = "%s 制作了的第%s個%s" % (name, i, food) print(f) # 將數據放入隊列中 q.put(f) # 消費者 def chibaozi(name, q): while 1: # 在隊列中取值 food = q.get() # 不能用字符形式格式,需要用is關鍵字才能和none配合 if food is None: break print("%s 消費了 %s" % (name, food)) if __name__ == '__main__': # 創建一個隊列 q = Queue() # 生產者 qq = Process(target=producer, args=('Mark', '包子', q)) qq1 = Process(target=producer, args=('Riven', '饅頭', q)) # 消費之 qq2 = Process(target=chibaozi, args=('黃埔', q)) qq3 = Process(target=chibaozi, args=('佘義', q)) # 統一啟動子進程 qq.start() qq1.start() qq2.start() qq3.start() # 先執行子程序,后執行主程序代碼 qq.join() qq1.join() # 放入None 讓消費者跳出循環 q.put(None) q.put(None)
JoinableQueue
例 : 進階版(生產消費者模型)
q.join()
# 阻塞 直到一個隊列中的所有數據 全部被執行完畢。接受消費端發送過來的標記。
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。
阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。
q.task_done()
內部執行了一個 count - 1的操作,發送信號給q.join使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。
from multiprocessing import JoinableQueue from multiprocessing import Process import random import time # 生產者 def producer(name, food, q): for i in range(20): time.sleep(random.random()) f = "%s 制作了的第%s個%s" % (name, i, food) print(f) # 將數據放入隊列中 q.put(f) # 阻塞 直到一個隊列中的所有數據 全部被執行完畢。接受消費端發送過來的標記. q.join() # 消費者 def chibaozi(name, q): while 1: # 在隊列中取值 food = q.get() print("%s 消費了 %s" % (name, food)) time.sleep(random.randint(1,3)) # 在消費者這一端:每次獲取一個數據 處理一個數據. 發送一個記號:標志一個數據被處理成功 # 內部執行了一個 count - 1 的操作 q.task_done() if __name__ == '__main__': # 創建一個隊列 q = JoinableQueue() # 生產者 qq = Process(target=producer, args=('Mark', '包子', q)) qq1 = Process(target=producer, args=('Riven', '饅頭', q)) # 消費者 qq2 = Process(target=chibaozi, args=('黃埔', q)) qq3 = Process(target=chibaozi, args=('佘義', q)) # 統一啟動子進程 qq.start() qq1.start() # 設置 消費者 為守護進程 主進程中的代碼執行完畢之后,子進程自動結束 qq2.daemon =True qq3.daemon =True qq2.start() qq3.start() # 感知一個子程序的結束 qq.join() qq1.join()
文字 總結:
#在生產者這一端:
#每一次生產一個數據
#且每一次生產的數據都放這隊列中
#在隊列中刻上一個記號
#當生產者全部生產完畢之后
#join 信號:已經停止生產數據了
#且要等待之前被刻上的記號都被消費完
#當數據都被處理完時,join阻塞結束
#消費端 中把所有的任務消耗完
#生產端 中的join感知到,停止阻塞
#所有 生產端 進程結束
#主進程中的p.join結束
#主進程中代碼結束
#守護進程(消費者進程)結束.
在消費着者這一端:
#每次獲取一個數據
#處理一個數據
#發送一個記號:標志一個數據被處理成功
管道
數據不安全性
#IPC。
#加鎖來控制操作管道的行為 來避免進程之間爭搶數據造成的數據不安全現象。
#隊列 進程之間數據安全。
#管道 + 鎖。
例1、
from multiprocessing import Pipe # 接受2個地址 conn1, conn2 = Pipe() # conn1 是發送端, conn1.send('123456') # conn2 是接受端 print(conn2.recv())
例2、
from multiprocessing import Pipe from multiprocessing import Process def func(conn1): # 發送消息 conn1.send('大傻我愛你') if __name__ == "__main__": # 接受2個端口 conn1, conn2 = Pipe() p = Process(target=func,args=(conn1,)) p.start() # 接受消息 print(conn2.recv())
例3、通過條件判斷關閉管道進程
from multiprocessing import Pipe from multiprocessing import Process def func(conn2): while 1: # 接收端 ret = conn2.recv() print(ret) # 通過判斷條件來關閉進程 if ret is None: break if __name__ == "__main__": # 接受2個端口 conn1, conn2 = Pipe() p = Process(target=func, args=(conn2,)) p.start() for i in range(20): # 發送端 conn1.send('吃了嗎?') conn1.send(None)
例4、使用close關閉進程的方法
當最后一個端口沒關的時候就會報錯。我們捕獲錯誤信息進行操作就可以了。
from multiprocessing import Pipe from multiprocessing import Process def func(conn2, conn1): # 發送端關閉 conn1.close() while 1: # 管道只有一端沒有關閉就會報錯異常 我們又不能讓它每一次循環都關閉 只能try一下。 try: # 接受端 接受消息 ret = conn2.recv() print(ret) except EOFError: # 接受端關閉 conn2.close() break if __name__ == "__main__": # 接受2個端口 conn1, conn2 = Pipe() p = Process(target=func, args=(conn2, conn1)) p.start() # 主進程接受端關閉 conn2.close() for i in range(20): # 發送端 conn1.send('吃了嗎?') # 主進程 發送端關閉 conn1.close()
基於 管道的生產者消費者模型
from multiprocessing import Pipe from multiprocessing import Process from multiprocessing import Lock import time import random def func(name, food, conn1, conn2): # 關閉接受端 conn2.close() for i in range(20): ret = "\033[31m %s制作了%s個%s\033[0" % (name, i, food) print(ret) # 發送數據到管道 conn1.send(ret) time.sleep(random.randint(1, 2)) # 關閉發送端 conn1.close() def chi(name, conn1, conn2, lock): # 把發送端關閉 conn1.close() while 1: try: #加鎖 lock.acquire() ret1 = conn2.recv() print("\033[32m %s吃了%s \033[0m" % (name, ret1)) #加鎖 lock.release() except EOFError: conn2.close() break if __name__ == "__main__": # 接受2個端口 conn1, conn2 = Pipe() # 創建鎖 lock = Lock() p = Process(target=func, args=('Mark', '包子', conn1, conn2,)) p.start() p2 = Process(target=chi, args=('黃埔', conn1, conn2, lock,)) p2.start() # 記住一定要 關閉主進程 conn1.close() conn2.close()
進程之間的數據共享(Manager)
例1、
from multiprocessing import Manager, Process def main(dic): dic['count'] -=1 print(dic) if __name__ == "__main__": # 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此 m = Manager() # 放入一個字典 dic = m.dict({"count": 100}) p_lst = [] p = Process(target=main,args=(dic,)) p.start() # 感知一個子程序的結束 p.join() print('主進程',dic)
例2、會出現數據不安全
from multiprocessing import Manager, Process def main(dic): dic['count'] -= 1 print('子進程', dic) if __name__ == "__main__": # 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此 m = Manager() # 放入一個字典 dic = m.dict({"count": 100}) for i in range(20): p = Process(target=main, args=(dic,)) p.start() p.join() print('測試', dic)
數據不安全性可能會出現一個進程同時用一個數據
解決數據不安全問題(加鎖)
from multiprocessing import Manager from multiprocessing import Process from multiprocessing import Lock def main(dic,lock): # 加鎖 lock.acquire() dic['count'] -= 1 print('子進程', dic) lock.release() if __name__ == "__main__": # 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此 m = Manager() lock = Lock() # 放入一個字典 dic = m.dict({"count": 100}) for i in range(20): p = Process(target=main, args=(dic,lock)) p.start() p.join() print('測試', dic)
進程池(Pool)
為什么要有進程池?進程池的概念。
在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。
那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。
第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。
因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?
在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,
等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,
拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。
# 效率。
# 每開啟進程,開啟屬於這個進程的內存空間。
# 寄存器 堆棧 文件。
進程池的數量是 cpu個數+1
# 進程過多 操作系統的調度。
# 更高級的進程: 在忙的時候可以 20+ ,在不忙的時候自動降為3個左右。
例1、起一個進程池
from multiprocessing import Pool from multiprocessing import Process def func(n): for i in range(10): print(n + 1) if __name__ == "__main__": # 開啟了5個進程 pool = Pool(5)
進程池的同步調用(apply):(一般不用)
對比:
1. 正常情況下先執行5個start 后執行5個end
from multiprocessing import Pool from multiprocessing import Process import time import os def func(n): print("子進程開始: %s"%n, os.getpid()) time.sleep(1) print("子進程結束: %s" % n, os.getpid()) if __name__ == "__main__": # 開啟了5個進程 pool = Pool(5) for i in range(10): # 正常情況下先執行5個start 后執行5個end p = Process(target=func,args=(i,)) p.start()
子進程開始: 1 13304 子進程開始: 3 13276 子進程開始: 2 12304 子進程開始: 4 12384 子進程開始: 5 12380 子進程開始: 9 6200 子進程開始: 0 13288 子進程開始: 7 4288 子進程開始: 8 13244 子進程開始: 6 7688 子進程結束: 1 13304 子進程結束: 3 13276 子進程結束: 2 12304 子進程結束: 4 12384 子進程結束: 5 12380 子進程結束: 9 6200 子進程結束: 0 13288 子進程結束: 7 4288 子進程結束: 8 13244 子進程結束: 6 7688
2.使用apply(# 在使用了 apply后start和end變成了同步)
from multiprocessing import Pool from multiprocessing import Process import time import os def func(n): print("子進程開始: %s"%n, os.getpid()) time.sleep(1) print("子進程結束: %s" % n, os.getpid()) return n if __name__ == "__main__": # 開啟了5個進程(進程池中的進程 永遠都是活着的) pool = Pool(5) for i in range(10): # 正常情況下先執行5個start 后執行5個end # p = Process(target=func,args=(i,)) # 在使用了 apply后start和end變成了同步 p = pool.apply(func,args=(i,)) # 獲取返回值 print(p)
進程池的異步調用(用的比較多)
# 異步的apply_async用法:
主進程需要使用 jion,
等待進程池內任務都處理完,然后可以用get收集結果
否則, 主進程結束,進程池可能還沒來得及執行,也就跟着結束了.
返回值:為了能使用返回值需要使用 obj.get()方法
# 使用get來獲取apply_aync 的結果,如果是apply,則沒有get方法
# 因為apply是同步執行,立刻獲取結果,也根本無需get
from multiprocessing import Pool from multiprocessing import Process import time import os def func(n): print("子進程開始: %s"%n, os.getpid()) time.sleep(10) print("子進程結束: %s" % n, os.getpid()) return n*10 if __name__ == "__main__": # 開啟了5個進程(進程池中的進程 永遠都是活着的) pool = Pool(5) lis = [] for i in range(10): """ # 異步的apply_async用法: 如果使用異步提交的任務 主進程需要使用 jion,等待進程池內任務都處理完,然后可以用get收集結果 否則, 主進程結束,進程池可能還沒來得及執行,也就跟着結束了 """ ret = pool.apply_async(func,args=(i,)) lis.append(ret) # 使用get來獲取apply_aync 的結果,如果是apply,則沒有get方法 # 因為apply是同步執行,立刻獲取結果,也根本無需get for li in lis: print(li.get()) # 結束進程池接受任務 pool.close() # 感知進程池中的任務執行結束 pool.join()
進程池的socket
進程池中的回調函數
回調函數與爬蟲的應用