很久(python2.6)之前python沒有官方的線程池模塊,只有第三方的threadpool模塊,
之后再python2.6加入了multiprocessing.dummy
作為可以使用線程池的方式,
在python3.2(2012年)之后加入了concurrent.futures模塊(python3.1.5也有,但是python3.1.5發布時間晚於python3.2一年多),這個模塊是python3中自帶的模塊,但是python2.7以上版本也可以安裝使用。
下面分別介紹下各個線程池模塊的使用
使用環境python3.6.4,win7
threadpool模塊(上古時期--python2.6之前)
由於threadpool模塊是第三方模塊需要進行安裝,
安裝
pip install threadpool 或者在pycharm--settings--Project interpreter中安裝
使用介紹
(1)引入threadpool模塊
(2)定義線程函數
(3)創建線程 池threadpool.ThreadPool()
(4)創建需要線程池處理的任務即threadpool.makeRequests()
(5)將創建的多個任務put到線程池中,threadpool.putRequest
(6)等到所有任務處理完畢theadpool.pool()
代碼實例
import threadpool, time name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki', 'Amuro Namie',' Sarah Brightman'] def Say_hello(str): print("Hello ", str) time.sleep(2) def main(): start_time = time.time() #第一步 創建線程池,線程數為4: pool = threadpool.ThreadPool(4) # 第二步創建線程請求,包涵調用的函數、參數和回調函數: # requests = threadpool.makeRequests(func, args_list, call_back) requests = threadpool.makeRequests(Say_hello, name_list) # 第三步將所有要運行多線程的請求扔進線程池: [pool.putRequest(req) for req in requests] # 第四步等待所有的線程完成工作后退出: pool.wait() print('用時共: %s second' % (time.time() - start_time)) if __name__ == '__main__': main()
輸出
Hello Satomi Ishihara Hello Aragaki Yui Hello Nainaiwei Hashimoto Hello HIKARU UTADA Hello Mai Kuraki Hello Nozomi Sasaki Hello Amuro Namie Hello Sarah Brightman 用時共: 4.0012288093566895 second
說明:makeRequests存放的是要開啟多線程的函數,以及函數相關參數和回調函數,其中回調函數可以不寫(默認是無),也就是說makeRequests只需要2個參數就可以運行。
multiprocessing.dummy(中古時期,python3.2起)
從python3.2起,可以使用multiprocessing.dummy 創建線程池,注意
from multiprocessing import Pool #這里引入的是進程池 from multiprocessing.dummy import Pool as ThreadPool #這里引入的才是線程池
multiprocessing.dummy
模塊與multiprocessing
模塊的區別:dummy
模塊是多線程,而multiprocessing
是多進程,api都是通用的。簡單地說,multiprocessing.dummy是multiprocessing多進程模塊復制的一個多線程模塊,API都是通用的。
這里的多線程也是受到它受到全局解釋器鎖(GIL)的限制,並且一次只有一個線程可以執行附加到CPU的操作。
使用介紹
使用有四種方式:apply_async、apply、map_async、map。
其中apply_async和map_async是異步的,也就是啟動進程函數之后會繼續執行后續的代碼不用等待進程函數返回。apply_async和map_async方式提供了一寫獲取進程函數狀態的函數:ready()、successful()、get()。
PS:join()語句要放在close()語句后面。
具體可以參考
代碼實例
import time from multiprocessing.dummy import Pool as ThreadPool name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki', 'Amuro Namie',' Sarah Brightman'] def Say_hello(str): print("Hello ", str) time.sleep(2) def main(): start_time = time.time() #第一步 創建線程池,線程數為4: pool = ThreadPool(4) # 第二步用map方法執行 # pool.map(func, args) ,注意這里的map方法自帶pool.close()和pool.join()方法,等待所有子線程執行完 pool.map(Say_hello,name_list) print('用時共: %s second' % (time.time() - start_time)) if __name__ == '__main__': main()
輸出結果
Hello Satomi Ishihara Hello Aragaki Yui Hello Nainaiwei Hashimoto Hello HIKARU UTADA Hello Mai Kuraki Hello Nozomi Sasaki Hello Amuro Namie Hello Sarah Brightman 用時共: 4.004228830337524 second
concurrent.futures模塊(從python3.2開始自帶)
concurrent.futures模塊,可以利用multiprocessing實現真正的平行計算。
核心原理是:concurrent.futures會以子進程的形式,平行的運行多個python解釋器,從而令python程序可以利用多核CPU來提升執行速度。由於子進程與主解釋器相分離,所以他們的全局解釋器鎖也是相互獨立的。每個子進程都能夠完整的使用一個CPU內核。
使用介紹
模塊主要包含下面兩個類:
-
ThreadPoolExecutor
-
ProcessPoolExecutor
也就是對 threading 和 multiprocessing 進行了高級別的抽象, 暴露出統一的接口, 方便開發者使用。
可以使用 ThreadPoolExecutor
來進行多線程編程,ProcessPoolExecutor
進行多進程編程,兩者實現了同樣的接口,這些接口由抽象類 Executor
定義。 這個模塊提供了兩大類型,一個是執行器類 Executor
,另一個是 Future
類。
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
其實 concurrent.futures 底層還是用的 threading 和 multiprocessing 這兩個模塊, 相當於在這上面又封裝了一層, 所以速度上會慢一點, 這個是架構和接口實現上的取舍造成的。
基類Executor
#submit(fn, *args, **kwargs) 異步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操作,map方法接收兩個參數,第一個為要執行的函數,第二個為一個序列,會對序列中的每個元素都執行這個函數,返回值為執行結果組成的生成器 #shutdown(wait=True) 相當於進程池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源后才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait參數為何值,整個程序都會等到所有任務執行完畢 submit和map必須在shutdown之前 可以通過 with 語句來避免顯式調用本方法。with 語句會用 wait=True 的默認參數調用 Executor.shutdown() 方法。 #result(timeout=None) 取得結果 submit()方法,這個方法的作用是提交一個可執行的回調task,並返回一個future實例。future能夠使用done()方法判斷該任務是否結束,done()方法是不阻塞的,使用result()方法可以獲取任務的返回值,這個方法是阻塞的。 #add_done_callback(fn) 回調函數 # done() 判斷某一個線程是否完成 # cancle() 取消某個任務
代碼示例
進程池:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time def task(n): print('%s is runing進程號' %os.getpid()) time.sleep(2) return n**2 def main(): start_time = time.time() executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(10): future=executor.submit(task,i) #這里使用submit提交進程 futures.append(future) executor.shutdown(True) print('*'*20) for future in futures: print(future.result()) print('用時共: %s second' % (time.time() - start_time)) if __name__ == '__main__': main()
輸出結果
10292 is runing進程號 12516 is runing進程號 9664 is runing進程號 10292 is runing進程號 12516 is runing進程號 9664 is runing進程號 10292 is runing進程號 12516 is runing進程號 9664 is runing進程號 10292 is runing進程號 ******************** 0 1 4 9 16 25 36 49 64 81 用時共: 8.176467418670654 second
也可以使用map方法提交進程
示例
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time def task(n): print('%s is runing進程號' %os.getpid()) time.sleep(2) return n**2 def main(): start_time = time.time() with ProcessPoolExecutor(max_workers=3) as executor: futures = executor.map(task, [i for i in range(10)]) print('*'*20) for future in futures: print(future) #無需再次使用result()方法獲取結果 print('用時共: %s second' % (time.time() - start_time)) if __name__ == '__main__': main()
輸出結果
1504 is runing進程號 12644 is runing進程號 7732 is runing進程號 1504 is runing進程號 12644 is runing進程號 7732 is runing進程號 1504 is runing進程號 7732 is runing進程號 12644 is runing進程號 1504 is runing進程號 ******************** 0 1 4 9 16 25 36 49 64 81 用時共: 8.171467304229736 second
分析:map方法返回的results列表是有序的,順序和*iterables迭代器的順序一致,這里也無需再次使用result()方法獲取結果。
這里使用with操作符,使得當任務執行完成之后,自動執行shutdown函數,而無需編寫相關釋放代碼。
map()與submit()使用場景
常用的方法是 submit()
, 如果要提交任務的函數是一樣的, 就可以簡化成 map(), 但是如果提交的函數是不一樣的, 或者執行的過程中可能出現異常, 就要使用到 submit(), 因為使用 map() 在執行過程中如果出現異常會直接拋出錯誤, 而 submit() 則會分開處理。
線程池
用法與ProcessPoolExecutor相同
示例
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time def task(n): print('%s is runing進程號' %os.getpid()) time.sleep(2) return n**2 def main(): start_time = time.time() with ThreadPoolExecutor(max_workers=3) as executor: futures = executor.map(task, [i for i in range(10)]) print('*'*20) for future in futures: print(future) print('用時共: %s second' % (time.time() - start_time)) if __name__ == '__main__': main()
輸出結果
7976 is runing進程號 7976 is runing進程號 7976 is runing進程號 7976 is runing進程號 7976 is runing進程號 7976 is runing進程號 7976 is runing進程號 7976 is runing進程號 7976 is runing進程號 7976 is runing進程號 ******************** 0 1 4 9 16 25 36 49 64 81 用時共: 8.001457929611206 second
分析:注意所有的進程號都是一樣的,這里是開啟的多線程,所以進程號是一樣的
示例二
import time from concurrent.futures import ThreadPoolExecutor name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki', 'Amuro Namie',' Sarah Brightman'] def say_hello(str): print("Hello ", str) time.sleep(2) def main(): start_time = time.time() # 用map方法執行 with ThreadPoolExecutor(max_workers=4) as executor: futures = executor.map(say_hello,name_list) print('用時共: %s second' % (time.time() - start_time)) if __name__ == '__main__': main()
輸出結果
Hello Satomi Ishihara Hello Aragaki Yui Hello Nainaiwei Hashimoto Hello HIKARU UTADA Hello Mai Kuraki Hello Nozomi Sasaki Hello Amuro Namie Hello Sarah Brightman 用時共: 4.0022289752960205 second
回調函數的使用
import time from concurrent.futures import ThreadPoolExecutor name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki', 'Amuro Namie',' Sarah Brightman'] def say_hello(str): print("Hello ", str) time.sleep(2) return str def call_back(res): res = res.result() #獲取結果 print(res,"長度是%s"%len(res)) def main(): start_time = time.time() # 用submit方法執行 executor=ThreadPoolExecutor(max_workers=4) for i in name_list: executor.submit(say_hello,i).add_done_callback(call_back) executor.shutdown(True) #這里使用submit提交線程,使用add_done_callback()添加回調函數 print('用時共: %s second' % (time.time() - start_time)) if __name__ == '__main__': main()
輸出結果
Hello Satomi Ishihara Hello Aragaki Yui Hello Nainaiwei Hashimoto Hello HIKARU UTADA HIKARU UTADA 長度是12 Aragaki Yui 長度是11 Hello Mai Kuraki Hello Nozomi Sasaki Satomi Ishihara 長度是15 Nainaiwei Hashimoto 長度是19 Hello Amuro Namie Hello Sarah Brightman Nozomi Sasaki 長度是13 Amuro Namie 長度是11 Sarah Brightman 長度是16 Mai Kuraki 長度是10 用時共: 4.0022289752960205 second