Python 線程池模塊threadpool 、 concurrent.futures 的 ThreadPoolExecutor


一、threadpool   基本用法

pip install threadpool   

pool = ThreadPool(poolsize)  
requests = makeRequests(some_callable, list_of_args, callback)  
[pool.putRequest(req) for req in requests]  
pool.wait()

第一行定義了一個線程池,表示最多可以創建poolsize這么多線程;

第二行是調用makeRequests創建了要開啟多線程的函數,以及函數相關參數和回調函數,其中回調函數可以不寫,default是無,也就是說makeRequests只需要2個參數就可以運行;

第三行使用列表生成式代替for循環,是將所有要運行多線程的請求扔進線程池,

[pool.putRequest(req) for req in requests]等同於

  for req in requests:  

     pool.putRequest(req) 

第四行是等待所有的線程完成工作后退出。

二、代碼實例

要處理的函數,只需要一個傳參:

import time
import threadpool  
def sayhello(str):
    print "Hello ",str
    time.sleep(2)

name_list =['xiaozi','aa','bb','cc']
start_time = time.time()
pool = threadpool.ThreadPool(10) 
requests = threadpool.makeRequests(sayhello, name_list) 
[pool.putRequest(req) for req in requests] 
pool.wait() 
print '%d second'% (time.time()-start_time)

要處理的函數,只需要N個傳參:

方式一:---參數列表元素需要用元組,([args,...], None)

import time
import threadpool
def sayhello(a, b, c):
    print("Hello ",a, b, c)
    time.sleep(2)

def call_back():
    print('call_back...........')


name_list = [([1,2,3], None), ([4,5,6], None) ]

start_time = time.time()
pool = threadpool.ThreadPool(10)
requests = threadpool.makeRequests(sayhello, name_list)
[pool.putRequest(req) for req in requests]
pool.wait()
print('%d second'% (time.time()-start_time))

Hello 1 2 3
Hello 4 5 6
2 second

Process finished with exit code 0

方式二:---參數列表元素需要用元組,(None, {'key':'value', .......})

import time
import threadpool
def sayhello(a, b, c):
    print("Hello ",a, b, c)
    time.sleep(2)

def call_back():
    print('call_back...........')


# name_list = [([1,2,3], None), ([4,5,6], None) ]
name_list = [(None, {'a':1,'b':2,'c':3}), (None, {'a':4,'b':5, 'c':6}) ]

start_time = time.time()
pool = threadpool.ThreadPool(10)
requests = threadpool.makeRequests(sayhello, name_list)
[pool.putRequest(req) for req in requests]
pool.wait()
print('%d second'% (time.time()-start_time))

concurrent.futures 的ThreadPoolExecutor (線程池) 

https://www.jianshu.com/p/6d6e4f745c27

從Python3.2開始,標准庫為我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進程池)兩個類。

相比 threading 等模塊,該模塊通過 submit 返回的是一個 future 對象,它是一個未來可期的對象,通過它可以獲悉線程的狀態主線程(或進程)中可以獲取某一個線程(進程)執行的狀態或者某一個任務執行的狀態及返回值:

  1. 主線程可以獲取某一個線程(或者任務的)的狀態,以及返回值。
  2. 當一個線程完成的時候,主線程能夠立即知道。
  3. 讓多線程和多進程的編碼接口一致。

線程池的基本使用

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/11/21 17:55
#  @Author:zhangmingda
#  @File: ThreadPoolExecutor_study.py
#  @Software: PyCharm
#  Description:

from  concurrent.futures import ThreadPoolExecutor
import time

task_args_list = [('zhangsan', 2),('lishi',3), ('wangwu', 4)]
def task(name, seconds):
    print('% sleep %s seconds start...' % (name, seconds))
    time.sleep(seconds)
    print('% sleep %s seconds done' % (name, seconds))
    return '%s task done' % name

with ThreadPoolExecutor(max_workers=5) as t:
    # [ t.submit(task, *arg) for  arg in task_args_list]
    task1 = t.submit(task, '張三', 1)
    task2 = t.submit(task, '李四', 2)
    task3 = t.submit(task, '王五', 2)
    task4 = t.submit(task, '趙柳', 3)
    print(task1.done())
    print(task2.done())
    print(task3.done())
    print(task4.done())
    time.sleep(2)
    print(task1.done())
    print(task2.done())
    print(task3.done())
    print(task4.done())

    print(task1.result())
    print(task2.result())
    print(task3.result())
    print(task4.result())

 

 

 

  • 使用 with 語句 ,通過 ThreadPoolExecutor 構造實例,同時傳入 max_workers 參數來設置線程池中最多能同時運行的線程數目。

  • 使用 submit 函數來提交線程需要執行的任務到線程池中,並返回該任務的句柄(類似於文件、畫圖),注意 submit() 不是阻塞的,而是立即返回。

  • 通過使用 done() 方法判斷該任務是否結束。上面的例子可以看出,提交任務后立即判斷任務狀態,顯示四個任務都未完成。在延時2.5后,task1 和 task2 執行完畢,task3 仍在執行中。

  • 使用 result() 方法可以獲取任務的返回值 【注意result 是阻塞的會阻塞主線程】

主要方法:

wait

wait(fs, timeout=None, return_when=ALL_COMPLETED)
wait 接受三個參數:
fs: 表示需要執行的序列
timeout: 等待的最大時間,如果超過這個時間即使線程未執行完成也將返回
return_when:表示wait返回結果的條件,默認為 ALL_COMPLETED 全部執行完成再返回;可指定FIRST_COMPLETED 當第一個執行完就退出阻塞
from  concurrent.futures import ThreadPoolExecutor,wait,FIRST_COMPLETED, ALL_COMPLETED
import time

task_args_list = [('zhangsan', 1),('lishi',2), ('wangwu', 3)]
task_list = []


def task(name, seconds):
    print('% sleep %s seconds start...' % (name, seconds))
    time.sleep(seconds)
    print('% sleep %s seconds done' % (name, seconds))
    return '%s task done' % name

with ThreadPoolExecutor(max_workers=5) as t:
    [task_list.append(t.submit(task, *arg)) for  arg in task_args_list]
    wait(task_list, return_when=FIRST_COMPLETED) # 等了一秒
    print('all_task_submit_complete! and First task complete!')
    print(wait(task_list,timeout=1.5)) # 又等了1.5秒,合計等了2.5秒

as_completed 

上面雖提供了判斷任務是否結束的方法,但是不能在主線程中一直判斷。最好的方法是當某個任務結束了,就給主線程返回結果,而不是一直判斷每個任務是否結束。

concurrent.futures 中 的 as_completed() 就是這樣一個方法,當子線程中的任務執行完后,直接用 result() 獲取返回結果
task_args_list = [('zhangsan', 1),('lishi',3), ('wangwu', 2)]
task_list = []


def task(name, seconds):
    print('%s sleep %s seconds start...' % (name, seconds))
    time.sleep(seconds)
    return '%s sleep %s seconds done' % (name, seconds)

with ThreadPoolExecutor(max_workers=5) as t:
    [task_list.append(t.submit(task, *arg)) for  arg in task_args_list]
    [print(future.result()) for future in as_completed(task_list)]

    print('All Task Done!!!!!!!!!')

map

map(fn, *iterables, timeout=None)

fn: 第一個參數 fn 是需要線程執行的函數;
iterables:第二個參數接受一個可迭代對象;
timeout: 第三個參數 timeout 跟 wait() 的 timeout 一樣,但由於 map 是返回線程執行的結果,如果 timeout小於線程執行時間會拋異常 TimeoutError。

用法如下:

def spider(page):
    time.sleep(page)
    return page

start = time.time()
executor = ThreadPoolExecutor(max_workers=4)

i = 1
for result in executor.map(spider, [2, 3, 1, 4]):
    print("task{}:{}".format(i, result))
    i += 1

#  運行結果
task1:2
task2:3
task3:1
task4:4

使用 map 方法,無需提前使用 submit 方法,map 方法與 python 高階函數 map 的含義相同,都是將序列中的每個元素都執行同一個函數。

上面的代碼對列表中的每個元素都執行 spider() 函數,並分配各線程池。

可以看到執行結果與上面的 as_completed() 方法的結果不同,輸出順序和列表的順序相同,就算 1s 的任務先執行完成,也會先打印前面提交的任務返回的結果。






 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM