python程序中的進程操作


  之前我們已經了解了很多進程相關的理論知識,了解進程是什么應該不再困難了,剛剛我們已經了解了,運行中的程序就是一個進程。所有的進程都是通過它的父進程來創建的。因此,運行起來的python程序也是一個進程,那么我們也可以在程序中再創建進程。多個進程可以實現並發效果,也就是說,當我們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以我們之前所學的知識,並不能實現創建進程這個功能,所以我們就需要借助python中強大的模塊。

multiprocess模塊

      仔細說來,multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自m ultiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由於提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:創建進程部分,進程同步部分,進程池部分,進程之間數據共享。

一、multiprocess.process模塊

process模塊介紹

process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號 參數介紹: 1 group參數未使用,值始終為None 2 target表示調用對象,即子進程要執行的任務 3 args表示調用對象的位置參數元組,args=(1,2,'egon',) 4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} 5 name為子進程的名稱

方法介紹

1 p.start():啟動進程,並調用該子進程中的p.run() 2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖 4 p.is_alive():如果p仍然運行,返回True 5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程

屬性介紹

1 p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可) 5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

在windows中使用process模塊的注意事項

在Windows操作系統中由於沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候  ,就不會遞歸運行了。

windows操作系統下創建進程

# encoding=utf-8 # auther:lsj # 進程的第一種方式
from multiprocessing import Process import time def task(name): print('%s is running'%name) time.sleep(3) print('%s is over' %name) # windows系統下
if __name__ == '__main__': # 創建進程的代碼
    # 1、創建一個對象
    p = Process(target=task,args=('lsj',)) # 2、開啟進程
    p.start() # 告訴操作系統幫你創建一個進程:異步展示
    print('')
# encoding=utf-8 # auther:lsj # 進程的第二種方式 類的繼承
from multiprocessing import Process import time class MyProcess(Process): def run(self): print('hello bf girl') time.sleep(1) print('get out!!') if __name__ == '__main__': p = MyProcess() p.start() print('')

總結:

  創建進程就是在內存中申請一塊內存空間將需要運行的代碼丟進去。

  一個進程對應在內存中就是一塊獨立的內存空間。

  多個進程對應在內存中就是多塊獨立的內存空間。

  進程與進程之間數據默認情況下是無法直接交互,如過想交互可以借助第三方工具或者模塊

二、使用process模塊創建進程

在一個python進程中開啟子進程,start方法和並發效果。

import time from multiprocessing import Process def f(name): print('hello', name) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('lsj',)) p.start() time.sleep(1) print('執行主進程的內容了') 
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() #p.join() # 主進程等待子進程p運行結束之后再繼續往后執行
    print('我是父進程')
import os from multiprocessing import Process def f(x): print('子進程id :',os.getpid(),'父進程id :',os.getppid()) return x*x if __name__ == '__main__': print('主進程id :', os.getpid()) p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start()

進階,多個進程同時運行(注意,子進程的執行順序不是根據啟動順序決定的)

import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) p.join() # [p.join() for p in p_lst]
    print('父進程在執行')
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) # [p.join() for p in p_lst]
    print('父進程在執行')

除了上面這些開啟進程的方法,還有一種以繼承Process類的形式開啟進程的方式

import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('wupeiqi') p2=MyProcess('yuanhao') p3=MyProcess('nezha') p1.start() #start會自動調用run
p2.start() # p2.run()
p3.start() p1.join() p2.join() p3.join() print('主線程')

進程之間的數據隔離問題(默認情況下進程間數據是相互隔離的)

from multiprocessing import Process def work(): global n n=0 print('子進程內: ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主進程內: ',n)

三、進程對象

  一台計算機上面運行着很多進程,那么計算機時如何區分並管理這些進程服務端的呢?

  計算機會給每一個運行的進程分配一個PID號(pid號也叫做進程號)

如何查看pid?

  1、windows電腦

    進入cmd輸入tasklist命令,即可查看所有的服務以及對應的pid號。

    tasklist | findstr PID查看具體的進程

  2、mac電腦

    進入終端之后輸入ps aux命令。

    ps aux | grep PID查看具體的進程。

current_process:查看當前進程。
current_process().pid:查看當前進程號。
os.getpid():查看當前進程號
os.getppid():查看當前進程的父進程號
# encoding=utf-8 # auther:lsj # 進程對象

from multiprocessing import Process,current_process import time def task(): print('%s is running'%current_process().pid)  # 查看當前進程的進程號
    time.sleep(30) if __name__ == '__main__': p = Process(target=task,args=()) p.start() # print('',current_process().pid)
   print('主',current_process().pid) 運行結果: D:\Python38\python.exe D:
/day039/day039_03進程對象及其他方法.py 主 864 13760 is running

p.terminate()  # 殺死當前進程

is_alive()  # 判斷當前進程是否存活

一般情況下,我們會默認將存儲bool值的變量名 和 返回的結果是bool值的方法名都起成'is_'開頭

# encoding=utf-8 # auther:lsj # 進程對象

from multiprocessing import Process,current_process import time import os def task(): print('%s is running'%os.getpid())  # 查看當前進程的進程號
    time.sleep(3) if __name__ == '__main__': p = Process(target=task,args=()) p.start() p.terminate() # 殺死當前進程,告訴操作系統幫你去殺死當前進程,需要一定的時間,而代碼的運行速度極快,所以后面要給點時間
    time.sleep(0.1)  # 如果不加時間,最后的運行結果是True
    print(p.is_alive())  # 判斷當前進程是否存活
    print('') 運行結果: D:\Python38\python.exe D:/day039/day039_03_01進程對象及其他方法.py False 主

四、僵屍進程(了解)

  僵屍進程:

    什么是僵屍:死了但沒有死透。

    僵屍進程:當你開設了子進程之后,該進程死后不會立即釋放占用的進程號。因為我要讓父進程能夠查看到它開設的子進程的一些基本的信息,占用的pid號 運行時間

      所有的進程都會步入僵屍進程。

      父進程不死並且無限制的創建子進程並且子進程也不結束

      父進程回收子進程占用的pid號的方式:

        (1)父進程等待子進程運行結束

        (2)父進程調用join方法

五、孤兒進程(了解)

  孤兒進程:子進程存活,父進程意外死亡。(這時候沒有人回收子進程的pid號)

      操作系統會開設一個回收機制(該機制類似於‘兒童福利院’)專門管理孤兒進程回收相關資源。

六、守護進程

  什么是守護進程:默認啟動的服務隨着操作系統是啟動而啟動,隨着操作系統的關閉而關閉。(古代君王死后陪葬)

  守護進程的作用:會隨着主進程的結束而結束。

主進程創建守護進程

  其一:守護進程會在主進程代碼執行結束后就終止

  其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止。

# encoding=utf-8 # auther:lsj # 守護進程
from multiprocessing import Process import time def task(name): # def task(name='li'):
    print('%s總管正常活着'%name) time.sleep(3) print('%s總管正常死亡'%name) if __name__ == '__main__': p = Process(target=task,args=('li',)) # p = Process(target=task,kwargs={'name':'li'})
    p.daemon = True # 將進程p設置成守護進程,一定放在start()方法上面才有效否則報錯:AssertionError: process has already started(該進程已經啟動了)
 p.start() print('皇帝王壽終正寢') 運行結果: D:\Python38\python.exe D:/pycharm/oldboy_29/day039/day039_05守護進程.py 皇帝王壽終正寢

 

import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行
p.start() time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id
print('')
from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,因為主進程打印main----時,p1也執行了,但是隨即被終止.

七、socket聊天並發實例

from socket import *
from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break

if __name__ == '__main__': #windows下start進程一定要寫到這下面
    while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start()
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

八、多進程中的其他方法

from multiprocessing import Process import time import random class Myprocess(Process): def __init__(self,person): self.name=person super().__init__() def run(self): print('%s正在和網紅臉聊天' %self.name) time.sleep(random.randrange(1,5)) print('%s還在和網紅臉聊天' %self.name) p1=Myprocess('哪吒') p1.start() p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
print(p1.is_alive()) #結果為True

print('開始') print(p1.is_alive()) #結果為False
1 class Myprocess(Process): 2     def __init__(self,person): 3         self.name=person   # name屬性是Process中的屬性,標示進程的名字
 4         super().__init__() # 執行父類的初始化方法會覆蓋name屬性
 5         #self.name = person # 在這里設置就可以修改進程名字了
 6         #self.person = person #如果不想覆蓋進程名,就修改屬性名稱就可以了
 7     def run(self): 8         print('%s正在和網紅臉聊天' %self.name) 9         # print('%s正在和網紅臉聊天' %self.person)
10         time.sleep(random.randrange(1,5)) 11         print('%s正在和網紅臉聊天' %self.name) 12         # print('%s正在和網紅臉聊天' %self.person)
13 
14 
15 p1=Myprocess('哪吒') 16 p1.start() 17 print(p1.pid)    #可以查看子進程的進程id

九、互斥鎖

  場景模擬:我們使用12306購票,甲10:10向服務端發送請求,服務端傳給甲的狀態是服務器10:10的狀態,還有10張票。於此同時乙、丙、丁等人也向服務端發送請求,並都購買了票。此時的甲看不到服務器修改后的數據,當甲10:30再次訪問服務器的時候,甲才能看到服務器狀態已經改變了。

  

互斥鎖模擬搶票功能
# encoding=utf-8 # auther:lsj # 互斥鎖模擬搶票功能
from multiprocessing import Process import json import time import random # 查票
def search(i): # 文件操作讀取票數
    with open('data',mode='r',encoding='utf-8') as f: dic = json.load(f) print('用戶%s查詢余票:%s'%(i,dic.get('ticket_num'))) # 字典取值不要用[]的形式,推薦使用get方法,不會出現報錯

# 買票步驟:1)先查。2)再買
def buy(i): # 先查票
    with open('data',mode='r',encoding='utf-8') as f: dic = json.load(f) # 模擬網絡延遲
    time.sleep(random.randint(1,3)) # 判斷當前是否有票
    if dic.get('ticket_num') > 0: # 修改數據庫 買票
        dic['ticket_num'] -= 1
        # 寫入數據庫
        with open('data','w',encoding='utf8') as f: json.dump(dic,f) # 字典寫入文件內
        print('用戶%s買票成功'%i) else: print('用戶%s買票失敗'%i) # 整合上面兩個函數的功能
def run(i): search(i) buy(i) if __name__ == '__main__': for i in range(1,11):  # 打開多個進程10個i
        p = Process(target=run,args=(i,)) p.start()

data文件,票數為1張

{"ticket_num": 1}

運行結果如下,就一張票,但是每個人都強到票了,這就有問題了。

D:\Python38\python.exe D:/pycharm/oldboy_29/day039/day039_06互斥鎖模擬搶票功能.py 用戶1查詢余票:1 用戶3查詢余票:1 用戶2查詢余票:1 用戶4查詢余票:1 用戶5查詢余票:1 用戶6查詢余票:1 用戶7查詢余票:1 用戶8查詢余票:1 用戶9查詢余票:1 用戶10查詢余票:1 用戶1買票成功 用戶7買票成功 用戶10買票成功 用戶3買票成功 用戶5買票成功 用戶8買票成功 用戶9買票成功 用戶2買票成功 用戶4買票成功 用戶6買票成功 Process finished with exit code 0

 

結論:多個進程操作同一份數據的時候,會出現數據錯亂的問題。針對該問題的解決方式,就是枷鎖處理,將並發變成串行,犧牲效率但是保證了數據的安全。

加入鎖:

# encoding=utf-8 # auther:lsj # 互斥鎖模擬搶票功能完整版
from multiprocessing import Process,Lock import json import time import random # 查票
def search(i): # 文件操作讀取票數
    with open('data',mode='r',encoding='utf-8') as f: dic = json.load(f) print('用戶%s查詢余票:%s'%(i,dic.get('ticket_num'))) # 字典取值不要用[]的形式,推薦使用get方法,不會出現報錯

# 買票步驟:1)先查。2)再買
def buy(i): # 先查票
    with open('data',mode='r',encoding='utf-8') as f: dic = json.load(f) # 模擬網絡延遲
    time.sleep(random.randint(1,3)) # 判斷當前是否有票
    if dic.get('ticket_num') > 0: # 修改數據庫 買票
        dic['ticket_num'] -= 1
        # 寫入數據庫
        with open('data','w',encoding='utf8') as f: json.dump(dic,f) # 字典寫入文件內
        print('用戶%s買票成功'%i) else: print('用戶%s買票失敗'%i) # 整合上面兩個函數的功能
def run(i,mutex): search(i) # 給買票環節加鎖處理
    # 搶鎖
 mutex.acquire() buy(i) # 釋放鎖
 mutex.release() if __name__ == '__main__': # 在主進程中生成一把鎖,讓所有的子進程搶,誰先搶到誰先買票
    mutex = Lock() for i in range(1,11):  # 打開多個進程10個i
        p = Process(target=run,args=(i,mutex,)) p.start() 運行結果: D:\Python38\python.exe D:/pycharm/oldboy_29/day039/day039_06互斥鎖模擬搶票功能改進版.py 用戶2查詢余票:1 用戶1查詢余票:1 用戶3查詢余票:1 用戶4查詢余票:1 用戶7查詢余票:1 用戶5查詢余票:1 用戶6查詢余票:1 用戶8查詢余票:1 用戶9查詢余票:1 用戶10查詢余票:1 用戶2買票成功 用戶1買票失敗 用戶3買票失敗 用戶4買票失敗 用戶7買票失敗 用戶5買票失敗 用戶6買票失敗 用戶8買票失敗 用戶9買票失敗 用戶10買票失敗 Process finished with exit code 0

 

十、進程同步(multiprocess.Lock) 鎖 —— multiprocess.Lock

      通過剛剛的學習,我們千方百計實現了程序的異步,讓多個任務可以同時在幾個進程中並發處理,他們之間的運行沒有順序,一旦開啟也不受我們控制。盡管並發編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。

  當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。

import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' %(n,os.getpid())) time.sleep(random.random()) print('%s:%s is done' %(n,os.getpid())) if __name__ == '__main__': for i in range(3): p=Process(target=work,args=(i,)) p.start()
# 由並發變成了串行,犧牲了運行效率,但避免了競爭
import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start()

  上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。

  接下來,我們以模擬搶票為例,來看看數據安全的重要性。

#文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 #並發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩余票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w')) print('\033[43m購票成功\033[0m') def task(): search() get() if __name__ == '__main__': for i in range(100): #模擬並發100個客戶端搶票
        p=Process(target=task) p.start()

 

#文件db的內容為:{"count":5} #注意一定要用雙引號,不然json無法識別 #並發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩余票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(random.random()) #模擬讀數據的網絡延遲
    if dic['count'] >0: dic['count']-=1 time.sleep(random.random()) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w')) print('\033[32m購票成功\033[0m') else: print('\033[31m購票失敗\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(100): #模擬並發100個客戶端搶票
        p=Process(target=task,args=(lock,)) p.start()

 

#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.需要自己加鎖處理 #因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來, 我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

 擴展:

  行鎖:對一行數據操作時其他人等待。

  表鎖:對一個表數據操作時其他人等待。

注意:

  1、鎖不要輕易使用,容易造成死鎖現象(我們寫代碼一般不會用到,都是內部封裝好的,因為比較復雜)。

  2、鎖只在處理數據的部分加來保證數安全(只在爭搶數據的環節加鎖處理即可)。

 

十一、進程間通信——隊列(multiprocess.Queue)

  隊列:先進先出 (隊列=管道+鎖)

  堆棧:先進后出

  管道:subprocess 

      stdin stdout stderr

概念介紹

創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

Queue([maxsize]) 
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
Queue([maxsize]) 
創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
 
q.empty() 
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,后台線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動連接后台線程。這可以防止join_thread()方法阻塞。

q.join_thread() 
連接隊列的后台線程。此方法用於在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。

注意:

q.full()
q.empty()
q.get_nowait()
在多進程的情況下時不精確的

代碼實例

''' multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,但是隊列接口 '''

from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty
q.put(3) q.put(3) q.put(3) # q.put(3) # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。
           # 如果隊列中的數據一直不被取走,程序就會永遠停在這里。
try: q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。
    print('隊列已經滿了') # 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。
print(q.full()) #滿了

print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。
try: q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了') print(q.empty()) #空了

 

上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。

import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), 'from Eva', 'hello'])  #調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。

if __name__ == '__main__': q = Queue() #創建一個Queue對象
    p = Process(target=f, args=(q,)) #創建一個進程
 p.start() print(q.get()) p.join()

 

上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的數據。 接下來看一個稍微復雜一些的例子:

import os import time import multiprocessing # 向queue中輸入數據的函數
def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) # 向queue中輸出數據的函數
def outputQ(queue): info = queue.get() print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info)) # Main
if __name__ == '__main__': multiprocessing.freeze_support() record1 = []   # store input processes
    record2 = []   # store output processes
    queue = multiprocessing.Queue(3) # 輸入進程
    for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # 輸出進程
    for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()

 

十二、進程間通信IPC

  IPC(Inter-Process Communication)

引子:簡單的這么立即,甲乙兩個進程找個第三方丙(消息隊列),把數據先傳給第三方,另外一方通過第三方來存取數據。

研究思路:
1、主進程跟子進程借助於隊列通信。
2、子進程跟子進程借助於隊列通信。
# encoding=utf-8
# auther:lsj
# IPC 機制:1、主進程跟子進程借助於隊列通信
from multiprocessing import Queue,Process

def producer(q):
    q.put('老板娘你好!!!')
    print('hello big baby!')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,))
    p.start()
    print(q.get())

運行結果:
D:\Python38\python.exe D:/pycharm/oldboy_29/day039/day039_08IPC機制.py
hello big baby!
老板娘你好!!!

 

# encoding=utf-8
# auther:lsj
# IPC 機制:2、子進程跟子進程借助於隊列通信
from multiprocessing import Queue,Process

def producer(q):   # 生產者
    q.put('老板娘你好!!!')
    print('hello big baby!')

def consumer():  # 消費者
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,))
    p1 = Process(target=consumer,args=(q,))
    p.start()
    print(q.get())

運行結果:
D:\Python38\python.exe D:/pycharm/oldboy_29/day039/day039_08_01IPC機制.py
hello big baby!
老板娘你好!!!

 

十三、生產者消費者模型

引子:

生產者:生產/制造東西的
消費者:消費/處理東西的
該模型除了上述兩個之外還需要一個東西

生活中的例子:
    做包子的將包子做好后放進蒸籠(媒介)里面,
    買包子的去蒸籠(媒介)里去拿取。

    廚師做菜做完后用盤子(媒介)裝着給消費者端過去。

    生產者與消費者之間不是直接做交互的,而是借助媒介(隊列)做交互

生產者/消費者模型(分三部分):
    生產者(做包子的) + 消息隊列(蒸籠) + 消費者(吃包子的)

 

  在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什么要使用生產者和消費者模式

  在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什么是生產者消費者模式

  生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們
    p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,)) #開始
 p1.start() c1.start() print('')

此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環。

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.put(None) #發送結束信號
if __name__ == '__main__': q=Queue() #生產者們:即廚師們
    p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,)) #開始
 p1.start() c1.start() print('')

注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們
    p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,)) #開始
 p1.start() c1.start() p1.join() q.put(None) #發送結束信號
    print('')

但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #開始
 p1.start() p2.start() p3.start() c1.start() p1.join() #必須保證生產者全部生產完畢,才應該發送結束信號
 p2.join() p3.join() q.put(None) #有幾個消費者就應該發送幾次結束信號None
    q.put(None) #發送結束信號
    print('')

JoinableQueue([maxsize]) 
  創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 

JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
每當往該隊列中存入數據的時候,內部會有一個計數器+1,每當調用task_done()時候計數器-1 q.task_done() 
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。

q.join() 當計數器為0的時候才往后運行
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了

def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.join() #生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。


if __name__ == '__main__': q=JoinableQueue() #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #開始
    p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('') #主進程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據
    #因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,所以設置成守護進程就可以了。

十四、進程之間的數據共享

  展望未來,基於消息傳遞的並發編程是大勢所趨

  即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合,通過消息隊列交換數據。

  這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統中。

  但進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。

以后我們會嘗試使用數據庫來解決現在進程之間的數據共享問題。

進程間數據是獨立的,可以借助於隊列或管道實現通信,二者都是基於消息傳遞的 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂
        d['count']-=1

if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)

 

進程池和multiprocess.Pool模塊

進程池

為什么要有進程池?進程池的概念。

在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?

在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。

multiprocess.Pool模塊

概念介紹

Pool([numprocess  [,initializer [, initargs]]]):創建進程池

 

 參數介紹

1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None 3 initargs:是要傳給initializer的參數組
1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。 2 '''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()'''
3 
4 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。 5 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。'''
6    
7 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 8 
9 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用
1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法 2 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。 3 obj.ready():如果調用完成,返回True 4 obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常 5 obj.wait([timeout]):等待結果變為可用。 6 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數

代碼實例

進程池和多進程效率對比

p.map進程池和進程效率測試

同步和異步
import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2

if __name__ == '__main__': p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
    res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
                                    # 但不管該任務是否存在阻塞,同步調用都會在原地等着
    print(res_l)
import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(random.random()) return n**2

if __name__ == '__main__': p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務
    res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
                                          # 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務
                                          # 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束
                                          # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 
 res_l.append(res) # 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果
    # 否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了
 p.close() p.join() for res in res_l: print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

 

 練習

#Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count()) #開啟6個客戶端,會發現2個客戶端處於等待狀態 #在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('進程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break

if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

 

from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

發現:並發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來.

回調函數
 需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

 

from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了

''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
import re from urllib.request import urlopen from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') return pattern,response def parse_page(info): pattern,page_content=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get()

如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數

from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2
if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進程池中所有進程執行完畢
 nums=[] for res in res_l: nums.append(res.get()) #拿到所有結果
    print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM