Essential web-crawler knowledge: the concurrent.futures library


In terms of libraries, I consider the essentials for web crawling to be urllib, requests, re, BeautifulSoup and concurrent.futures. This post summarizes how to use the concurrent.futures library.
I recommend first reading the previous post, "Should Python use multithreading at all?", which provides useful background for concurrent.futures.

1. Overview of the concurrent.futures library

  The Python standard library provides the threading and multiprocessing modules for writing multithreaded/multiprocess code. Since Python 3.2 the standard library has also offered the concurrent.futures module, which implements thread pools and process pools as a higher-level abstraction over threading and multiprocessing, making concurrent Python code considerably easier to write.

  The concurrent.futures module provides two classes: ThreadPoolExecutor and ProcessPoolExecutor.

(1) First, look at the inheritance hierarchy and key public attributes of the two classes

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

print('ThreadPoolExecutor MRO:', ThreadPoolExecutor.__mro__)
print('ThreadPoolExecutor attributes:', [attr for attr in dir(ThreadPoolExecutor) if not attr.startswith('_')])
print('ProcessPoolExecutor MRO:', ProcessPoolExecutor.__mro__)
print('ProcessPoolExecutor attributes:', [attr for attr in dir(ProcessPoolExecutor) if not attr.startswith('_')])

  Both inherit from the futures._base.Executor class and expose three important methods: map, submit and shutdown, so the API is quite small.
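  Because both pool types implement the same Executor interface, you can write code against Executor and swap the concrete pool freely. A minimal sketch (run_tasks and square are illustrative names, not from the original post):

from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor

def run_tasks(executor: Executor, func, args):
    # Works with either pool type, since both subclass Executor
    return list(executor.map(func, args))

def square(x):
    return x * x

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2) as pool:
        print(run_tasks(pool, square, range(5)))
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(run_tasks(pool, square, range(5)))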

(2) Next, look at the implementation of the futures._base.Executor base class

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False

  The base class provides map, submit and shutdown, plus context-manager (with) support. The following sections explain how to use each of them.

2. The map function

  Function signature: def map(self, fn, *iterables, timeout=None, chunksize=1)

  This map works like Python's built-in map, except that fn is executed asynchronously by the pool as arguments are pulled from the iterables; timeout sets the maximum number of seconds to wait for the results.

  Understanding the chunksize parameter (from the docstring):

The size of the chunks the iterable will be broken into
 before being passed to a child process. This argument is only
 used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

  Example:

from concurrent.futures import ProcessPoolExecutor
import time
import requests

def download(url):
    headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
                'Connection':'keep-alive',
                'Host':'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code

if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2']

    pool = ProcessPoolExecutor(max_workers=2)
    start = time.time()
    result = list(pool.map(download, urllist))
    end = time.time()
    print('status_code:', result)
    print('Process pool -- elapsed: {:.3f}'.format(end - start))
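  The example above leaves chunksize at its default of 1. With ProcessPoolExecutor, a larger chunksize sends input items to the worker processes in batches, which cuts inter-process overhead for cheap tasks; ThreadPoolExecutor ignores the argument. A minimal sketch (cpu_task and the numbers are illustrative, not from the original post):

from concurrent.futures import ProcessPoolExecutor
import time

def cpu_task(n):
    # A cheap CPU-bound task; with chunksize=1 the IPC cost dominates
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    data = [1000] * 20000
    with ProcessPoolExecutor(max_workers=2) as pool:
        start = time.time()
        list(pool.map(cpu_task, data))                  # chunksize defaults to 1
        print('chunksize=1  : {:.3f}s'.format(time.time() - start))

        start = time.time()
        list(pool.map(cpu_task, data, chunksize=500))   # items sent in batches of 500
        print('chunksize=500: {:.3f}s'.format(time.time() - start))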

3. The submit function

  Function signature: def submit(self, fn, *args, **kwargs)

  fn: the callable to execute asynchronously

  args, kwargs: positional and keyword arguments forwarded to fn

  Note: the Future objects and the as_completed function used in the example below are explained in later sections.

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed
import time 
import requests

def download(url):
    headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
                'Connection':'keep-alive',
                'Host':'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code
    
if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2']
               
    start = time.time()               
    pool = ProcessPoolExecutor(max_workers = 2)                  
    futures = [pool.submit(download,url) for url in urllist]
    for future in futures:
        print('Running: %s, Done: %s' % (future.running(), future.done()))
    print('#### divider ####')
    for future in as_completed(futures, timeout=2):
        print('Running: %s, Done: %s' % (future.running(), future.done()))
        print(future.result())
    end = time.time()
    print('Process pool -- elapsed: {:.3f}'.format(end - start))

  Output: the first loop prints each future's running/done status before completion; after the divider, the loop over as_completed prints each future again once it has finished, followed by its HTTP status code.
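  One behaviour the example does not show: if the submitted function raises, the exception is stored in the Future and re-raised when result() is called, so it is worth handling it in the consuming loop. A minimal sketch (flaky_download and the second URL are illustrative, not from the original post):

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def flaky_download(url):
    response = requests.get(url, timeout=5)
    response.raise_for_status()              # may raise requests.HTTPError
    return response.status_code

if __name__ == '__main__':
    urls = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
            'http://example.webscraping.com/places/default/view/no-such-page']
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = {pool.submit(flaky_download, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                print(url, '->', future.result())
            except Exception as exc:          # the worker's exception is re-raised here
                print(url, 'failed:', exc)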

  

4. The shutdown function

  Function signature: def shutdown(self, wait=True)

  This function releases the resources held by the executor once the asynchronous work is done.

  Because _base.Executor implements the context-manager protocol and calls shutdown inside __exit__, using the with statement means you do not have to release the resources yourself:

with ProcessPoolExecutor(max_workers=2) as pool:
    ...   # shutdown(wait=True) is called automatically on exit
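  For completeness, a minimal sketch contrasting explicit shutdown with the with form (the square function is illustrative, not from the original post):

from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

# Explicit management: shutdown(wait=True) blocks until submitted work finishes
pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(square, i) for i in range(4)]
pool.shutdown(wait=True)
print([f.result() for f in futures])

# Equivalent with a context manager: __exit__ calls shutdown(wait=True)
with ThreadPoolExecutor(max_workers=2) as pool:
    print(list(pool.map(square, range(4))))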

5. The Future class

  submit returns a Future object, and the Future class provides methods for tracking the state of the task:

  future.running(): whether the task is currently executing

  future.done(): whether the task has finished (or been cancelled)

  future.result(): the return value of the function, blocking until it is available and re-raising any exception the function raised

futures = [pool.submit(download,url) for url in urllist]
for future in futures:
    print('Running: %s, Done: %s' % (future.running(), future.done()))
print('#### divider ####')
for future in as_completed(futures, timeout=2):
    print('Running: %s, Done: %s' % (future.running(), future.done()))
    print(future.result())

  as_completed takes an iterable of futures plus a timeout parameter.

  With the default timeout=None it blocks until tasks finish, yielding each future as it completes (the iterator is implemented with yield).

  With timeout > 0 it waits at most timeout seconds; if some futures have still not completed when the time is up, the iterator stops and raises TimeoutError.
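  A minimal sketch of handling that timeout (slow_task and the sleep times are illustrative, not from the original post):

from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
import time

def slow_task(seconds):
    time.sleep(seconds)
    return seconds

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(slow_task, s) for s in (0.1, 0.2, 5)]
    try:
        for future in as_completed(futures, timeout=1):
            print('finished:', future.result())
    except TimeoutError:
        # Raised because the 5-second task cannot finish within 1 second;
        # leaving the with block still waits for it via shutdown(wait=True)
        print('some tasks did not finish in time')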

6. Callback functions

  The Future class provides an add_done_callback method for attaching a user-defined callback:

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        fn(self)

  Example:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed
import time 
import requests

def download(url):
    headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
                'Connection':'keep-alive',
                'Host':'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code
    
def callback(future):
    print(future.result())

if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2',
               'http://example.webscraping.com/places/default/view/Albania-3',
               'http://example.webscraping.com/places/default/view/Algeria-4',
               'http://example.webscraping.com/places/default/view/American-Samoa-5']
               
    start = time.time()               
    with ProcessPoolExecutor(max_workers = 2) as pool:                  
        futures = [pool.submit(download,url) for url in urllist]
        for future in futures:
            print('Running: %s, Done: %s' % (future.running(), future.done()))
        print('#### divider ####')
        for future in as_completed(futures, timeout=5):
            future.add_done_callback(callback)
            print('Running: %s, Done: %s' % (future.running(), future.done()))
        end = time.time()
        print('Process pool -- elapsed: {:.3f}'.format(end - start))

7. The wait function

  Function signature: def wait(fs, timeout=None, return_when=ALL_COMPLETED)

def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.
    """
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = set(fs) - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, set(fs) - done)

  wait returns a named 2-tuple of two sets: one named 'done' containing the futures that completed (finished or cancelled), and one named 'not_done' containing the futures that did not.

  It takes three parameters; the third, return_when, is the interesting one:

  FIRST_COMPLETED: return when any future finishes or is cancelled.

  FIRST_EXCEPTION: return when any future finishes by raising an exception; if no future raises an exception, it is equivalent to ALL_COMPLETED.

  ALL_COMPLETED: return when all futures finish or are cancelled.
  Example:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,\
            as_completed,wait,ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
import time 
import requests

def download(url):
    headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
                'Connection':'keep-alive',
                'Host':'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code
    
if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2',
               'http://example.webscraping.com/places/default/view/Albania-3',
               'http://example.webscraping.com/places/default/view/Algeria-4',
               'http://example.webscraping.com/places/default/view/American-Samoa-5']
               
    start = time.time()               
    with ProcessPoolExecutor(max_workers = 2) as pool:                  
        futures = [pool.submit(download,url) for url in urllist]
        for future in futures:
            print('Running: %s, Done: %s' % (future.running(), future.done()))
        print('#### divider ####')
        completed, uncompleted = wait(futures, timeout=2, return_when=FIRST_COMPLETED)
        for cp in completed:
            print('Running: %s, Done: %s' % (cp.running(), cp.done()))
            print(cp.result())
        end = time.time()
        print('Process pool -- elapsed: {:.3f}'.format(end - start))

  Output: only one completed future was returned in the done set, because FIRST_COMPLETED makes wait return as soon as any single future finishes.
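  For comparison, a minimal self-contained sketch (slow_task and the timings are illustrative, not from the original post) showing how return_when changes what wait returns:

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time

def slow_task(seconds):
    time.sleep(seconds)
    return seconds

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(slow_task, s) for s in (0.1, 0.2, 0.3)]

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print('FIRST_COMPLETED -> done: %d, not done: %d' % (len(done), len(not_done)))

    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    print('ALL_COMPLETED   -> done: %d, not done: %d' % (len(done), len(not_done)))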

 

