Python 3's concurrent.futures: a direct interface for multithreading and multiprocessing (thread pools since Python 3.2)


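The Python standard library provides the threading and multiprocessing modules for writing multithreaded and multiprocess code. Starting with Python 3.2, the standard library also provides the concurrent.futures module. It offers two classes, ThreadPoolExecutor and ProcessPoolExecutor, which are higher-level abstractions over threading and multiprocessing and give direct support for thread pools and process pools.
The two core building blocks of concurrent.futures are Executor and Future.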

 

Executor

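Executor is an abstract class and cannot be used directly. It defines the basic methods for asynchronous execution.
ThreadPoolExecutor and ProcessPoolExecutor subclass Executor and are used to create thread pools and process pools, respectively.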

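import time


class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        # Subclasses must schedule fn(*args, **kwargs) and return a Future.
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None):
        # Convert the relative timeout into an absolute deadline.
        if timeout is not None:
            end_time = timeout + time.time()

        # Submit every call up front, one per tuple of arguments.
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        def result_iterator():
            try:
                # Yield results in submission order, waiting on each future in turn.
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.time())
            finally:
                # If iteration stops early, cancel the calls that have not started yet.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        # Concrete executors release their threads/processes here.
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Leaving a with-block waits for pending calls to finish.
        self.shutdown(wait=True)
        return False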
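To illustrate that a concrete executor only has to supply submit() (map(), shutdown() and with-statement support are inherited), here is a minimal sketch of a hypothetical SyncExecutor that runs each call immediately in the calling thread. It is for illustration only and is not part of the standard library.

```python
from concurrent.futures import Executor, Future


class SyncExecutor(Executor):
    """Hypothetical executor that runs each call synchronously in the caller's thread."""

    def submit(self, fn, *args, **kwargs):
        future = Future()
        # Mark the future as running; a freshly created future cannot be cancelled yet.
        future.set_running_or_notify_cancel()
        try:
            result = fn(*args, **kwargs)
        except BaseException as exc:
            # Store the exception so future.result() re-raises it later.
            future.set_exception(exc)
        else:
            future.set_result(result)
        return future


with SyncExecutor() as executor:
    # map() is inherited from Executor and calls our submit() for each argument tuple.
    print(list(executor.map(pow, [2, 3], [5, 2])))  # [32, 9]
```

Because the work happens inside submit(), every returned Future is already finished; a real pool instead hands the call to a worker and completes the Future later.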

  

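The submit() method

Executor defines the submit() method, which schedules a callable to be executed and returns a Future instance. The Future object represents that pending call.
The following example shows how submit() works with a thread pool (a process pool works the same way).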

# coding: utf-8

from concurrent.futures import ThreadPoolExecutor
import time


def return_future(msg):
    time.sleep(3)
    return msg


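# Create a thread pool with 2 worker threads
pool = ThreadPoolExecutor(max_workers=2)

# Submit 2 tasks to the pool; submit() returns a Future immediately
f1 = pool.submit(return_future, 'hello')
f2 = pool.submit(return_future, 'world')

print(f1.done())  # False: the task is still sleeping
time.sleep(3)
print(f2.done())  # most likely True after waiting about 3 seconds

# result() blocks until the task finishes, then returns its value
print(f1.result())
print(f2.result())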
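submit() forwards both positional and keyword arguments to the callable, and if the callable raises, the exception is stored in the Future and re-raised by result(). A minimal sketch, using a hypothetical divide() task:

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a, b, ndigits=None):
    # Hypothetical task; ndigits is passed through submit() as a keyword argument.
    quotient = a / b
    return quotient if ndigits is None else round(quotient, ndigits)


with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(divide, 10, 3, ndigits=2)
    bad = pool.submit(divide, 1, 0)

    print(ok.result())  # 3.33
    # exception() waits for the call and returns the exception it raised (or None).
    print(type(bad.exception()).__name__)  # ZeroDivisionError
    try:
        bad.result()  # result() re-raises the exception from the worker thread
    except ZeroDivisionError:
        print('division failed')
```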

  

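Turning the return_future example into a process pool is simple: replace ThreadPoolExecutor with ProcessPoolExecutor. To submit more tasks, call submit() repeatedly, for example in a loop.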
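The sketch below makes the swap explicit: a hypothetical run() helper takes the executor class as a parameter, submits one task per input in a loop, and works unchanged with either pool. Two assumptions are worth stating: the task must be a module-level function, because ProcessPoolExecutor pickles it to send it to a worker process; and the code that starts the process pool sits under an `if __name__ == '__main__':` guard, which is required where workers are started with the "spawn" method (the default on Windows and macOS), since the workers re-import the main module.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(n):
    # Module-level function so that ProcessPoolExecutor can pickle it.
    return n * n


def run(executor_cls, numbers):
    # Hypothetical helper: the same code works for both pool types.
    with executor_cls(max_workers=2) as pool:
        # One submit() call per task, in a loop.
        futures = [pool.submit(square, n) for n in numbers]
        # result() waits for each task; results are collected in submission order.
        return [f.result() for f in futures]


if __name__ == '__main__':
    print(run(ThreadPoolExecutor, range(5)))   # [0, 1, 4, 9, 16]
    print(run(ProcessPoolExecutor, range(5)))  # [0, 1, 4, 9, 16]
```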

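The map() method

Besides submit(), Executor also provides a map() method. Like the built-in map(func, *iterables), it returns an iterator, and the results are yielded in the same order as the inputs, not in the order in which the calls finish. The following example illustrates this: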

# coding: utf-8

from concurrent.futures import ThreadPoolExecutor as Pool
import requests

URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


def task(url, timeout=10):
    return requests.get(url, timeout=timeout)


pool = Pool(max_workers=3)
results = pool.map(task, URLS)

for ret in results:
    print('%s, %s' % (ret.url, len(ret.content)))

Output:

http://www.baidu.com/, 2381
http://www.qq.com/, 252160
http://www.sina.com.cn/, 607265
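The ordering guarantee can be checked without network access. In this sketch, slow_echo() is a hypothetical helper and time.sleep() stands in for network latency; the second call finishes first, yet map() still yields the results in input order:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(value, delay):
    # Hypothetical task: sleep to simulate I/O, then return the value.
    time.sleep(delay)
    return value


with ThreadPoolExecutor(max_workers=3) as pool:
    # One iterable per positional parameter of slow_echo, as with built-in map().
    results = list(pool.map(slow_echo, ['a', 'b', 'c'], [0.3, 0.1, 0.2]))

print(results)  # ['a', 'b', 'c']: input order, although 'b' finished first
```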

  

Future

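A Future represents an operation that will complete at some point in the future, and it is a basic building block of asynchronous programming. Normally, when we perform I/O such as fetching a URL (below), the call blocks until the result comes back, and the calling thread can do nothing else in the meantime. Futures let the program do other work while it waits.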

import requests

data = requests.get('http://www.baidu.com').content
print(len(data))
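For contrast, here is a minimal sketch of the non-blocking pattern using only the standard library. fake_io() is a hypothetical helper in which time.sleep() stands in for a slow network call like the one above; submit() returns a Future immediately, the main thread does other work, and it only blocks when it calls result():

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(seconds):
    # Hypothetical stand-in for a blocking call such as requests.get().
    time.sleep(seconds)
    return 'data'


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(fake_io, 0.5)  # returns immediately with a Future
    other_work = sum(range(1000))       # the main thread keeps working meanwhile
    data = future.result()              # block only when the result is needed

print(other_work, data)  # 499500 data
```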

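Future instances are created by Executor.submit(). A Future offers methods for inspecting and handling the call, such as running(), done(), result(), cancel() and add_done_callback(); the example below uses several of them together with the as_completed() function.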

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed
import requests

URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com', ]


def task(url, timeout=10):
    return requests.get(url, timeout=timeout)


with Pool(max_workers=3) as executor:
    future_tasks = [executor.submit(task, url) for url in URLS]

    for f in future_tasks:
        if f.running():
            print('%s is running' % str(f))

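    # as_completed() yields each future only once it has finished,
    # so done() is always True here and cancel() would have no effect.
    for f in as_completed(future_tasks):
        try:
            f_ret = f.result()  # re-raises any exception raised inside task()
            print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
        except Exception as e:
            print(str(e))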

Output:

<Future at 0x7fc2716e1f60 state=running> is running
<Future at 0x7fc27136d4e0 state=running> is running
<Future at 0x7fc27136d710 state=running> is running
<Future at 0x7fc27136d710 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
<Future at 0x7fc2716e1f60 state=finished returned Response>, done, result: http://www.qq.com/, 252343
<Future at 0x7fc27136d4e0 state=finished returned Response>, done, result: http://www.sina.com.cn/, 602366

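As the output shows, as_completed() does not yield the futures in the order of the URLS list but in the order in which they finish. This also shows that the requests to the different URLs run concurrently instead of blocking one another.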
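The same completion-order behavior can be reproduced offline. This sketch uses a hypothetical slow_echo() helper with different sleep times, and also registers a callback with add_done_callback(), which is called with the finished Future as its only argument:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_echo(value, delay):
    # Hypothetical task: sleep to simulate I/O, then return the value.
    time.sleep(delay)
    return value


finished = []  # filled in by the done-callback

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_echo, v, d)
               for v, d in [('a', 0.3), ('b', 0.1), ('c', 0.2)]]
    for f in futures:
        # Runs in the worker thread that completes the future, or immediately
        # in this thread if the future has already finished.
        f.add_done_callback(lambda fut: finished.append(fut.result()))

    # as_completed() yields futures as they finish, not in submission order.
    order = [f.result() for f in as_completed(futures)]

print('as_completed order:', order)
print('callback order:', finished)
```

The exact order printed depends on the delays and on thread scheduling; leaving the with-block guarantees that all callbacks have run before finished is printed.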

wait

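The wait() function returns a named 2-tuple of two sets: done (the futures that have completed) and not_done (the futures that have not). One advantage of wait() is the extra control it gives you: its return_when parameter accepts one of three constants, FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED, and defaults to ALL_COMPLETED.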

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import wait
import requests

URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com', ]


def task(url, timeout=10):
    return requests.get(url, timeout=timeout)


with Pool(max_workers=3) as executor:
    future_tasks = [executor.submit(task, url) for url in URLS]

    for f in future_tasks:
        if f.running():
            print('%s is running' % str(f))

    results = wait(future_tasks)
    done = results[0]
    for x in done:
        print(x)

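wait() has two parameters that can be set: timeout and return_when.
timeout limits how many seconds wait() waits before returning; if it expires, wait() does not raise, and the unfinished futures are simply returned in the not_done set.
return_when decides when the function returns: with the default ALL_COMPLETED, the call blocks until all the given futures have completed; with FIRST_COMPLETED, it returns as soon as any future finishes or is cancelled, without waiting for the rest; with FIRST_EXCEPTION, it returns as soon as any future raises an exception, or behaves like ALL_COMPLETED if none does.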
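A minimal sketch of return_when=FIRST_COMPLETED combined with timeout, using a hypothetical slow_echo() helper and sleeps in place of real work:

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def slow_echo(value, delay):
    # Hypothetical task: sleep to simulate work, then return the value.
    time.sleep(delay)
    return value


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(slow_echo, v, d)
               for v, d in [('fast', 0.05), ('slow', 1.0), ('slower', 1.0)]]

    # Return as soon as any future finishes, but never wait longer than 5 seconds.
    done, not_done = wait(futures, timeout=5, return_when=FIRST_COMPLETED)
    first = [f.result() for f in done]
    still_running = len(not_done)

print(first, still_running)
```

With these delays, done will normally hold only the 'fast' future while the other two are still in not_done; the with-block then waits for them before shutting the pool down.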

Reposted from:

http://blog.csdn.net/dutsoft/article/details/54728706

 

