Python concurrency with concurrent.futures


  concurrent: as in concurrency, running several tasks at the same time

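  The Python standard library provides the threading and multiprocessing modules for writing multithreaded and multiprocess code. Since Python 3.2 it also provides the concurrent.futures module, which offers two classes, ThreadPoolExecutor and ProcessPoolExecutor. They are higher-level abstractions over threading and multiprocessing and give direct support for writing thread pools and process pools.
The core building blocks of concurrent.futures are Executor and Future.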
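To make the "higher-level abstraction" point concrete, here is a minimal sketch that runs the same small job twice. The first run uses raw `threading.Thread` objects and the second uses a `ThreadPoolExecutor`. The `square` and `worker` functions are illustrative helpers, not taken from the original post.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

# Low-level threading: we create the threads, store results ourselves, and join them.
manual_results = [None] * 5

def worker(i):
    manual_results[i] = square(i)  # each thread writes its own slot

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# concurrent.futures: the pool manages the threads and returns each result via a Future.
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(square, i) for i in range(5)]
    pooled_results = [f.result() for f in futures]

print(manual_results)  # [0, 1, 4, 9, 16]
print(pooled_results)  # [0, 1, 4, 9, 16]
```

With raw threads, collecting results (the shared list) and joining the threads are our job. The pool reuses its threads and hands each return value back through a Future.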

  Executor  

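  Executor is an abstract base class and is not meant to be used directly. It defines the basic methods for asynchronous execution. ThreadPoolExecutor and ProcessPoolExecutor inherit from Executor and are used to create thread pools and process pools, respectively.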
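A small sketch of what the shared Executor interface buys. The hypothetical helper `squares` below accepts any Executor, so the same code runs on either a thread pool or a process pool. The builtin `pow` serves as the task so that it can be pickled and sent to worker processes. The sketch also shows that the base class provides no execution strategy of its own: in CPython, `submit()` on a bare `Executor` raises `NotImplementedError`.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

def squares(executor: Executor, n: int) -> list:
    """Hypothetical helper: square 0..n-1 using whatever Executor it is given."""
    with executor:  # leaving the block calls executor.shutdown(wait=True)
        # pow is a builtin, so it pickles cleanly for worker processes as well.
        return list(executor.map(pow, range(n), [2] * n))

if __name__ == "__main__":
    # Both pool classes are concrete subclasses of Executor.
    print(issubclass(ThreadPoolExecutor, Executor), issubclass(ProcessPoolExecutor, Executor))  # True True
    print(squares(ThreadPoolExecutor(max_workers=2), 5))   # [0, 1, 4, 9, 16]
    print(squares(ProcessPoolExecutor(max_workers=2), 5))  # [0, 1, 4, 9, 16]

    # The base class only defines the interface.
    try:
        Executor().submit(pow, 2, 3)
    except NotImplementedError:
        print("Executor.submit is not implemented in the base class")
```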

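  The submit method

  Executor defines the submit() method. It submits a callable (together with its arguments) for execution and returns a Future instance, and that Future represents the pending call.

  We use submit to add a task to the pool. The returned Future can be thought of simply as an operation that will complete at some point in the future: we can check on it or wait for its result later.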
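A minimal sketch of submit() with a thread pool. `slow_add` is a hypothetical task that sleeps to simulate I/O. Note that keyword arguments given to submit() are passed through to the task.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

def slow_add(a, b, delay=0.1):
    """Hypothetical task: sleep to simulate I/O, then return a + b."""
    time.sleep(delay)
    return a + b

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit(fn, *args, **kwargs) schedules fn(*args, **kwargs) and returns at once.
    future = executor.submit(slow_add, 1, 2, delay=0.2)
    print(isinstance(future, Future))  # True
    print(future.done())               # usually False: the task is still sleeping
    # result() blocks until the task finishes, then returns its return value.
    print(future.result())             # 3
    print(future.done())               # True
```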

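  The map method

  Executor also provides a map method, used in much the same way as the built-in map: it maps a function over one or more iterables. The difference is that the calls are executed concurrently by the pool, while the results are still returned in the same order as the inputs.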
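A short sketch of Executor.map on a thread pool. `delayed_identity` is a hypothetical task whose later inputs sleep for less time and so finish first, which makes the ordering guarantee visible.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def delayed_identity(x):
    # Later inputs sleep for less time, so they tend to finish first.
    time.sleep(0.05 * (3 - x))
    return x

with ThreadPoolExecutor(max_workers=3) as executor:
    # Results come back in input order, not completion order.
    ordered = list(executor.map(delayed_identity, [0, 1, 2]))
    # With several iterables, fn takes one argument from each: pow(2, 5), pow(3, 2).
    powers = list(executor.map(pow, [2, 3], [5, 2]))

print(ordered)  # [0, 1, 2]
print(powers)   # [32, 9]
print(ordered == list(map(delayed_identity, [0, 1, 2])))  # True
```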

  Future

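  A Future instance is created by Executor.submit(). It can be thought of as an operation that will complete in the future, and it is the foundation of asynchronous programming here. Normally, when we perform I/O such as fetching a URL (as in the crawler example later in this post), the calling thread blocks until the result arrives and can do nothing else. With Futures, we can do other work during that waiting period.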
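The sketch below illustrates the "do other work while waiting" idea with `as_completed`, which yields futures in the order they finish. `fake_io` is a hypothetical stand-in for a blocking request, and the timings are approximate.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_io(name, seconds):
    """Hypothetical stand-in for a blocking I/O call such as an HTTP request."""
    time.sleep(seconds)
    return name, seconds

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as executor:
    jobs = [("a", 0.3), ("b", 0.1), ("c", 0.2)]
    futures = [executor.submit(fake_io, n, s) for n, s in jobs]

    # The main thread is free to do other work while the tasks wait.
    other_work = sum(range(1000))

    # as_completed yields each Future as soon as it finishes.
    finished_order = [f.result()[0] for f in as_completed(futures)]
elapsed = time.perf_counter() - start

print(other_work)         # 499500
print(finished_order)     # typically ['b', 'c', 'a']
print(f"{elapsed:.1f}s")  # roughly 0.3s, versus about 0.6s if run one after another
```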

  Example:

  

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os, time, random
def foo(i):
    print('%s is running %s' % (os.getpid(), i))
    time.sleep(random.randint(1, 3))
    return i ** 2
if __name__ == '__main__':
    print('cpu_num:', os.cpu_count())
    executor = ProcessPoolExecutor()   # max_workers defaults to the number of CPUs
    print('executor', executor, type(executor))
    # Equivalent to the list comprehension below:
    # futures = []
    # for i in range(10):
    #     future = executor.submit(foo, i)
    #     futures.append(future)
    futures = [executor.submit(foo, i) for i in range(10)]
    executor.shutdown()
    # There is a noticeable pause here: shutdown() waits (wait=True by default)
    # until every submitted future has finished, and only then does the program continue.
    print('main')
    print(futures)
    for future in futures:
        print(future.result())

  Output:

cpu_num: 8
executor <concurrent.futures.process.ProcessPoolExecutor object at 0x00000276745AA978> <class 'concurrent.futures.process.ProcessPoolExecutor'>
11740 is running 0
3156 is running 1
9928 is running 2
2208 is running 3
2324 is running 4
13080 is running 5
1892 is running 6
2964 is running 7
2208 is running 8
2324 is running 9
main
[<Future at 0x27674900e10 state=finished returned int>, <Future at 0x27674949dd8 state=finished returned int>, <Future at 0x27674949e80 state=finished returned int>, <Future at 0x27674949f28 state=finished returned int>, <Future at 0x27674949fd0 state=finished returned int>, <Future at 0x2767495a0b8 state=finished returned int>, <Future at 0x2767495a198 state=finished returned int>, <Future at 0x2767495a278 state=finished returned int>, <Future at 0x2767495a358 state=finished returned int>, <Future at 0x2767495a438 state=finished returned int>]
0
1
4
9
16
25
36
49
64
81

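The explicit `executor.shutdown()` call in the example above can also be written as a `with` block, which calls `shutdown(wait=True)` on exit. A minimal sketch: the helper name `squares_after_shutdown` is hypothetical, and the builtin `pow` replaces `foo` so the task pickles cleanly for worker processes.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

def squares_after_shutdown(executor: Executor, n: int):
    """Hypothetical helper: submit pow(i, 2) for i in range(n), shut down, read results."""
    with executor:  # exiting the block calls executor.shutdown(wait=True)
        futures = [executor.submit(pow, i, 2) for i in range(n)]
    # shutdown(wait=True) has returned, so no future is still pending here.
    all_done = all(f.done() for f in futures)
    return all_done, [f.result() for f in futures]

if __name__ == "__main__":
    print(squares_after_shutdown(ProcessPoolExecutor(), 10))
    # (True, [0, 1, 4, 9, 16, 25, 36, 49, 64, 81])
    print(squares_after_shutdown(ThreadPoolExecutor(), 3))
    # (True, [0, 1, 4])
```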
  

  Using ThreadPoolExecutor for a simple crawler

  

from concurrent.futures import ThreadPoolExecutor
import threading
import requests
file_lock = threading.Lock()   # parse() runs in several worker threads, so serialize file writes
def get(url):
    r = requests.get(url, timeout=10)
    return {'url': url, 'text': r.text}
def parse(future):
    dic = future.result()          # call result() on the Future to get get()'s return value
    data = 'url:%s length:%s\n' % (dic['url'], len(dic['text']))
    with file_lock, open('db.text', 'a', encoding='utf-8') as f:
        f.write(data)
if __name__ == '__main__':
    executor = ThreadPoolExecutor()
    url_l = ['http://cn.bing.com/', 'http://www.cnblogs.com/wupeiqi/', 'http://www.cnblogs.com/654321cc/',
                 'https://www.cnblogs.com/', 'http://society.people.com.cn/n1/2017/1012/c1008-29581930.html',
                 'http://www.xilu.com/news/shaonianxinzangyou5gedong.html', ]
    for url in url_l:
        # In a multiprocessing.Pool, the callback receives the task's return value
        # (the value ApplyResult.get() would give). Here the callback, parse,
        # receives the Future object produced by submit().
        executor.submit(get, url).add_done_callback(parse)
    executor.shutdown()
    print('main')

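The key detail in the crawler is that add_done_callback passes the finished Future, not the return value, to the callback. Below is a network-free sketch of the same pattern. A hypothetical `work` task stands in for `get(url)`, and a lock guards the shared list because callbacks can run in worker threads.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

results = []
results_lock = threading.Lock()  # callbacks may run concurrently in worker threads

def work(n):
    """Hypothetical task standing in for get(url)."""
    return {"n": n, "square": n * n}

def on_done(future):
    # The callback receives the Future itself; result() gives work()'s return value.
    data = future.result()
    with results_lock:
        results.append(data["square"])

with ThreadPoolExecutor(max_workers=3) as executor:
    for n in range(5):
        executor.submit(work, n).add_done_callback(on_done)

print(sorted(results))  # [0, 1, 4, 9, 16]
```

If a Future has already finished when add_done_callback is called, the callback runs immediately in the calling thread. Either way, it runs exactly once per Future.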

