In my view, the essential libraries for writing web crawlers in Python include urllib, requests, re, BeautifulSoup, and concurrent.futures. This post summarizes how to use the concurrent.futures library.
I suggest first reading the previous post, "Does Python really need multithreading?" — it will help with understanding the concurrent.futures library.
1. Introduction to the concurrent.futures library
The Python standard library provides the threading and multiprocessing modules for asynchronous multithreading/multiprocessing. Starting with Python 3.2, the standard library also provides the concurrent.futures module, which implements thread pools and process pools as a higher-level abstraction over threading and multiprocessing, making life considerably easier for Python programmers.
The concurrent.futures module provides two classes: ThreadPoolExecutor and ProcessPoolExecutor.
(1) First, look at the inheritance hierarchy and key attributes of the two classes:

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

print('ThreadPoolExecutor MRO:', ThreadPoolExecutor.__mro__)
print('ThreadPoolExecutor attributes:', [attr for attr in dir(ThreadPoolExecutor) if not attr.startswith('_')])
print('ProcessPoolExecutor MRO:', ProcessPoolExecutor.__mro__)
print('ProcessPoolExecutor attributes:', [attr for attr in dir(ProcessPoolExecutor) if not attr.startswith('_')])
```
Both inherit from the futures._base.Executor class and expose three important methods: map, submit, and shutdown. Seen this way, the API is quite simple.
(2) Next, look at the implementation of the futures._base.Executor base class:

```python
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
```
The base class provides map, submit, shutdown, and context-manager (with) support. The following sections explain how to use each of these.
2. The map function
Signature: def map(self, fn, *iterables, timeout=None, chunksize=1)
This map works like Python's built-in map, except that it executes the function asynchronously on arguments taken from the iterables; timeout sets a maximum wait time for results.
Understanding the chunksize parameter:
The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.
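To make chunksize concrete, here is a minimal sketch; the squaring task and the chunk size of 10 are arbitrary choices for illustration, and chunksize only has an effect with ProcessPoolExecutor:

```python
from concurrent.futures import ProcessPoolExecutor

def square(x):
    # Trivial CPU task; when there are many small tasks, batching matters
    return x * x

if __name__ == '__main__':
    data = list(range(100))
    with ProcessPoolExecutor(max_workers=4) as pool:
        # chunksize=10: items are shipped to worker processes in batches
        # of 10, cutting down on inter-process communication overhead
        results = list(pool.map(square, data, chunksize=10))
    print(results[:5])  # [0, 1, 4, 9, 16]
```

With tiny per-item work like this, a larger chunksize usually helps because fewer round-trips are made to the child processes.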
Example (note: the code creates a ProcessPoolExecutor, so that is what must be imported):

```python
from concurrent.futures import ProcessPoolExecutor
import time
import requests

def download(url):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
               'Connection': 'keep-alive',
               'Host': 'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code

if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2']
    pool = ProcessPoolExecutor(max_workers=2)
    start = time.time()
    result = list(pool.map(download, urllist))
    end = time.time()
    print('status_code:', result)
    print('elapsed: {:.3f}s'.format(end - start))
```
3. The submit function
Signature: def submit(self, fn, *args, **kwargs)
fn: the function to execute asynchronously
args, kwargs: the arguments passed to the function
Example (the as_completed function used with Future objects below is described later):
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
import requests

def download(url):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
               'Connection': 'keep-alive',
               'Host': 'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code

if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2']
    start = time.time()
    pool = ProcessPoolExecutor(max_workers=2)
    futures = [pool.submit(download, url) for url in urllist]
    for future in futures:
        print('running: %s, done: %s' % (future.running(), future.done()))
    print('#### divider ####')
    for future in as_completed(futures, timeout=2):
        print('running: %s, done: %s' % (future.running(), future.done()))
        print(future.result())
    end = time.time()
    print('elapsed: {:.3f}s'.format(end - start))
```

Output: (screenshot omitted)
4. The shutdown function
Signature: def shutdown(self, wait=True)
This function releases the system resources used by the executor once asynchronous execution has finished.
Because the _base.Executor base class implements the context-manager protocol (shutdown is wrapped in __exit__), using a with statement frees you from releasing resources yourself:

```python
with ProcessPoolExecutor(max_workers=2) as pool:
    ...  # submit or map work here; shutdown(wait=True) is called automatically on exit
```
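For instance, a minimal sketch of the with form; the fetch_length task here is a stand-in for real work such as downloading a URL:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_length(text):
    # Stand-in for real work such as downloading a URL
    return len(text)

if __name__ == '__main__':
    items = ['alpha', 'beta', 'gamma']
    with ThreadPoolExecutor(max_workers=2) as pool:
        results = list(pool.map(fetch_length, items))
    # By this point __exit__ has already called shutdown(wait=True)
    print(results)  # [5, 4, 5]
```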
5. The Future class
The submit function returns a Future object; the Future class provides methods for tracking a task's execution state:
future.running(): whether the task is currently executing
future.done(): whether the task has finished
future.result(): the return value of the function
```python
futures = [pool.submit(download, url) for url in urllist]
for future in futures:
    print('running: %s, done: %s' % (future.running(), future.done()))
print('#### divider ####')
for future in as_completed(futures, timeout=2):
    print('running: %s, done: %s' % (future.running(), future.done()))
    print(future.result())
```

The as_completed function takes two parameters: an iterable of futures and a timeout.
With the default timeout=None, it blocks until the tasks complete, yielding each finished Future object as it completes (the iterator is implemented with yield).
With timeout > 0, it waits up to timeout seconds; if some tasks are still unfinished when the timeout expires, it stops iterating and raises TimeoutError.
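A minimal sketch of the timeout behavior; the sleep durations and the 0.5-second timeout are arbitrary illustration choices:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError

def slow_task(seconds):
    # Stand-in task: just sleeps, then returns its argument
    time.sleep(seconds)
    return seconds

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Two quick tasks and one that cannot finish within the timeout
        futures = [pool.submit(slow_task, s) for s in (0.1, 0.2, 2)]
        try:
            for future in as_completed(futures, timeout=0.5):
                print('completed:', future.result())
        except TimeoutError:
            # Raised because the 2-second task is still running at 0.5s
            print('some tasks did not finish before the timeout')
```

The two quick tasks are yielded normally; the loop then raises TimeoutError because the slow one is still running when the deadline passes.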
6. Callback functions
The Future class provides the add_done_callback function for registering custom callbacks:
```python
def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    fn(self)
```

Example:
```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import requests

def download(url):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
               'Connection': 'keep-alive',
               'Host': 'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code

def callback(future):
    print(future.result())

if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2',
               'http://example.webscraping.com/places/default/view/Albania-3',
               'http://example.webscraping.com/places/default/view/Algeria-4',
               'http://example.webscraping.com/places/default/view/American-Samoa-5']
    start = time.time()
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(download, url) for url in urllist]
        for future in futures:
            print('running: %s, done: %s' % (future.running(), future.done()))
        print('#### divider ####')
        for future in as_completed(futures, timeout=5):
            future.add_done_callback(callback)
            print('running: %s, done: %s' % (future.running(), future.done()))
    end = time.time()
    print('elapsed: {:.3f}s'.format(end - start))
```
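Note that attaching the callback inside the as_completed loop, as above, means the future has usually already finished, so the callback fires immediately. More typically the callback is registered right after submit; a minimal sketch, where work and on_done are hypothetical stand-ins:

```python
from concurrent.futures import ThreadPoolExecutor

def work(n):
    # Hypothetical stand-in for a real task such as a download
    return n * 2

def on_done(future):
    # Invoked as soon as the future finishes (or immediately,
    # if it has already finished when the callback is added)
    print('result:', future.result())

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2) as pool:
        for n in (1, 2, 3):
            future = pool.submit(work, n)
            future.add_done_callback(on_done)  # registered before completion
```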
7. The wait function
Signature: def wait(fs, timeout=None, return_when=ALL_COMPLETED)

```python
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors)
            to wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED   - Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.
    """
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = set(fs) - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, set(fs) - done)
```
The wait function returns a named 2-tuple containing two sets: one of completed futures (finished or cancelled), one of uncompleted futures.
It takes three parameters; the third deserves special attention:
FIRST_COMPLETED: return when any future finishes or is cancelled.
FIRST_EXCEPTION: return when any future finishes by raising an exception; if no future raises, this is equivalent to ALL_COMPLETED.
ALL_COMPLETED (the default): return when all futures finish or are cancelled.
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, \
    as_completed, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
import time
import requests

def download(url):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
               'Connection': 'keep-alive',
               'Host': 'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code

if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2',
               'http://example.webscraping.com/places/default/view/Albania-3',
               'http://example.webscraping.com/places/default/view/Algeria-4',
               'http://example.webscraping.com/places/default/view/American-Samoa-5']
    start = time.time()
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(download, url) for url in urllist]
        for future in futures:
            print('running: %s, done: %s' % (future.running(), future.done()))
        print('#### divider ####')
        completed, uncompleted = wait(futures, timeout=2, return_when=FIRST_COMPLETED)
        for cp in completed:
            print('running: %s, done: %s' % (cp.running(), cp.done()))
            print(cp.result())
    end = time.time()
    print('elapsed: {:.3f}s'.format(end - start))
```

Output: (screenshot omitted)
Only one completed future was returned, because return_when=FIRST_COMPLETED makes wait return as soon as any future finishes.
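To see the difference between the return_when options without any network access, here is a minimal sketch using a sleeping stand-in task (slow_double and its delays are arbitrary illustration choices):

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED

def slow_double(n):
    # Stand-in task: sleep proportionally to n, then return 2*n
    time.sleep(0.1 * n)
    return n * 2

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(slow_double, n) for n in (1, 2, 3)]

        # Returns as soon as at least one future has finished
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        print('FIRST_COMPLETED -> done:', len(done))

        # Blocks until every future has finished
        done, not_done = wait(futures, return_when=ALL_COMPLETED)
        print('ALL_COMPLETED -> done: %d, not_done: %d' % (len(done), len(not_done)))
```

The first wait typically reports a single finished future; the second always reports all three done and none pending.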