進程:
之前我們已經了解了操作系統中進程的概念,程序並不能單獨運行,只有將程序裝載到內存中,系統為它分配資源才能運行,而這種執行的程序就稱之為進程。程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。在多道編程中,我們允許多個程序同時加載到內存中,在操作系統的調度下,可以實現並發地執行。這是這樣的設計,大大提高了CPU的利用率。進程的出現讓每個用戶感覺到自己獨享CPU,因此,進程就是為了在CPU上實現多道編程而提出的。
進程間通信
用Queue模塊:
IPC(Inter-Process Communication)
隊列
概念介紹
隊列是先進先出
必須put放進東西后 才能get來取值
創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
Queue([maxsize])
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。

Queue([maxsize])
創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
隊列中的進程的內容是共享的 因為不同的進程的數據是隔離的 我們可以用隊列 讓他們之間的數據進行共享
在進程中使用隊列可以完成雙向通信
from multiprocessing import Process ,Queue q = Queue(10) try: q.get_nowwait() # 如果你用 nowwait的話你的獲取嗯u過沒有就不會阻塞就會報錯 except: print('queue.Empty') q.get() for i in range(10): q.get(i) print(q.qsize(10))
from multiprocessing import Process ,Queue q = Queue(10) # 創建一個可以存放10個值的隊列 # try: # q.get_nowwait() # except: # print('queue.Empty') # # q.get() for i in range(10): q.put(i) print(q.qsize()) # 獲取你的隊列可以存放的最大值 print(q.full()) # 判定是不是滿了 返回的值布爾值 # q.put(111) # 給這個隊列放進值 # print(q.grt()) print('*'*10) print(q.empty()) # 判定隊列是不是為空 返回的也是布爾值
生產者消費者模型
解決數據供需不平衡的情況
隊列是進程安全的 內置了鎖來保證隊列中的每一個數據都不會被多個進程重復取

import time import random from multiprocessing import Process,Queue 生產者消費者模型 解決數據供需不平衡的情況 隊列是進程安全的 內置了鎖來保證隊列中的每一個數據都不會被多個進程重復取 def consumer(q,name): while True: food = q.get() if food == 'done':break time.sleep(random.random()) print('%s吃了%s'%(name,food)) def producer(q,name,food): for i in range(10): time.sleep(random.random()) print('%s生產了%s%s'%(name,food,i)) q.put('%s%s'%(food,i)) if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=[q,'Egon','泔水']) p2 = Process(target=producer,args=[q,'Yuan','骨頭魚刺']) p1.start() p2.start() Process(target=consumer,args=[q,'alex']).start() Process(target=consumer,args=[q,'wusir']).start() p1.join() p2.join() q.put('done') q.put('done')

import time import random from multiprocessing import Process,JoinableQueue def consumer(q,name): while True: food = q.get() time.sleep(random.random()) print('%s吃了%s'%(name,food)) q.task_done() def producer(q,name,food): for i in range(10): time.sleep(random.random()) print('%s生產了%s%s'%(name,food,i)) q.put('%s%s'%(food,i)) q.join() # 等到所有的數據都被taskdone才結束 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer,args=[q,'Egon','泔水']) p2 = Process(target=producer,args=[q,'Yuan','骨頭魚刺']) p1.start() p2.start() c1 = Process(target=consumer,args=[q,'alex']) c2 = Process(target=consumer,args=[q,'wusir']) c1.daemon = True c2.daemon = True c1.start() c2.start() p1.join() p2.join() # producer # put # 生產完全部的數據就沒有其他工作了 # 在生產數據方 : 允許執行q.join # join會發起一個阻塞,直到所有當前隊列中的數據都被消費 # consumer # get 獲取到數據 # 處理數據 # q.task_done() 告訴q,剛剛從q獲取的數據已經處理完了 # consumer每完成一個任務就會給q發送一個taskdone # producer在所有的數據都生產完之后會執行q.join() # producer會等待consumer消費完數據才結束 # 主進程中對producer進程進行join # 主進程中的代碼會等待producer執行完才結束 # producer結束就意味着主進程代碼的結束 # consumer作為守護進程結束 # consumer中queue中的所有數據被消費 # producer join結束 # 主進程的代碼結束 # consumer結束 # 主進程結束
Queue([maxsize])
創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。
JoinableQueue([maxsize])
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done()
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。
q.join()
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。

from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了 def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.join() #生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。 if __name__ == '__main__': q=JoinableQueue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #開始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主') #主進程等--->p1,p2,p3等---->c1,c2 #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據 #因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,所以設置成守護進程就可以了。
管道:
管道是雙向通信,數據進程不安全,隊列是管道加鎖來實現的

#創建管道的類: Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道 #參數介紹: dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。 conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象 #其他方法: conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回連接使用的整數文件描述符 conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然后調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大於可用的緩沖區空間,將引發BufferTooShort異常。

from multiprocessing import Process, Pipe def f(conn): conn.send("Hello The_Third_Wave") conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join()
應該特別注意管道端點的正確管理問題。如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程序可能在消費者中的recv()操作上掛起。管道是由操作系統進行引用計數的,必須在所有進程中關閉管道后才能生成EOFError異常。因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。
# 管道 # from multiprocessing import Pipe # left,right = Pipe() # left.send('1234') # print(right.recv()) # left.send('1234') # print(right.recv()) 管道的信息發送接收信息是不需要進行編碼轉碼的 from multiprocessing import Process, Pipe def f(parent_conn,child_conn): parent_conn.close() #不寫close將不會引發EOFError while True: try: print(child_conn.recv()) except EOFError: child_conn.close() break if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(parent_conn,child_conn,)) p.start() child_conn.close() parent_conn.send('hello') parent_conn.send('hello') parent_conn.send('hello') parent_conn.close() p.join()
進程之間的數據共享
展望未來,基於消息傳遞的並發編程是大勢所趨
即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合,通過消息隊列交換數據。
這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統中。
但進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。
以后我們會嘗試使用數據庫來解決現在進程之間的數據共享問題。

進程間數據是獨立的,可以借助於隊列或管道實現通信,二者都是基於消息傳遞的 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
from multiprocessing import Manager,Process,Lock def func(dic,lock): # lock.acquire() # dic['count'] = dic['count']-1 # lock.release() with lock: # 上下文管理 :必須有一個開始動作 和 一個結束動作的時候 dic['count'] = dic['count'] - 1 if __name__ == '__main__': m = Manager() lock = Lock() dic = m.dict({'count':100}) p_lst = [] for i in range(100): p = Process(target=func,args=[dic,lock]) p_lst.append(p) p.start() for p in p_lst:p.join() print(dic) # 同一台機器上 : Queue # 在不同台機器上 :消息中間件
進程池和multiprocess.Pool模塊
進程池
為什么要有進程池?進程池的概念。
在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?
在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。
multiprocess.Pool模塊
Pool([numprocess [,initializer [, initargs]]]):創建進程池
概念介紹

1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None 3 initargs:是要傳給initializer的參數組

1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。 2 '''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()''' 3 4 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。 5 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。''' 6 7 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 8 9 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用

1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法 2 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。 3 obj.ready():如果調用完成,返回True 4 obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常 5 obj.wait([timeout]):等待結果變為可用。 6 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數
一 進程池與線程池
在剛開始學多進程或多線程時,我們迫不及待地基於多進程或多線程實現並發的套接字通信,
然而這種實現方式的致命缺陷是:服務的開啟的進程數或線程數都會隨着並發的客戶端數目地增多而增多,
這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,於是我們必須對服務端開啟的進程數或線程數加以控制,
讓機器在一個自己可以承受的范圍內運行,這就是進程池或線程池的用途,
例如進程池,就是用來存放進程的池子,本質還是基於多進程,只不過是對開啟進程的數目加上了限制
'''
同步和異步

import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞 # 但不管該任務是否存在阻塞,同步調用都會在原地等着 print(res_l)

import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(random.random()) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行 # 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務 # 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束 # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 res_l.append(res) # 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果 # 否則,主進程結束,進程池可能還沒來得及執行,也就跟着一起結束了 p.close() p.join() for res in res_l: print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
import time import random from multiprocessing import Pool def func(i): print('func%s' % i) time.sleep(random.randint(1,3)) return i**2 if __name__ == '__main__': p = Pool(5) ret_l = [] for i in range(15): # p.apply(func=func,args=(i,)) # 同步調用 ret = p.apply_async(func=func,args=(i,))# 異步調用 ret_l.append(ret) for ret in ret_l : print(ret.get()) # 主進程和所有的子進程異步了
回調函數:
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了 ''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
import os from urllib.request import urlopen from multiprocessing import Pool def get_url(url): print('-->',url,os.getpid()) ret = urlopen(url) content = ret.read() return url def call(url): # 分析 print(url,os.getpid()) if __name__ == '__main__': print(os.getpid()) l = [ 'http://www.baidu.com', # 5 'http://www.sina.com', 'http://www.sohu.com', 'http://www.sogou.com', 'http://www.qq.com', 'http://www.bilibili.com', #0.1 ] p = Pool(5) # count(cpu)+1 ret_l = [] for url in l: ret = p.apply_async(func = get_url,args=[url,],callback=call) ret_l.append(ret) for ret in ret_l : ret.get() # 回調函數 # 在進程池中,起了一個任務,這個任務對應的函數在執行完畢之后 # 的返回值會自動作為參數返回給回調函數 # 回調函數就根據返回值再進行相應的處理 # 回調函數 是在主進程執行的