concurrent.futures- 啟動並行任務


python因為其全局解釋器鎖GIL而無法通過線程實現真正的平行計算。這個論斷我們不展開,但是有個概念我們要說明,IO密集型 vs. 計算密集型。

IO密集型:讀取文件,讀取網絡套接字頻繁。

計算密集型:大量消耗CPU的數學與邏輯運算,也就是我們這里說的平行計算。

而concurrent.futures模塊,可以利用multiprocessing實現真正的平行計算。

核心原理是:concurrent.futures會以子進程的形式,平行的運行多個python解釋器,從而令python程序可以利用多核CPU來提升執行速度。由於子進程與主解釋器相分離,所以他們的全局解釋器鎖也是相互獨立的。每個子進程都能夠完整的使用一個CPU內核。

一、初體驗

Future總結

1. python3自帶,python2需要安裝
2. Executer對象
    它是一個抽象類,它提供了異步執行的方法,他不能直接使用,但可以通過它的子類
    ThreadPoolExecuter和ProcessPoolExecuter
2.1 Executer.submit(fn,*args,**kwargs)
    fn:需要異步執行的函數
    *args,**kwargs  fn接受的參數
    該方法的作用就是提交一個可執行的回調task,它返回一個Future對象
2.2 map(fn,*iterables, timeout=None, chunksize=1)
    map(task,URLS) # 返回一個map()迭代器,這個迭代器中的回調執行返回的結果是有序的



3. Future對象相關
    future可以理解為一個在未來完成的操作,這是異步編程的基礎
    通常情況下我們在遇到IO操作的時候,將會發生阻塞,cpu不能做其他事情
    而future的引入幫助我們在這段等待時間可以完成其他的操作
3.1 done():
    如果當前線程已取消/已成功,返回True。
3.2 cance():
    如果當前線程正在執行,並且不能取消調用,返回Flase。否則調用取消,返回True

3.3 running():
    如果當前的線程正在執行,則返回True
3.4 result():
    返回調用返回的值,如果調用尚未完成,則此方法等待
    如果等待超時,會拋出concurrent.futures.TimeoutError
    如果沒有指定超時時間,則等待無時間限制
    如果在完成之前,取消了Future,則會引發CancelledError

4. as_completed():
    在多個Future實例上的迭代器將會被返回
    這些Future實例由fs完成時產生。
    由fs返回的任何重復的Future,都會被返回一次。
    里面保存的都是已經執行完成的Future對象

5. wait():
    返回一個元祖,元祖包含兩個元素
        1. 已完成的future集合
        2. 未完成的future集合

初體驗

# coding=utf-8
from concurrent import futures
from concurrent.futures import Future
import time

def return_future(msg):
    time.sleep(3)
    return msg


pool = futures.ThreadPoolExecutor(max_workers=2)

t1 = pool.submit(return_future,'hello')
t2 = pool.submit(return_future,'world')

time.sleep(3)
print(t1.done())  # 如果順利完成,則返回True
time.sleep(3)
print(t2.done())

print(t1.result()) # 獲取future的返回值
time.sleep(3)
print(t2.result())

print("主線程")

mapfunc* iterablestimeout = Nonechunksize = 1 

# coding=utf-8

import time
from concurrent.futures import Future,as_completed
from concurrent.futures import ThreadPoolExecutor as Pool
import requests
import time

URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']

def task(url,timeout=10):
    return requests.get(url=url,timeout=timeout)


pool = Pool()
result = pool.map(task,URLS)

start_time = time.time()
# 按照URLS的順序返回
for res in result:
    print("{} {}".format(res.url,len(res.content)))

# 無序的
with Pool(max_workers=3) as executer:
    future_task = [executer.submit(task,url) for url in URLS]

    for f in as_completed(future_task):
        if f.done():
            f_ret = f.result() # f.result()得到task的返回值,requests對象
            print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))

print("耗時",time.time() - start_time)
print("主線程")

二、Future對象

Future可以理解為一個未來完成的操作
當我們執行io操作的時候,在等待返回結果之前會產生阻塞
cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他操作

from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests
import time

URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']

def task(url,timeout=10):
    return requests.get(url=url,timeout=timeout)

# start_time = time.time()
# for url in URLS:
#     ret = task(url)
#     print("{} {}".format(ret.url,len(ret.content)))
# print("耗時",time.time() - start_time)
with Pool(max_workers=3) as executor:
    # 創建future任務
    future_task = [executor.submit(task,url) for url in URLS]

    for f in future_task:
        if f.running():
            print("%s is running"%str(f))

    for f in as_completed(future_task):
        try:
            ret = f.done()
            if ret:
                f_ret = f.result()
                print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
        except Exception as e:
            f.cance()
            print(e)
    
"""
url不是按照順序返回的,說明並發時,當訪問某一個url時,如果沒有得到返回結果,不會發生阻塞
<Future at 0x1c63990e6d8 state=running> is running
<Future at 0x1c639922780 state=running> is running
<Future at 0x1c639922d30 state=running> is running
<Future at 0x1c63990e6d8 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
<Future at 0x1c639922780 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243101
<Future at 0x1c639922d30 state=finished returned Response>, done, result: http://sina.com/, 23103
"""

三、模塊方法

concurrent.futures.wait(fstimeout=Nonereturn_when=ALL_COMPLETED)

wait()會返回一個tuple,
tuple會包含兩個集合
1. 已完成的集合
2. 未完成的集合
使用wait()會獲得更大的自由度,他接受三個參數
FIRST_COMPLETED, FIRST_EXCEPTION和ALL_COMPLETE
默認為ALL_COMPLETE
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed,wait
import requests


URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']

def task(url,timeout=10):
    return requests.get(url=url,timeout=timeout)

with Pool(max_workers=3) as execute :
    fulture_task = [execute.submit(task,url) for url in URLS]

    for f in fulture_task:
        if f.running():
            print("%s"%(str(f)))

    """
    並且wait還有timeout和return_when兩個參數
    return_when有三個常量
    FIRST_COMPLETED 任何一個future_task執行完成時/取消時,改函數返回
    FIRST_EXCEPTION 任何一個future_task發生異常時,該函數返回,如果沒有異常發生,等同於ALL_COMPLETED    
    ALL_COMPLETED 當所有的future_task執行完畢返回
    """
    results = wait(fulture_task,return_when="FIRST_COMPLETED")#
    done = results[0]
    for d in done:
        print(d)

 

concurrent.futures.as_completed(fstimeout=None)

在多個Future實例上的迭代器將會被返回
這些Future實例由fs完成時產生。
由fs返回的任何重復的Future,都會被返回一次。
里面保存的都是已經執行完成的Future對象
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests
import time

URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']

def task(url,timeout=10):
    return requests.get(url=url,timeout=timeout)

with Pool(max_workers=3) as executor:
    # 創建future任務
    future_task = [executor.submit(task,url) for url in URLS]

    for f in future_task:
        if f.running():
            print("%s is running"%str(f))
    
    for f in as_completed(future_task):
        try:
            ret = f.done()
            if ret:
                f_ret = f.result()
                print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
        except Exception as e:
            f.cance()
            print(e)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM