python 進程 multiprocessing模塊


一、multiprocess.process模塊

1.process類
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)
強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
 
參數介紹:
group參數未使用,值始終為None
target表示調用對象,即子進程要執行的任務
args表示調用對象的位置參數元組,args=(1,2,'egon',)
kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
name為子進程的名稱
 
 
2.process類的方法
p.start():啟動進程,並調用該子進程中的p.run()
p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  
p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程  
 
 
3.process類的屬性
p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
 
 
4.在windows運行時要注意的問題
在Windows操作系統中由於沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候  ,就不會遞歸運行了。
 
 
5.實例代碼
1)創建一個子進程,並運行
from multiprocessing import Process
def func(name):
    print('子進程:你好,',name)
 
if __name__ == '__main__':
    p = Process(target=func,args=('hsr',))
    p.start()
 
2)查看子進程和父進程的進程號
from multiprocessing import Process
import os
def func():
    print('子進程PID:',os.getpid())
 
if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    print('父進程PID:',os.getpid())
 
3)join
from multiprocessing import Process
import time
def func(*args):
    print('*'*args[0])
    time.sleep(5)
    print('*' * args[1])
if __name__ == '__main__':
    p = Process(target=func,args=(10,20))
    p.start()
    p.join()    #主線程等待p終止
    print("-------運行完了-------")
 
4)開啟多個子進程
from multiprocessing import Process
import time
def func(no,*args):
    print(str(no)+"  :"+'*'*args[0])
    time.sleep(5)
    print(str(no)+"  :"+'*'*args[1])
if __name__ == '__main__':
    p_li = []
    for i in range(10):
        p_li.append(Process(target=func,args=(i,10,20)))
    for i in p_li:
        i.start()
 
    [i.join() for i in p_li]    #讓最后的print等子進程都結束了再執行
    print('運行完了')
 
 
5)實現多進程的另一種方法
#自定義類 繼承Process類
#必須實現run方法,run方法就是子進程執行的方法
#如果要參數,則實現自己的init方法,並在其中調用父類的init方法
from multiprocessing import Process
import os
class MyProcess(Process):
    def __init__(self,arg1):
        super().__init__()
        self.arg1 = arg1
    def run(self):
        print("My Process:",self.pid)
        print(self.arg1)
if __name__ == '__main__':
    print(os.getpid())
    p1 = MyProcess(4)
    p1.start()
 
 
6.進程間的數據隔離
#進程間不會共享數據
from multiprocessing import Process
import os
def func():
    global n
    n = 0
    print('pid:'+str(os.getpid())+"      "+str(n))
if __name__ == '__main__':
    n = 100
    p = Process(target=func)
    p.start()
    p.join()
    print('pid:'+str(os.getpid())+"      "+str(n))
 
 
7.守護進程
守護進程(daemon)是一類在后台運行的特殊進程,用於執行特定的系統任務。很多守護進程在系統引導的時候啟動,並且一直運行直到系統關閉。
會隨着主進程的結束而結束。
主進程創建守護進程
  其一:守護進程會在主進程代碼執行結束后就終止
  其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
#守護進程
from multiprocessing import Process
import time
def func():
    while 1:
        time.sleep(2)
        print('Good')
if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True #設置子進程為守護進程
    p.start()
    i = 10
    while i>0:
        print('Do something')
        time.sleep(5)
        i -= 1

 

 

二、進程同步

1.進程鎖
#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.需要自己加鎖處理
 
#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
 
#模擬吃50個人吃5個蘋果
#使用Lock對象的acquire請求鎖,release釋放鎖
from multiprocessing import Process
from multiprocessing import Lock
import json
def eat(no,lock):
    lock.acquire()
    with open('info.json') as f:
        dic = json.load(f)
        AppleNum = dic["Apple"]
        print("蘋果個數:" + str(AppleNum))
    if AppleNum >0:
        print("%d 吃了一個蘋果" %no)
        AppleNum -= 1
        dic["Apple"] = AppleNum
        with open('info.json','w') as f:
            json.dump(dic,f)
    else:
        print("%d 沒有蘋果吃了" %no)
    lock.release()
if __name__ == '__main__':
    lock = Lock()
    for i in range(50):
        Process(target=eat, args=(i,lock)).start()

 

 
 
2.信號量
信號量(Semaphore),有時被稱為信號燈,是在多線程環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被並發調用。在進入一個關鍵代碼段之前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那么該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。
信號量(semaphore)的數據結構為一個值和一個指針,指針指向等待該信號量的下一個進程。信號量的值與相應資源的使用情況有關。當它的值大於0時,表示當前可用資源的數量;當它的值小於0時,其絕對值表示等待使用該資源的進程個數。注意,信號量的值僅能由PV操作來改變。
 
對於信號量,可以認為是一個倉庫,有兩個概念,容量和當前的貨物個數。
P操作從倉庫拿貨,如果倉庫中沒有貨,線程一直等待,直到V操作,往倉庫里添加了貨物,為了避免P操作一直等待下去,會有一個超時時間。
V操作往倉庫送貨,如果倉庫滿了,線程等待,直到有P操作,從倉庫中拿走貨物,有空的位置。
創建信號量,設置容量,先有V操作,才能P操作。
P操作:貨物個數減1,減過之后,貨物個數大於等於0,說明已經拿到貨物,線程繼續。否者線程阻塞。
V操作:貨物個數加1,加過之后,貨物個數小於等於容量,說明添加成功,線程繼續。否者線程阻塞。
信號量:0≤ 信號量≤容量 ,取值 表示當前可以使用的貨物;
        信號量<0 ,  取值 表示當前等待使用貨物的線程;
            信號量>容量 ,  信號量-容量 表示當前等待添加貨物的線程。
通常,信號量的容量設置很大,可以一直V操作,不會阻塞,但是P操作的時候,很可能阻塞。
當容量為1,也就是互斥,執行流程必定是V操作,P操作,V操作,P操作...
 
P原語操作的動作是:
(1) sem減1;
(2) 若sem減1后仍大於或等於零,則進程繼續執行;
(3) 若sem減1后小於零,則該進程被阻塞后進入與該信號相對應的隊列中,然后轉進程調度。
V原語操作的動作是:
(1) sem加1;
(2) 若相加結果大於零,則進程繼續執行;
(3) 若相加結果小於或等於零,則從該信號的等待隊列中喚醒一等待進程,然后再返回原進程繼續執行或轉進程調度。
 
 
from multiprocessing import Process,Semaphore
import time
import random
def grid(i,sem):
    sem.acquire()
    print(str(i)+'放入了格子')
    time.sleep(random.randint(2,6))
    print(str(i)+'拿出了格子')
    sem.release()
if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(20):
        Process(target=grid,args=(i,sem)).start()
 
 
3.事件
通過一個信號來控制多個進程同時執行或阻塞
一個事件被創建后,默認是阻塞狀態
 
from multiprocessing import Event
if __name__ == "__main__":
    e = Event()  # c創建一個事件
    print(e.is_set())   # 查看一個事件的狀態
    e.set() #將事件的狀態改為True
    e.wait()    #根據e.is_set()的值決定是否阻塞
    print(1235455)
    e.clear()   #將事件的狀態改為False
    e.wait()
    print(12323545555)
 
紅綠燈問題
from multiprocessing import Event,Process
import time
import random
def traffic_light(e):
    while 1:
        if e.is_set():
            e.clear()
            print('\033[31m[-----------紅燈-----------]\033[0m')
        else:
            e.set()
            print('\033[32m[-----------綠燈-----------]\033[0m')
        time.sleep(2)
 
def car(e,i):
    if not e.is_set():
        print('%s號車在等紅燈' %i)
        e.wait()    #阻塞直到狀態改變
    print('\033[0;32;40m%s號車通過\033[0m' %i)
 
if __name__ == '__main__':
    e = Event()
    light = Process(target=traffic_light,args=(e,))
    light.start()
    for i in range(20):
        time.sleep(random.random())
        cars = Process(target=car,args=(e,i))
        cars.start()

 

 

三、進程間通信

1.隊列Queue

Queue([maxsize])
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
 
方法
Queue([maxsize])
創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具有以下方法:
 
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
 
q.get_nowait( )
同q.get(False)方法。
 
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。
 
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
 
 
q.empty()
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
 
q.full()
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。

 

簡單使用:
from multiprocessing import Process,Queue
if __name__ == '__main__':
    q = Queue(5)    #創建隊列
    for i in range(5):
        q.put(i)    #放進數據
    print(q.full())
    #q.put(6)   此處阻塞
    for i in range(5):
        print(q.get())  #獲取數據
    print(q.empty())
    #q.get() 此處阻塞

 

簡單的進程間通信:
from multiprocessing import Event,Process,Queue
def produce(q):
    q.put('from produce')
def comsume(q):
    print(q.get())
if __name__ == '__main__':
    q = Queue(5)    #創建隊列
    pro = Process(target=produce,args=(q,))
    pro.start()
    com = Process(target=comsume, args=(q,))
    com.start()
 
生產者消費者模型
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
 
1)使用Queue
from multiprocessing import Process,Queue
import time
import random
def producer(name,goods,q):
    for i in range(10):
        time.sleep(random.randint(1,4))
        print('%s生產了第%s個%s'%(name,i,goods))
        q.put('第%s個%s'%(i,goods))
def comsumer(q,name):
    while 1:
        goods = q.get()
        if goods == None:break
        print('\033[31m%s買了了%s\033[0m' % (name,goods))
        time.sleep(random.randint(2,6))
if __name__ == '__main__':
    q = Queue(10)
    p = Process(target=producer,args=('HSR','牛奶',q))
    p2 = Process(target=producer, args=('TTT', '面包', q))
    c = Process(target=comsumer, args=(q,'Lisi'))
    c2 = Process(target=comsumer, args=(q, 'ZhangSan'))
    p.start()
    p2.start()
    c.start()
    c2.start()
    p.join()
    p2.join()
    q.put(None)
    q.put(None)
 
 
2)使用JoinableQueue
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
from multiprocessing import Process,JoinableQueue
import time
import random
def producer(name,goods,q):
    for i in range(10):
        time.sleep(random.randint(1,4))
        print('%s生產了第%s個%s'%(name,i,goods))
        q.put('第%s個%s'%(i,goods))
    q.join()    #阻塞,直到隊列中的數據被全部執行完畢
def comsumer(q,name):
    while 1:
        goods = q.get()
        if goods == None:break
        print('\033[31m%s買了了%s\033[0m' % (name,goods))
        time.sleep(random.randint(2,6))
        q.task_done()   #count - 1
if __name__ == '__main__':
    q = JoinableQueue(10)
    p = Process(target=producer,args=('HSR','牛奶',q))
    p2 = Process(target=producer, args=('TTT', '面包', q))
    c = Process(target=comsumer, args=(q,'Lisi'))
    c2 = Process(target=comsumer, args=(q, 'ZhangSan'))
    p.start()
    p2.start()
    c.daemon = True #設置為守護進程,主進程結束則子進程結束,而這里的主進程等待生產進程的結束
    c2.daemon = True    #生產進程又等待消費進程消費完。所以消費者消費完了就會結束進程
    c.start()
    c2.start()
    p.join()
    p2.join()

 

 

2.管道

multiprocessing.Pipe
(兩端在不同的進程也可以)
 
 
from multiprocessing import Pipe,Process
def func(conn1,conn2):
    conn2.close()
    while 1:
        try:
            print(conn1.recv())
        except EOFError:
            conn1.close()
            break
if __name__ == '__main__':
    conn1, conn2 = Pipe()
    p1 = Process(target=func,args=(conn1, conn2))   #傳給不同進程的conn是不會相互影響的
    p1.start()
    conn1.close()
    for i in range(20):
        conn2.send("hi")
    conn2.close()

 

 
  一頭的conn都關閉后,會EOFError
 
使用管道實現生產者消費者模型
#Pipe有數據不安全性
#管道可能出現一端的多個消費者同時取一個數據
#所以可以加上一個進程鎖來保證安全性
from multiprocessing import Pipe,Process,Lock
import time
import random
def producer(con,pro,name,goods):
    con.close()
    for i in range(8):
        time.sleep(random.randint(1,3))
        print('%s生成了第%s個%s'%(name,i,goods))
        pro.send('第%s個%s'%(i,goods))
    pro.close()
def consumer(con,pro,name,lock):
    pro.close()
    while 1:
        try:
            lock.acquire()
            goods = con.recv()
            lock.release()
            print('%s喝了%s'%(name,goods))
            time.sleep(random.random())
        except EOFError:
            lock.release()  #因為最后消費者通過異常來結束進程,所以最后一次的recv后面的lock.release不會執行,所以要在
                            #這個地方再寫一個release()
            con.close()
            break
if __name__ == '__main__':
    con, pro = Pipe()
    lock = Lock()
    p = Process(target=producer, args=(con,pro,'HSR','牛奶'))
    c = Process(target=consumer, args=(con, pro, 'TTT',lock))
    c2 = Process(target=consumer, args=(con, pro, 'TTT2',lock))
    p.start()
    c.start()
    c2.start()
    con.close()
    pro.close()

 

 

3.Manager

multiprocessing.Manager模塊
 
#報 AttributeError: 'ForkAwareLocal' object has no attribute 'connection' 的原因
#運行這段代碼時,主進程執行完了,斷開了連接,而子進程要連接,此時會報錯
#所以可以用join(),讓主進程等待子進程的結束
from multiprocessing import Manager,Process
def func(dic):
    dic['count'] -= 1
    print(dic)
if __name__ == '__main__':
    m = Manager()    創建一個Manger()
    dic = m.dict({'count':100})    #變成進程共享的字典
    p = Process(target=func, args=(dic,))
    p.start()
    p.join()    #等待子進程結束
 
這里會有進程搶占造成的數據不安全問題,通過加鎖解決
 
from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂
        d['count']-=1
 
if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

 

 

四、進程池

1.為什么要有進程池?
在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。那么我們要怎么做呢?
在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。
 
2.multiprocess.Pool模塊
Pool([numprocess [,initializer [, initargs]]]):創建進程池
 
參數
numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值
initializer:是每個工作進程啟動時要執行的可調用對象,默認為None
initargs:是要傳給initializer的參數組
 
2.方法
p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。
'''需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()'''
 
p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。'''
   
p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
 
P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用
 
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法
obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
obj.ready():如果調用完成,返回True
obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數
 
 
 
3.實例
1)簡單例子1(使用map)
#map如果要給函數傳參數,只能傳可迭代對象
from multiprocessing import Pool
def func(dic):
    print(dic)
def func2(dic):
    print(dic+2)
if __name__ == '__main__':
    pool = Pool(5)  #進程數,CPU核心數+1
                            #如果Pool()不傳參數,默認是cpu核心數
    pool.map(func2,range(100))   #100個任務
    #這里自帶join效果
    pool.map(func, ['hsr','ttt'])  # 2個任務
 
 
2)簡單例子2(使用apply)
from multiprocessing import Pool
import os
import time
def func(n):
    print('[pid:%s]start id:%s'%(os.getpid(),n))
    time.sleep(1.5)
    print('\033[31m[pid:%s]end id:%s\033[0m'%(os.getpid(),n))
 
if __name__ == '__main__':
    pool = Pool(5)
    for i in range(10):
        #pool.apply(func,args=(i,))  #同步
        pool.apply_async(func,args=(i,))    #異步。與主進程完全異步,需要手動close和join
 
    pool.close()    # 結束進程池接收任務
    pool.join() # 感知進程中的任務都執行結束
 
 
3)使用進程池實現多個TCP socket連接
服務端:
import socket
from multiprocessing import Pool
 
def func(conn):
    while 1:
        conn.send(b'hello')
        ret = conn.recv(1024).decode('utf-8')
        if ret == 'q':
            break
        print(ret)
    conn.close()
 
if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1', 8081))
    sk.listen()
    pool = Pool(5)
    while 1:
        conn, addr = sk.accept()
        pool.apply_async(func,args=(conn,))
 
客戶端:
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8081))
ret = sk.recv(1024).decode('utf-8')
print(ret)
c = input().encode('utf-8')
sk.send(c)
sk.close()
 
 
4.進程池的返回值
from multiprocessing import Pool
def func(i):
    return i**2
if __name__ == '__main__':
    pool = Pool(5)
    #使用map的返回值
    ret = pool.map(func,range(10))
    print(ret)
    res_l = []
    for i in range(10):
        #同步
        # res = pool.apply(func,args=(i,))    #apply的結果就是func的返回值
        # print(res)
        #異步
        res = pool.apply_async(func,args=(i,))    #apply_async的結果
        #這里如果直接使用res.get()來獲取返回值,會阻塞,所以先將其放入列表中,后面再get
        # print(res.get())    #阻塞等待func的結果
        res_l.append(res)
    for i in res_l:
        print(i.get())
 
 
5.回調函數
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
 
from multiprocessing import Pool
def func(i):
    print('in func1')
    return i**2
def func2(n):
    print('in func2')
    print(n)
if __name__ == '__main__':
    pool = Pool(5)
    pool.apply_async(func, args=(10,), callback=func2)  #執行func1,把返回值作為fun2的參數執行func2
                                                        #回調函數func2在主進程中zhi'x
    pool.close()
    pool.join()
 
 
簡單例子:
import requests
from multiprocessing import  Pool
 
def get(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.content.decode('utf-8'),url
 
def call_back(args):
    print(args[1] +"    "+ str(len(args[0])))
 
url_lst = [
    'http://www.cnblog.com',
    'https://www.baidu.com',
    'http://www.sohu.com'
]
 
if __name__ == '__main__':
    pool = Pool(5)
    for i in url_lst:
        pool.apply_async(get,args=(i,),callback=call_back)
    pool.close()
    pool.join()

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM