進程池、線程池、異步調用(取返回值)


進程池vs線程池

為什么要用“池”:
池子使用來限制並發的任務數目,限制我們的計算機在一個自己可承受的范圍內去並發地執行任務

池子內什么時候裝進程:並發的任務屬於計算密集型
池子內什么時候裝線程:並發的任務屬於IO密集型
concurrent:並發的,一致的,同時發生的  Executor執行者 

'''
#1、阻塞與非阻塞指的是程序的兩種運行狀態
阻塞:遇到IO就發生阻塞,程序一旦遇到阻塞操作就會停在原地,並且立刻釋放CPU資源
非阻塞(就緒態或運行態):沒有遇到IO操作,或者通過某種手段讓程序即便是遇到IO操作也不會停在原地,執行其他操作,力求盡可能多的占有CPU


#2、同步與異步指的是提交任務的兩種方式:
同步調用:提交完任務后,就在原地等待,直到任務運行完畢后,拿到任務的返回值,才繼續執行下一行代碼
異步調用:提交完任務后,不在原地等待,直接執行下一行代碼,等到任務有返回值后自動觸發后調函數

'''
 
#進程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,os,random

def task(x):
    print('%s 接客' %os.getpid())
    time.sleep(random.randint(2,5))
    return x**2


if __name__ == '__main__':
    # p=ProcessPoolExecutor(3) # 不指定默認開啟的進程數是cpu的核數
    # print(os.cpu_count())查看CPU的核數
    p=ProcessPoolExecutor(max_workers=2) # 默認開啟的進程數是cpu的核數,也就是干活的人數

    #簡寫成for循環模擬6個任務執行
    for i in range(6):
        p.submit(task,i)  #submit只是負責往池子里面丟任務,剛開始2個人同時接客


#1.submit往里面丟任務干活的人就是你指定的ProcessPoolExecutor(2)后面的人數
#2.剛一造個進程池,立馬給你預備2個人干活,無論你丟多少任務,干活的永遠是那2個人(誰先完了騰出手再接下一個)

 

#線程池
def task(x):
    print('%s is running'%x)
    time.sleep(random.randint(2, 5))
    # return x**2

if __name__ == '__main__':
    # p=ProcessPoolExecutor(max_workers=2)
    p=ThreadPoolExecutor(20)   #默認不寫開啟的是CPU核數*5

    for i in range(25):
        p.submit(task,i)

 

 

result功能(可以通過每次線程或者進程對象.result(),拿到返回值)

#線程池(result功能)
def task(x):
    print('%s is running'%x)
    time.sleep(random.randint(2, 5))
    return x**2

if __name__ == '__main__':
    # p=ProcessPoolExecutor(max_workers=2)
    p=ThreadPoolExecutor(3)   #默認不寫開啟的是CPU核數*5

    for i in range(6):
        obj=p.submit(task,i)
        print(obj.result())     #重點:每個線程打印 is running后,obj.result()都回去等着拿return x**2的返回值,所以輸出結果會是串行的輸出

    print('')

'''結果:因為每次
1 is running
1
2 is running
4
3 is running
9
4 is running
16
5 is running
25
主


'''

 

 線程池、進程池下異步調用分析:

版本1:

#線程池下分析異步調用(取返回值),分析版本1
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,os,random
#線程池為例:
def task(x):
    print('%s is running'%x)
    time.sleep(random.randint(2, 5))
    return x**2

if __name__ == '__main__':
    異步調用:submit只負責提交任務,不會等待拿結果不會拿return
    p=ThreadPoolExecutor(4)
    obj_l=[]
    for i in range(10):
        obj=p.submit(task,i)   #每個返回obj對象里面,其實都封裝了一個result方法
        # print(obj.result()) #可以通過result拿到每次return的返回值
        obj_l.append(obj)
        # print(obj_l)
    # p.close()
    # p.join() #
    p.shutdown(wait=True)  #關閉進程池入口 等着取一個數目就減1

    #實現異步調用拿結果:如何取結果:
    print(obj_l[0].result())  #第一個任務的結果 即return 0**2
    print(obj_l[1].result())  #第二個:1**2
    print('')

    同步調用:每次都拿到返回結果
    p=ThreadPoolExecutor(4)
    for i in range(10):
        obj=p.submit(task,i)
        print(obj.result())   #每次等待返回值后再執行下一個
    print('')

版本2:

#線程池下分析異步調用(取返回值),分析版本2
def task(x):
    print('%s is running' % x)
    time.sleep(10)
    return x ** 2

def parse(res):
    print('.......')

if __name__ == '__main__':
    # 異步調用:
    p=ThreadPoolExecutor(4)
    obj_l=[]
    for i in range(10):
        obj=p.submit(task,i)
        obj_l.append(obj)

    #實現目的:為了保證進程池里面任務全部運行完,后再統一拿返回值
    p.shutdown(wait=True)  #1.關閉進程池入口(確保統計時進程池不會再有數據進入),等着取一個數目就減1
    # print(obj_l)  #第一個任務的結果 即return 0**2
    for future in obj_l:
        # print(future.result())
        parse(future.result())  #4個任務一次10s + 4個任務(每個1s)=共14s
    print('')

 

最終版本(引用回調函數)

#線程池、進程池異步調用取返回值最終版(引用回調函數add_done_callback())
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, os, random
import os

def task(x):
    print('%s is running' %x)
    # time.sleep(1)
    return x ** 2
    # parse(x**2)  #產生結果后立馬被處理

def parse(future):
    # time.sleep(1)
    res=future.result()
    print('%s 處理了 %s'%(os.getpid(),res))  #統一是主進程的一個Pid
if __name__ == '__main__':
    poo1=ProcessPoolExecutor(4)  #進程為9s左右
    # poo1=ThreadPoolExecutor(4)   #線程為6s左右,線程效率更高
    start=time.time()
    for i in range(1,5):
        future=poo1.submit(task,i)
        #回調函數就是有結果之后立馬執行結果,回調是主進程去處理運行結果,所以parse函數os.getpid()pid都是主進程一個PID
        #提交完之后給對象綁定了回調函數,parse會在future有返回值是立即觸發,並且將future當做參數傳給parse
        future.add_done_callback(parse) #綁定回調函數
    poo1.shutdown(wait=True) #關閉進程池入口,確保進程池里面不會再有數據進入
    stop=time.time()

    print('',(stop-start))

'''
總結:根據以上執行
    回調函數:
        1.在線程池環境下,處理回調函數是線程池里面的運行的,是線程池里面的線程去執行的
        (還是I/O密集型)
        
        2.在進程池環境下,處理回調函數是主進程去處理的
        
        
'''

 

 

 

 

線程池與進程池里面沒有join()這個方法,


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM