Python之路(第四十六篇)多種方法實現python線程池(threadpool模塊\multiprocessing.dummy模塊\concurrent.futures模塊)


一、線程池

很久(python2.6)之前python沒有官方的線程池模塊,只有第三方的threadpool模塊,

之后再python2.6加入了multiprocessing.dummy 作為可以使用線程池的方式,

在python3.2(2012年)之后加入了concurrent.futures模塊(python3.1.5也有,但是python3.1.5發布時間晚於python3.2一年多),這個模塊是python3中自帶的模塊,但是python2.7以上版本也可以安裝使用。

下面分別介紹下各個線程池模塊的使用

使用環境python3.6.4,win7

threadpool模塊(上古時期--python2.6之前)

由於threadpool模塊是第三方模塊需要進行安裝,

安裝

pip install threadpool 或者在pycharm--settings--Project interpreter中安裝

 

使用介紹

(1)引入threadpool模塊

(2)定義線程函數

(3)創建線程 池threadpool.ThreadPool()

(4)創建需要線程池處理的任務即threadpool.makeRequests()

(5)將創建的多個任務put到線程池中,threadpool.putRequest

(6)等到所有任務處理完畢theadpool.pool()

代碼實例

import threadpool, time
​
name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
             'Amuro Namie',' Sarah Brightman']
​
​
def Say_hello(str):
    print("Hello ", str)
    time.sleep(2)
​
​
def main():
    start_time = time.time()
    #第一步 創建線程池,線程數為4:
    pool = threadpool.ThreadPool(4)
    # 第二步創建線程請求,包涵調用的函數、參數和回調函數:
    # requests = threadpool.makeRequests(func, args_list, call_back)
    
    requests = threadpool.makeRequests(Say_hello, name_list)
    # 第三步將所有要運行多線程的請求扔進線程池:
    [pool.putRequest(req) for req in requests]
    # 第四步等待所有的線程完成工作后退出:
    pool.wait()
    print('用時共: %s second' % (time.time() - start_time))
​
​
​
if __name__ == '__main__':
    main()
​

  

輸出

Hello  Satomi Ishihara
Hello  Aragaki Yui
Hello  Nainaiwei Hashimoto
Hello  HIKARU UTADA
Hello  Mai Kuraki
Hello  Nozomi Sasaki
Hello  Amuro Namie
Hello   Sarah Brightman
用時共: 4.0012288093566895 second

  

說明:makeRequests存放的是要開啟多線程的函數,以及函數相關參數和回調函數,其中回調函數可以不寫(默認是無),也就是說makeRequests只需要2個參數就可以運行。

 

multiprocessing.dummy(中古時期,python3.2起)

從python3.2起,可以使用multiprocessing.dummy 創建線程池,注意

from  multiprocessing  import  Pool  #這里引入的是進程池
from multiprocessing.dummy import Pool as ThreadPool  #這里引入的才是線程池

  

multiprocessing.dummy模塊與multiprocessing模塊的區別:dummy模塊是多線程,而multiprocessing是多進程,api都是通用的。簡單地說,multiprocessing.dummy是multiprocessing多進程模塊復制的一個多線程模塊,API都是通用的。

這里的多線程也是受到它受到全局解釋器鎖(GIL)的限制,並且一次只有一個線程可以執行附加到CPU的操作。

 

使用介紹

使用有四種方式:apply_async、apply、map_async、map。

 

其中apply_async和map_async是異步的,也就是啟動進程函數之后會繼續執行后續的代碼不用等待進程函數返回。apply_async和map_async方式提供了一寫獲取進程函數狀態的函數:ready()successful()get()。

PS:join()語句要放在close()語句后面。

具體可以參考

Python之路(第四十篇)進程池

 

代碼實例


import  time
from multiprocessing.dummy import Pool as ThreadPool
​
name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
             'Amuro Namie',' Sarah Brightman']
​
​
def Say_hello(str):
    print("Hello ", str)
    time.sleep(2)
​
​
def main():
    start_time = time.time()
    #第一步 創建線程池,線程數為4:
    pool = ThreadPool(4)
    # 第二步用map方法執行
    # pool.map(func, args)  ,注意這里的map方法自帶pool.close()和pool.join()方法,等待所有子線程執行完
    pool.map(Say_hello,name_list)
    print('用時共: %s second' % (time.time() - start_time))
​
​
if __name__ == '__main__':
    main()

  


輸出結果

Hello  Satomi Ishihara
Hello  Aragaki Yui
Hello  Nainaiwei Hashimoto
Hello  HIKARU UTADA
Hello  Mai Kuraki
Hello  Nozomi Sasaki
Hello  Amuro Namie
Hello   Sarah Brightman
用時共: 4.004228830337524 second

  

 

concurrent.futures模塊(從python3.2開始自帶)

concurrent.futures模塊,可以利用multiprocessing實現真正的平行計算。

核心原理是:concurrent.futures會以子進程的形式,平行的運行多個python解釋器,從而令python程序可以利用多核CPU來提升執行速度。由於子進程與主解釋器相分離,所以他們的全局解釋器鎖也是相互獨立的。每個子進程都能夠完整的使用一個CPU內核。

 

使用介紹

模塊主要包含下面兩個類:

  1. ThreadPoolExecutor

  2. ProcessPoolExecutor

也就是對 threading 和 multiprocessing 進行了高級別的抽象, 暴露出統一的接口, 方便開發者使用。

可以使用 ThreadPoolExecutor 來進行多線程編程,ProcessPoolExecutor 進行多進程編程,兩者實現了同樣的接口,這些接口由抽象類 Executor 定義。 這個模塊提供了兩大類型,一個是執行器類 Executor,另一個是 Future 類。

ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用

其實 concurrent.futures 底層還是用的 threading 和 multiprocessing 這兩個模塊, 相當於在這上面又封裝了一層, 所以速度上會慢一點, 這個是架構和接口實現上的取舍造成的。

 

基類Executor

#submit(fn, *args, **kwargs)
異步提交任務
​
#map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作,map方法接收兩個參數,第一個為要執行的函數,第二個為一個序列,會對序列中的每個元素都執行這個函數,返回值為執行結果組成的生成器
​
#shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前
可以通過 with 語句來避免顯式調用本方法。with 語句會用 wait=True 的默認參數調用 Executor.shutdown() 方法。
​
​
#result(timeout=None)
取得結果
submit()方法,這個方法的作用是提交一個可執行的回調task,並返回一個future實例。future能夠使用done()方法判斷該任務是否結束,done()方法是不阻塞的,使用result()方法可以獲取任務的返回值,這個方法是阻塞的。
​
​
#add_done_callback(fn)
回調函數
​
# done()
判斷某一個線程是否完成
​
# cancle()
取消某個任務

  


 

代碼示例

進程池:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
​
import os,time
def task(n):
    print('%s is runing進程號' %os.getpid())
    time.sleep(2)
    return n**2
​
​
def main():
    start_time = time.time()
    executor=ProcessPoolExecutor(max_workers=3)
​
    futures=[]
    for i in range(10):
        future=executor.submit(task,i)  #這里使用submit提交進程
        futures.append(future)
    executor.shutdown(True)
    print('*'*20)
    for future in futures:
        print(future.result())
    print('用時共: %s second' % (time.time() - start_time))
​
if __name__ == '__main__':
    main()

  

輸出結果

10292 is runing進程號
12516 is runing進程號
9664 is runing進程號
10292 is runing進程號
12516 is runing進程號
9664 is runing進程號
10292 is runing進程號
12516 is runing進程號
9664 is runing進程號
10292 is runing進程號
********************
0
1
4
9
16
25
36
49
64
81
用時共: 8.176467418670654 second

  

也可以使用map方法提交進程

示例

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
​
import os,time
def task(n):
    print('%s is runing進程號' %os.getpid())
    time.sleep(2)
    return n**2
​
​
def main():
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = executor.map(task, [i for i in range(10)])
    print('*'*20)
    for future in futures:
        print(future)  #無需再次使用result()方法獲取結果
    print('用時共: %s second' % (time.time() - start_time))
​
if __name__ == '__main__':
    main()

  

輸出結果

1504 is runing進程號
12644 is runing進程號
7732 is runing進程號
1504 is runing進程號
12644 is runing進程號
7732 is runing進程號
1504 is runing進程號
7732 is runing進程號
12644 is runing進程號
1504 is runing進程號
********************
0
1
4
9
16
25
36
49
64
81
用時共: 8.171467304229736 second

  

 

分析:map方法返回的results列表是有序的,順序和*iterables迭代器的順序一致,這里也無需再次使用result()方法獲取結果。

這里使用with操作符,使得當任務執行完成之后,自動執行shutdown函數,而無需編寫相關釋放代碼。

 

map()與submit()使用場景

常用的方法是 submit(), 如果要提交任務的函數是一樣的, 就可以簡化成 map(), 但是如果提交的函數是不一樣的, 或者執行的過程中可能出現異常, 就要使用到 submit(), 因為使用 map() 在執行過程中如果出現異常會直接拋出錯誤, 而 submit() 則會分開處理。

 

 

 

線程池

用法與ProcessPoolExecutor相同

 

示例

 

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
​
import os,time
def task(n):
    print('%s is runing進程號' %os.getpid())
    time.sleep(2)
    return n**2
​
​
def main():
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = executor.map(task, [i for i in range(10)])
    print('*'*20)
    for future in futures:
        print(future)
    print('用時共: %s second' % (time.time() - start_time))
​
if __name__ == '__main__':
    main()

  

輸出結果

7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
7976 is runing進程號
********************
0
1
4
9
16
25
36
49
64
81
用時共: 8.001457929611206 second

  

分析:注意所有的進程號都是一樣的,這里是開啟的多線程,所以進程號是一樣的

示例二


import  time
from concurrent.futures import ThreadPoolExecutor
​
name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
             'Amuro Namie',' Sarah Brightman']
​
​
def say_hello(str):
    print("Hello ", str)
    time.sleep(2)
​
​
def main():
    start_time = time.time()
    # 用map方法執行
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = executor.map(say_hello,name_list)
    print('用時共: %s second' % (time.time() - start_time))
​
​
if __name__ == '__main__':
    main()
​

  

 

輸出結果

Hello  Satomi Ishihara
Hello  Aragaki Yui
Hello  Nainaiwei Hashimoto
Hello  HIKARU UTADA
Hello  Mai Kuraki
Hello  Nozomi Sasaki
Hello  Amuro Namie
Hello   Sarah Brightman
用時共: 4.0022289752960205 second

  回調函數的使用

import  time
from concurrent.futures import ThreadPoolExecutor

name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
             'Amuro Namie',' Sarah Brightman']


def say_hello(str):
    print("Hello ", str)
    time.sleep(2)
    return str



def call_back(res):
    res = res.result()  #獲取結果
    print(res,"長度是%s"%len(res))



def main():
    start_time = time.time()
    # 用submit方法執行
    executor=ThreadPoolExecutor(max_workers=4)

    for i in name_list:
        executor.submit(say_hello,i).add_done_callback(call_back)
    executor.shutdown(True)
    #這里使用submit提交線程,使用add_done_callback()添加回調函數
    print('用時共: %s second' % (time.time() - start_time))


if __name__ == '__main__':
    main()

  輸出結果

Hello  Satomi Ishihara
Hello  Aragaki Yui
Hello  Nainaiwei Hashimoto
Hello  HIKARU UTADA
HIKARU UTADA 長度是12
Aragaki Yui 長度是11
Hello  Mai Kuraki
Hello  Nozomi Sasaki
Satomi Ishihara 長度是15
Nainaiwei Hashimoto 長度是19
Hello  Amuro Namie
Hello   Sarah Brightman
Nozomi Sasaki 長度是13
Amuro Namie 長度是11
 Sarah Brightman 長度是16
Mai Kuraki 長度是10
用時共: 4.0022289752960205 second

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM