使用concurrent.futures模塊並發,實現進程池、線程池


一、關於concurrent.futures模塊

Python標准庫為我們提供了threading和multiprocessing模塊編寫相應的異步多線程/多進程代碼。從Python3.2開始,標准庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutorProcessPoolExecutor兩個類ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來創建線程池和進程池的代碼。實現了對threadingmultiprocessing的更高級的抽象,對編寫線程池/進程池提供了直接的支持。 
concurrent.futures基礎模塊是executor和future。

 

  concurrent.futures模塊的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。但是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor卻是非常有用,顧名思義兩者分別被用來創建線程池和進程池的代碼。我們可以將相應的tasks直接放入線程池/進程池,不需要維護Queue來操心死鎖的問題,線程池/進程池會自動幫我們調度。

 

  Future這個概念相信有java和nodejs下編程經驗的朋友肯定不陌生了,你可以把它理解為一個在未來完成的操作,這是異步編程的基礎,傳統編程模式下比如我們操作queue.get的時候,在等待返回結果之前會產生阻塞,cpu不能讓出來做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。


Executor中定義了submit()方法,這個方法的作用是提交一個可執行的回調task,並返回一個future實例。future對象代表的就是給定的調用。 

二、submit()方法實現進程池/線程池

進程池

from concurrent.futures import ProcessPoolExecutor
import os,time,random
def task(n):
    print('%s is running' %os.getpid())
    time.sleep(2)
    return n**2


if __name__ == '__main__':
    p=ProcessPoolExecutor()  #不填則默認為cpu的個數
    l=[]
    start=time.time()
    for i in range(10):
        obj=p.submit(task,i)   #submit()方法返回的是一個future實例,要得到結果需要用obj.result()
        l.append(obj)

    p.shutdown()  #類似用from multiprocessing import Pool實現進程池中的close及join一起的作用
    print('='*30)
    # print([obj for obj in l])
    print([obj.result() for obj in l])
    print(time.time()-start)

    #上面方法也可寫成下面的方法
    # start = time.time()
    # with ProcessPoolExecutor() as p:   #類似打開文件,可省去.shutdown()
    #     future_tasks = [p.submit(task, i) for i in range(10)]
    # print('=' * 30)
    # print([obj.result() for obj in future_tasks])
    # print(time.time() - start)

 

線程池

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import threading
import os,time,random
def task(n):
    print('%s:%s is running' %(threading.currentThread().getName(),os.getpid()))
    time.sleep(2)
    return n**2

if __name__ == '__main__':
    p=ThreadPoolExecutor()   #不填則默認為cpu的個數*5
    l=[]
    start=time.time()
    for i in range(10):
        obj=p.submit(task,i)
        l.append(obj)
    p.shutdown()
    print('='*30)
    print([obj.result() for obj in l])
    print(time.time()-start)

#上面方法也可寫成下面的方法
    # start = time.time()
    # with ThreadPoolExecutor() as p:   #類似打開文件,可省去.shutdown()
    #     future_tasks = [p.submit(task, i) for i in range(10)]
    # print('=' * 30)
    # print([obj.result() for obj in future_tasks])
    # print(time.time() - start)

 

默認為異步執行

#p.submit(task,i).result()即同步執行
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def task(n):
    print('%s is running' %os.getpid())
    time.sleep(2)
    return n**2


if __name__ == '__main__':
    p=ProcessPoolExecutor()
    start=time.time()
    for i in range(10):
        res=p.submit(task,i).result()
        print(res)
    print('='*30)
    print(time.time()-start)

 

 

三、回調函數

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
from threading import currentThread
def get_page(url):
    print('%s:<%s> is getting [%s]' %(currentThread().getName(),os.getpid(),url))
    response=requests.get(url)
    time.sleep(2)
    return {'url':url,'text':response.text}
def parse_page(res):  #此處的res是一個p.submit獲得的一個future對象,不是結果
    res=res.result()  #res.result()拿到的才是對應的結果
    print('%s:<%s> parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
    with open('db.txt','a') as f:
        parse_res='url:%s size:%s\n' %(res['url'],len(res['text']))
        f.write(parse_res)
if __name__ == '__main__':
    # p=ProcessPoolExecutor()
    p=ThreadPoolExecutor()
    urls = [
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]

    for url in urls:
        # multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page)
        p.submit(get_page, url).add_done_callback(parse_page) #與之前的回調函數拿到的結果不同,這里拿到的是前面submit方法執行完后返回的對象,要.result才能拿到對應的結果
    p.shutdown()
    print('',os.getpid())

 

 

四、map方法

和內置函數map差不多的用法,這個方法返回一個map(func, *iterables)迭代器,迭代器中的回調執行返回的結果有序的。

以下是通過concurrent.futures模塊下類ThreadPoolExecutor和ProcessPoolExecutor實例化的對象的map方法實現進程池、線程池

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
def task(n):
    print('%s is running' %os.getpid())
    time.sleep(2)
    return n**2


if __name__ == '__main__':
    # p=ProcessPoolExecutor()
    p=ThreadPoolExecutor()
    start = time.time()
    obj=p.map(task,range(10))
    p.shutdown()
    print('='*30)
    print(list(obj))
    print(time.time() - start)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM