Python3:concurrent.futures實現高並發。


 

From:https://www.cnblogs.com/weihengblog/p/9812110.html

concurrent.futures 官方文檔:https://docs.python.org/3/library/concurrent.futures.html

concurrent.futures: 線程池, 讓你更加高效, 並發的處理任務https://www.h3399.cn/201906/703751.html

 

 

python 因為其全局解釋器鎖 GIL 而無法通過線程實現真正的平行計算。這個論斷我們不展開,但是有個概念我們要說明,

IO 密集型 vs 計算密集型:

  1.  IO密集型:讀取文件,讀取網絡套接字頻繁。
  2.  計算密集型:大量消耗CPU的數學與邏輯運算,也就是我們這里說的平行計算。

而 concurrent.futures 模塊,可以利用 multiprocessing 實現真正的平行計算。

核心原理是:concurrent.futures 會以子進程的形式,平行的運行多個 python 解釋器,從而令 python 程序可以利用多核 CPU 來提升執行速度。由於 子進程 與 主解釋器 相分離,所以他們的全局解釋器鎖也是相互獨立的。每個子進程都能夠完整的使用一個CPU 內核。

 

 

Python 模塊 - Concurrent.futures

 

從 Python3.2開始,Python 標准庫提供了 concurrent.futures 模塊,為開發人員提供了啟動異步任務的高級接口。 它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 兩個類,實現了對 threading 和 multiprocessing 的更高級的抽象,對編寫 線程池/進程池 提供了直接的支持。 可以將相應的 tasks 直接放入線程池/進程池,不需要維護Queue來操心死鎖的問題線程池/進程池會自動幫我們調度。

Future總結

1. python3自帶,python2需要安裝
 
2. Executer對象
 
它是一個抽象類,它提供了異步執行的方法,他不能直接使用,但可以通過它的子類
 
ThreadPoolExecuter和ProcessPoolExecuter
 
2.1 Executer.submit(fn, *args, **kwargs)
 
fn: 需要異步執行的函數
 
*args,**kwargs fn 接受的參數
 
該方法的作用就是提交一個可執行的回調 task,它返回一個 Future 對象
 
2.2 map(fn, *iterables, timeout=None, chunksize=1)
 
map(task,URLS) # 返回一個 map()迭代器,這個迭代器中的回調執行返回的結果是有序的
 
3. Future對象相關
 
future可以理解為一個在未來完成的操作,這是異步編程的基礎
 
通常情況下我們在遇到IO操作的時候,將會發生阻塞,cpu不能做其他事情
 
而future的引入幫助我們在這段等待時間可以完成其他的操作
 
3.1 done():
 
如果當前線程已取消/已成功,返回True。
 
3.2 cance():
 
如果當前線程正在執行,並且不能取消調用,返回Flase。否則調用取消,返回True
 
3.3 running():
 
如果當前的線程正在執行,則返回True
 
3.4 result():
 
返回調用返回的值,如果調用尚未完成,則此方法等待
 
如果等待超時,會拋出concurrent.futures.TimeoutError
 
如果沒有指定超時時間,則等待無時間限制
 
如果在完成之前,取消了Future,則會引發CancelledError
 
4. as_completed():
 
在多個Future實例上的迭代器將會被返回
 
這些Future實例由fs完成時產生。
 
由fs返回的任何重復的Future,都會被返回一次。
 
里面保存的都是已經執行完成的Future對象
 
5. wait():
 
返回一個元祖,元祖包含兩個元素
 
1. 已完成的future集合
 
2. 未完成的future集合

初體驗:

  1. # coding=utf-8
     
    from concurrent import futures
     
    from concurrent.futures import Future
     
    import time
     
     
     
    def return_future(msg):
     
    time.sleep(3)
     
    return msg
     
     
     
     
     
    pool = futures.ThreadPoolExecutor(max_workers=2)
     
     
     
    t1 = pool.submit(return_future,'hello')
     
    t2 = pool.submit(return_future,'world')
     
     
     
    time.sleep(3)
     
    print(t1.done()) # 如果順利完成,則返回True
     
    time.sleep(3)
     
    print(t2.done())
     
     
     
    print(t1.result()) # 獲取future的返回值
     
    time.sleep(3)
     
    print(t2.result())
     
     
     
    print("主線程")

     

     
     

map(func,* iterables,timeout = None,chunksize = 1 )

# coding=utf-8
 
 
 
import time
 
from concurrent.futures import Future,as_completed
 
from concurrent.futures import ThreadPoolExecutor as Pool
 
import requests
 
import time
 
 
 
URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']
 
 
 
def task(url,timeout=10):
 
return requests.get(url=url,timeout=timeout)
 
 
 
 
 
pool = Pool()
 
result = pool.map(task,URLS)
 
 
 
start_time = time.time()
 
 
 
# 按照 URLS 的順序返回
 
for res in result:
 
print("{} {}".format(res.url,len(res.content)))
 
 
 
# 無序的
 
with Pool(max_workers=3) as executer:
 
future_task = [executer.submit(task,url) for url in URLS]
 
 
 
for f in as_completed(future_task):
 
if f.done():
 
f_ret = f.result() # f.result()得到task的返回值,requests對象
 
print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
 
 
 
print("耗時", time.time() - start_time)
 
print("主線程")

 

 

Future對象

Future可以理解為一個未來完成的操作
當我們執行io操作的時候,在等待返回結果之前會產生阻塞
cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他操作

from concurrent.futures import ThreadPoolExecutor as Pool
 
from concurrent.futures import as_completed
 
import requests
 
import time
 
 
 
URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']
 
 
 
def task(url,timeout=10):
 
return requests.get(url=url,timeout=timeout)
 
 
 
# start_time = time.time()
 
# for url in URLS:
 
# ret = task(url)
 
# print("{} {}".format(ret.url,len(ret.content)))
 
# print("耗時",time.time() - start_time)
 
with Pool(max_workers=3) as executor:
 
# 創建future任務
 
future_task = [executor.submit(task,url) for url in URLS]
 
 
 
for f in future_task:
 
if f.running():
 
print("%s is running"%str(f))
 
 
 
for f in as_completed(future_task):
 
try:
 
ret = f.done()
 
if ret:
 
f_ret = f.result()
 
print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
 
except Exception as e:
 
f.cance()
 
print(e)
 
 
 
"""
 
url不是按照順序返回的,說明並發時,當訪問某一個url時,如果沒有得到返回結果,不會發生阻塞
 
<Future at 0x1c63990e6d8 state=running> is running
 
<Future at 0x1c639922780 state=running> is running
 
<Future at 0x1c639922d30 state=running> is running
 
<Future at 0x1c63990e6d8 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
 
<Future at 0x1c639922780 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243101
 
<Future at 0x1c639922d30 state=finished returned Response>, done, result: http://sina.com/, 23103
 
"""

 

 

模塊方法

 

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

wait() 會返回一個tuple,tuple 會包含兩個集合:已完成的集合 和 未完成的集合。使用 wait() 會獲得更大的自由度,他接受三個參數:FIRST_COMPLETEDFIRST_EXCEPTIONALL_COMPLETE默認為 ALL_COMPLETE

如果采用默認的 ALL_COMPLETED,程序會阻塞直到線程池里面的所有任務都完成,再執行主線程:

 

from concurrent.futures import Future
 
from concurrent.futures import ThreadPoolExecutor as Pool
 
from concurrent.futures import as_completed, wait
 
import requests
 
 
 
URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']
 
 
 
 
 
def task(url, timeout=10):
 
r = requests.get(url=url, timeout=timeout)
 
print(r.status_code)
 
 
 
 
 
with Pool(max_workers=3) as execute:
 
future_task = [execute.submit(task, url) for url in URLS]
 
 
 
for f in future_task:
 
if f.running():
 
print("%s" % (str(f)))
 
 
 
"""
 
並且wait還有timeout和return_when兩個參數
 
return_when有三個常量 (默認是 ALL_COMPLETED)
 
FIRST_COMPLETED 任何一個future_task執行完成時/取消時,改函數返回
 
FIRST_EXCEPTION 任何一個future_task發生異常時,該函數返回,如果沒有異常發生,等同於ALL_COMPLETED
 
ALL_COMPLETED 當所有的future_task執行完畢返回。
 
"""
 
results = wait(future_task, return_when="FIRST_COMPLETED") #
 
done = results[0]
 
for d in done:
 
print(d)

 

 

 

 

concurrent.futures.as_completed(fs, timeout=None)

在多個 Future 實例上的迭代器將會被返回,這些 Future 實例由 fs 完成時產生。由 fs 返回的任何重復的 Future,都會被返回一次。里面保存的都是已經執行完成的 Future 對象。

from concurrent.futures import ThreadPoolExecutor as Pool
 
from concurrent.futures import as_completed
 
import requests
 
import time
 
 
 
URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']
 
 
 
def task(url,timeout=10):
 
return requests.get(url=url,timeout=timeout)
 
 
 
with Pool(max_workers=3) as executor:
 
# 創建future任務
 
future_task = [executor.submit(task,url) for url in URLS]
 
 
 
for f in future_task:
 
if f.running():
 
print("%s is running"%str(f))
 
 
 
for f in as_completed(future_task):
 
try:
 
ret = f.done()
 
if ret:
 
f_ret = f.result()
 
print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
 
except Exception as e:
 
f.cance()
 
print(e)

 

下面我們將學習 concurrent.futures 模塊中的類。concurrent.futures 基礎模塊是 executor 和 future。

使用示例代碼:

# -*- coding:utf-8 -*-
 
 
 
import redis
 
from redis import WatchError
 
from concurrent.futures import ProcessPoolExecutor
 
 
 
r = redis.Redis(host='127.0.0.1', port=6379)
 
 
 
 
 
# 減庫存函數, 循環直到減庫存完成
 
# 庫存充足, 減庫存成功, 返回True
 
# 庫存不足, 減庫存失敗, 返回False
 
 
 
def reduce_stock():
 
 
 
# python中redis事務是通過pipeline的封裝實現的
 
with r.pipeline() as pipe:
 
while True:
 
try:
 
# watch庫存鍵, multi后如果該key被其他客戶端改變, 事務操作會拋出WatchError異常
 
pipe.watch('stock:count')
 
count = int(pipe.get('stock:count'))
 
if count > 0: # 有庫存
 
# 事務開始
 
pipe.multi()
 
pipe.decr('stock:count')
 
# 把命令推送過去
 
# execute返回命令執行結果列表, 這里只有一個decr返回當前值
 
print(pipe.execute()[0])
 
return True
 
else:
 
return False
 
except WatchError as ex:
 
# 打印WatchError異常, 觀察被watch鎖住的情況
 
print(ex)
 
pipe.unwatch()
 
 
 
 
 
def worker():
 
while True:
 
# 沒有庫存就退出
 
if not reduce_stock():
 
break
 
 
 
 
 
if __name__ == "__main__":
 
# 設置庫存為100
 
r.set("stock:count", 100)
 
 
 
# 多進程模擬多個客戶端提交
 
with ProcessPoolExecutor() as pool:
 
for _ in range(10):
 
pool.submit(worker)

 

 

 

concurrent.futures 模塊詳解

 

1. Executor對象

class concurrent.futures.Executor

Executor是一個抽象類,它提供了異步執行調用的方法。它不能直接使用,但可以通過它的兩個子類ThreadPoolExecutor或者ProcessPoolExecutor進行調用。

 

1.1 Executor.submit(fn, *args, **kwargs)

fn:需要異步執行的函數
*args, **kwargs:fn 的參數

示例代碼:

# -*- coding:utf-8 -*-
 
from concurrent import futures
 
 
 
 
 
def test(num):
 
import time
 
return time.ctime(), num
 
 
 
 
 
with futures.ThreadPoolExecutor(max_workers=1) as executor:
 
future = executor.submit(test, 1)
 
print(future.result())

 

1.2 Executor.map(func, *iterables, timeout=None)

相當於map(func, *iterables),但是func是異步執行。timeout的值可以是int或float,如果操作超時,會返回raisesTimeoutError;如果不指定timeout參數,則不設置超時間。

func:需要異步執行的函數
*iterables:可迭代對象,如列表等。每一次func執行,都會從iterables中取參數。
timeout:設置每次異步操作的超時時間

示例代碼:

# -*- coding:utf-8 -*-
 
from concurrent import futures
 
 
 
 
 
def test(num):
 
import time
 
return time.ctime(), num
 
 
 
 
 
data = [1, 2, 3]
 
with futures.ThreadPoolExecutor(max_workers=1) as executor:
 
for future in executor.map(test, data):
 
print(future)

 

1.3 Executor.shutdown(wait=True)

釋放系統資源,在Executor.submit()或 Executor.map()等異步操作后調用。使用with語句可以避免顯式調用此方法

 

2. ThreadPoolExecutor對象

ThreadPoolExecutor類是Executor子類,使用線程池執行異步調用.

class concurrent.futures.ThreadPoolExecutor(max_workers),使用 max_workers 數目的線程池執行異步調用

 

3. ProcessPoolExecutor對象

ThreadPoolExecutor類是Executor子類,使用進程池執行異步調用.

class concurrent.futures.ProcessPoolExecutor(max_workers=None),使用 max_workers數目的進程池執行異步調用,如果max_workers為None則使用機器的處理器數目(如4核機器max_worker配置為None時,則使用4個進程進行異步並發)。

示例代碼:

# -*- coding:utf-8 -*-
 
from concurrent import futures
 
 
 
 
 
def test(num):
 
import time
 
return time.ctime(), num
 
 
 
 
 
def muti_exec(m, n):
 
# m 並發次數
 
# n 運行次數
 
 
 
with futures.ProcessPoolExecutor(max_workers=m) as executor: # 多進程
 
# with futures.ThreadPoolExecutor(max_workers=m) as executor: #多線程
 
executor_dict = dict((executor.submit(test, times), times) for times in range(m * n))
 
 
 
for future in futures.as_completed(executor_dict):
 
times = executor_dict[future]
 
if future.exception() is not None:
 
print('%r generated an exception: %s' % (times, future.exception()))
 
else:
 
print('RunTimes:%d,Res:%s' % (times, future.result()))
 
 
 
 
 
if __name__ == '__main__':
 
muti_exec(5, 1)

 

 

調度單個任務

 

執行者類Executor調度單個任務,使用submit() 函數,然后用返回的 Future 實例等待任務結果。

Executor 是一個 Python concurrent.futures 模塊的抽象類。 它不能直接使用,我們需要使用以下具體子類之一 -

  • ThreadPoolExecutor:線程池
  • ProcessPoolExecutor:進程池

 

示例代碼:

from concurrent import futures
 
import time
 
import random
 
 
 
 
 
def task(n):
 
time.sleep(random.randint(1, 10))
 
return n
 
 
 
 
 
executor = futures.ThreadPoolExecutor(max_workers=3)
 
future = executor.submit(task, 5)
 
print('future: {}'.format(future))
 
result = future.result()
 
print('result: {}'.format(result))

 

 

線程池 和 進程池

 

  1. ThreadPoolExecutor 是 Executor類的具體子類之一。 子類使用多線程,我們得到一個提交任務的線程池。 該池將任務分配給可用線程並安排它們運行。
  2. ProcessPoolExecutor Executor類的具體子類之一。 它使用多重處理,並且我們獲得提交任務的進程池。 此池將任務分配給可用的進程並安排它們運行。

 

如何創建一個 ThreadPoolExecutor 或者 ProcessPoolExecutor?

        在concurrent.futures模塊及其具體子類Executor的幫助下,可以很容易地創建一個線程池或者進程池。 需要使用我們想要的池中的線程數構造一個ThreadPoolExecutor 或者 ProcessPoolExecutor。 默認情況下,數字是5。然后可以提交一個任務到線程池或者進程池。 當submit()任務時,會返回Future對象。 Future對象有一個名為done()的方法,它告訴Future是否已經解決。 有了這個,為這個特定的Future對象設定了一個值。 當任務完成時,線程池執行器將該值設置為Future的對象。

線程池 示例代碼:

from concurrent.futures import ThreadPoolExecutor
 
from time import sleep
 
 
 
 
 
def task(message):
 
sleep(2)
 
return message
 
 
 
 
 
def main():
 
executor = ThreadPoolExecutor(5)
 
future = executor.submit(task, "Completed")
 
print(future.done())
 
sleep(2)
 
print(future.done())
 
print(future.result())
 
 
 
 
 
if __name__ == '__main__':
 
main()

 

結果截圖:

在上面的例子中,一個ThreadPoolExecutor已經由5個線程構造而成。 然后,在提供消息之前等待2秒的任務被提交給線程池執行器。 從輸出中可以看出,任務直到2秒才完成,所以第一次調用done()將返回False2秒后,任務完成,我們通過調用result()方法得到future的結果。

進程池 示例代碼:

from concurrent.futures import ProcessPoolExecutor
 
from time import sleep
 
 
 
 
 
def task(message):
 
sleep(2)
 
return message
 
 
 
 
 
def main():
 
executor = ProcessPoolExecutor(5)
 
future = executor.submit(task, ("Completed"))
 
print(future.done())
 
sleep(2)
 
print(future.done())
 
print(future.result())
 
 
 
 
 
if __name__ == '__main__':
 
main()

 

 

實例化ThreadPoolExecutor 或者 ProcessPoolExecutor  之 上下文管理器
另一種實例化ThreadPoolExecutor的方法是在上下文管理器的幫助下完成的。 它的工作方式與上例中使用的方法類似。 使用上下文管理器的主要優點是它在語法上看起來不錯。 實例化可以在下面的代碼的幫助下完成

with ThreadPoolExecutor(max_workers = 5) as executor
 
或者
 
with ProcessPoolExecutor(max_workers = 5) as executor

 

示例

以下示例是從 Python 文檔借用的。 在這個例子中,首先必須導入 concurrent.futures 模塊。 然后創建一個名為 load_url()的函數,它將加載請求的url。 然后該函數用池中的5個線程創建 ThreadPoolExecutorThreadPoolExecutor 已被用作上下文管理器。 我們可以通過調用 result()方法來獲得 future的結果。

import concurrent.futures
 
import urllib.request
 
 
 
URLS = [
 
'http://www.foxnews.com/',
 
'https://www.yiibai.com/',
 
'http://europe.wsj.com/',
 
'http://www.bbc.co.uk/',
 
'http://some-made-up-domain.com/'
 
]
 
 
 
 
 
def load_url(url, timeout):
 
with urllib.request.urlopen(url, timeout=timeout) as conn:
 
return conn.read()
 
 
 
 
 
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
 
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
 
for future in concurrent.futures.as_completed(future_to_url):
 
url = future_to_url[future]
 
try:
 
data = future.result()
 
except Exception as exc:
 
print('%r generated an exception: %s' % (url, exc))
 
else:
 
print('%r page is %d bytes' % (url, len(data)))

 

以下將是上面的Python腳本的輸出 -

  1.  
    'http: //some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
  2.  
    'http: //www.foxnews.com/' page is 229313 bytes
  3.  
    'http: //www.yiibai.com/' page is 168933 bytes
  4.  
    'http: //www.bbc.co.uk/' page is 283893 bytes
  5.  
    'http: //europe.wsj.com/' page is 938109 bytes

進程池:

import concurrent.futures
 
from concurrent.futures import ProcessPoolExecutor
 
import urllib.request
 
 
 
URLS = ['http://www.foxnews.com/',
 
'http://www.cnn.com/',
 
'http://europe.wsj.com/',
 
'http://www.bbc.co.uk/',
 
'http://some-made-up-domain.com/']
 
 
 
 
 
def load_url(url, timeout):
 
with urllib.request.urlopen(url, timeout=timeout) as conn:
 
return conn.read()
 
 
 
 
 
def main():
 
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
 
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
 
for future in concurrent.futures.as_completed(future_to_url):
 
url = future_to_url[future]
 
try:
 
data = future.result()
 
except Exception as exc:
 
print('%r generated an exception: %s' % (url, exc))
 
else:
 
print('%r page is %d bytes' % (url, len(data)))
 
 
 
 
 
if __name__ == '__main__':
 
main()

 

 

 

使用 map() 調度多任務,有序返回

 

使用map(),多個worker並發地從輸入迭代器里取數據,處理,然后按順序返回結果。

示例代碼:

from concurrent import futures
 
import time
 
import random
 
 
 
 
 
def task(n):
 
time.sleep(random.randint(1, 10))
 
return n
 
 
 
 
 
executor = futures.ThreadPoolExecutor(max_workers=3)
 
results = executor.map(task, range(1, 10))
 
print('unprocessed results: {}'.format(results))
 
real_results = list(results)
 
print('real results: {}'.format(real_results))

 

使用 Executor.map() 函數

Python map()函數廣泛用於許多任務。 一個這樣的任務是對可迭代內的每個元素應用某個函數。 同樣,可以將迭代器的所有元素映射到一個函數,並將這些作為獨立作業提交到ThreadPoolExecutor之外。 考慮下面的Python腳本示例來理解函數的工作原理。

示例
在下面的示例中,map函數用於將square()函數應用於values數組中的每個值。

from concurrent.futures import ThreadPoolExecutor
 
from concurrent.futures import as_completed
 
 
 
values = [2, 3, 4, 5]
 
 
 
 
 
def square(n):
 
return n * n
 
 
 
 
 
def main():
 
with ThreadPoolExecutor(max_workers=3) as executor:
 
results = executor.map(square, values)
 
for result in results:
 
print(result)
 
 
 
 
 
if __name__ == '__main__':
 
main()

 

以下將是上面的Python腳本的輸出 :

進程池:

from concurrent.futures import ProcessPoolExecutor
 
from concurrent.futures import as_completed
 
 
 
values = [2, 3, 4, 5]
 
 
 
 
 
def square(n):
 
return n * n
 
 
 
 
 
def main():
 
with ProcessPoolExecutor(max_workers=3) as executor:
 
results = executor.map(square, values)
 
for result in results:
 
print(result)
 
 
 
 
 
if __name__ == '__main__':
 
main()

 

 

多任務調度,無序返回

 

不斷將任務submit到executor,返回future列表,使用as_completed無序產生每個任務的結果。

示例代碼:

from concurrent import futures
 
import time
 
import random
 
 
 
 
 
def task(n):
 
time.sleep(random.randint(1, 10))
 
return n
 
 
 
 
 
executor = futures.ThreadPoolExecutor(max_workers=3)
 
future_list = [executor.submit(task, i) for i in range(1, 10)]
 
for f in futures.as_completed(future_list):
 
print(f.result())

 

何時使用ProcessPoolExecutor 和 ThreadPoolExecutor ?

現在我們已經學習了兩個Executor類 - ThreadPoolExecutorProcessPoolExecutor,我們需要知道何時使用哪個執行器。需要在受CPU限制的工作負載情況下選擇ProcessPoolExecutor,而在受I/O限制的工作負載情況下則需要選擇ThreadPoolExecutor

如果使用ProcessPoolExecutor,那么不需要擔心GIL,因為它使用多處理。 而且,與ThreadPoolExecution相比,執行時間會更少。

 

轉載:https://blog.csdn.net/freeking101/article/details/97395745

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM