進程池vs線程池
為什么要用“池”:
池子使用來限制並發的任務數目,限制我們的計算機在一個自己可承受的范圍內去並發地執行任務
池子內什么時候裝進程:並發的任務屬於計算密集型
池子內什么時候裝線程:並發的任務屬於IO密集型
(
concurrent:並發的,一致的,同時發生的 Executor執行者
)
'''
#1、阻塞與非阻塞指的是程序的兩種運行狀態
阻塞:遇到IO就發生阻塞,程序一旦遇到阻塞操作就會停在原地,並且立刻釋放CPU資源
非阻塞(就緒態或運行態):沒有遇到IO操作,或者通過某種手段讓程序即便是遇到IO操作也不會停在原地,執行其他操作,力求盡可能多的占有CPU
#2、同步與異步指的是提交任務的兩種方式:
同步調用:提交完任務后,就在原地等待,直到任務運行完畢后,拿到任務的返回值,才繼續執行下一行代碼
異步調用:提交完任務后,不在原地等待,直接執行下一行代碼,等到任務有返回值后自動觸發后調函數
'''
#進程池 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time,os,random def task(x): print('%s 接客' %os.getpid()) time.sleep(random.randint(2,5)) return x**2 if __name__ == '__main__': # p=ProcessPoolExecutor(3) # 不指定默認開啟的進程數是cpu的核數 # print(os.cpu_count())查看CPU的核數 p=ProcessPoolExecutor(max_workers=2) # 默認開啟的進程數是cpu的核數,也就是干活的人數 #簡寫成for循環模擬6個任務執行 for i in range(6): p.submit(task,i) #submit只是負責往池子里面丟任務,剛開始2個人同時接客 #1.submit往里面丟任務干活的人就是你指定的ProcessPoolExecutor(2)后面的人數 #2.剛一造個進程池,立馬給你預備2個人干活,無論你丟多少任務,干活的永遠是那2個人(誰先完了騰出手再接下一個)
#線程池 def task(x): print('%s is running'%x) time.sleep(random.randint(2, 5)) # return x**2 if __name__ == '__main__': # p=ProcessPoolExecutor(max_workers=2) p=ThreadPoolExecutor(20) #默認不寫開啟的是CPU核數*5 for i in range(25): p.submit(task,i)
result功能(可以通過每次線程或者進程對象.result(),拿到返回值)
#線程池(result功能) def task(x): print('%s is running'%x) time.sleep(random.randint(2, 5)) return x**2 if __name__ == '__main__': # p=ProcessPoolExecutor(max_workers=2) p=ThreadPoolExecutor(3) #默認不寫開啟的是CPU核數*5 for i in range(6): obj=p.submit(task,i) print(obj.result()) #重點:每個線程打印 is running后,obj.result()都回去等着拿return x**2的返回值,所以輸出結果會是串行的輸出 print('主') '''結果:因為每次 1 is running 1 2 is running 4 3 is running 9 4 is running 16 5 is running 25 主 '''
線程池、進程池下異步調用分析:
版本1:
#線程池下分析異步調用(取返回值),分析版本1 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,os,random #線程池為例: def task(x): print('%s is running'%x) time.sleep(random.randint(2, 5)) return x**2 if __name__ == '__main__': 異步調用:submit只負責提交任務,不會等待拿結果不會拿return p=ThreadPoolExecutor(4) obj_l=[] for i in range(10): obj=p.submit(task,i) #每個返回obj對象里面,其實都封裝了一個result方法 # print(obj.result()) #可以通過result拿到每次return的返回值 obj_l.append(obj) # print(obj_l) # p.close() # p.join() # p.shutdown(wait=True) #關閉進程池入口 等着取一個數目就減1 #實現異步調用拿結果:如何取結果: print(obj_l[0].result()) #第一個任務的結果 即return 0**2 print(obj_l[1].result()) #第二個:1**2 print('主') 同步調用:每次都拿到返回結果 p=ThreadPoolExecutor(4) for i in range(10): obj=p.submit(task,i) print(obj.result()) #每次等待返回值后再執行下一個 print('主')
版本2:
#線程池下分析異步調用(取返回值),分析版本2 def task(x): print('%s is running' % x) time.sleep(10) return x ** 2 def parse(res): print('.......') if __name__ == '__main__': # 異步調用: p=ThreadPoolExecutor(4) obj_l=[] for i in range(10): obj=p.submit(task,i) obj_l.append(obj) #實現目的:為了保證進程池里面任務全部運行完,后再統一拿返回值 p.shutdown(wait=True) #1.關閉進程池入口(確保統計時進程池不會再有數據進入),等着取一個數目就減1 # print(obj_l) #第一個任務的結果 即return 0**2 for future in obj_l: # print(future.result()) parse(future.result()) #4個任務一次10s + 4個任務(每個1s)=共14s print('主')
最終版本(引用回調函數)
#線程池、進程池異步調用取返回值最終版(引用回調函數add_done_callback()) from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import time, os, random import os def task(x): print('%s is running' %x) # time.sleep(1) return x ** 2 # parse(x**2) #產生結果后立馬被處理 def parse(future): # time.sleep(1) res=future.result() print('%s 處理了 %s'%(os.getpid(),res)) #統一是主進程的一個Pid if __name__ == '__main__': poo1=ProcessPoolExecutor(4) #進程為9s左右 # poo1=ThreadPoolExecutor(4) #線程為6s左右,線程效率更高 start=time.time() for i in range(1,5): future=poo1.submit(task,i) #回調函數就是有結果之后立馬執行結果,回調是主進程去處理運行結果,所以parse函數os.getpid()pid都是主進程一個PID #提交完之后給對象綁定了回調函數,parse會在future有返回值是立即觸發,並且將future當做參數傳給parse future.add_done_callback(parse) #綁定回調函數 poo1.shutdown(wait=True) #關閉進程池入口,確保進程池里面不會再有數據進入 stop=time.time() print('主',(stop-start)) ''' 總結:根據以上執行 回調函數: 1.在線程池環境下,處理回調函數是線程池里面的運行的,是線程池里面的線程去執行的 (還是I/O密集型) 2.在進程池環境下,處理回調函數是主進程去處理的 '''
線程池與進程池里面沒有join()這個方法,
