Python 多線程教程:並發與並行


轉載於: https://my.oschina.net/leejun2005/blog/398826

 

在批評Python的討論中,常常說起Python多線程是多么的難用。還有人對 global interpreter lock(也被親切的稱為“GIL”)指指點點,說它阻礙了Python的多線程程序同時運行。因此,如果你是從其他語言(比如C++或Java)轉過來的話,Python線程模塊並不會像你想象的那樣去運行。必須要說明的是,我們還是可以用Python寫出能並發或並行的代碼,並且能帶來性能的顯著提升,只要你能顧及到一些事情。如果你還沒看過的話,我建議你看看Eqbal Quran的文章《Ruby中的並發和並行》。

在本文中,我們將會寫一個小的Python腳本,用於下載Imgur上最熱門的圖片。我們將會從一個按順序下載圖片的版本開始做起,即一個一個地下載。在那之前,你得注冊一個Imgur上的應用。如果你還沒有Imgur賬戶,請先注冊一個。

本文中的腳本在Python3.4.2中測試通過。稍微改一下,應該也能在Python2中運行——urllib是兩個版本中區別最大的部分。

1、開始動手

讓我們從創建一個叫“download.py”的Python模塊開始。這個文件包含了獲取圖片列表以及下載這些圖片所需的所有函數。我們將這些功能分成三個單獨的函數:

  • get_links

  • download_link

  • setup_download_dir

第三個函數,“setup_download_dir”,用於創建下載的目標目錄(如果不存在的話)。

Imgur的API要求HTTP請求能支持帶有client ID的“Authorization”頭部。你可以從你注冊的Imgur應用的面板上找到這個client ID,而響應會以JSON進行編碼。我們可以使用Python的標准JSON庫去解碼。下載圖片更簡單,你只需要根據它們的URL獲取圖片,然后寫入到一個文件即可。

代碼如下:

import json import logging import os from pathlib import Path from urllib.request import urlopen, Request   logger = logging.getLogger(__name__)   def get_links(client_id):    headers = {'Authorization': 'Client-ID {}'.format(client_id)}    req = Request('https://api.imgur.com/3/gallery/', headers=headers, method='GET')    with urlopen(req) as resp:        data = json.loads(resp.readall().decode('utf-8'))    return map(lambda item: item['link'], data['data'])   def download_link(directory, link):    logger.info('Downloading %s', link)    download_path = directory / os.path.basename(link)    with urlopen(link) as image, download_path.open('wb') as f:        f.write(image.readall())   def setup_download_dir():    download_dir = Path('images')    if not download_dir.exists():        download_dir.mkdir()    return download_dir

接下來,你需要寫一個模塊,利用這些函數去逐個下載圖片。我們給它命名為“single.py”。它包含了我們最原始版本的Imgur圖片下載器的主要函數。這個模塊將會通過環境變量“IMGUR_CLIENT_ID”去獲取Imgur的client ID。它將會調用“setup_download_dir”去創建下載目錄。最后,使用get_links函數去獲取圖片的列表,過濾掉所有的GIF和專輯URL,然后用“download_link”去將圖片下載並保存在磁盤中。下面是“single.py”的代碼:

import logging import os from time import time   from download import setup_download_dir, get_links, download_link   logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__)   def main():    ts = time()    client_id = os.getenv('IMGUR_CLIENT_ID')    if not client_id:        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")    download_dir = setup_download_dir()    links = [l for l in get_links(client_id) if l.endswith('.jpg')]    for link in links:        download_link(download_dir, link)    print('Took {}s'.format(time() - ts))   if __name__ == '__main__':    main()

注:為了測試方便,上面兩段代碼可以用如下代碼替代演示:

# coding=utf-8 #測試utf-8編碼 from time import sleep, time import sys, threading reload(sys) sys.setdefaultencoding('utf-8') def getNums(N):     return xrange(N) def processNum(num):     num_add = num + 1     sleep(1)     print str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add) if __name__ == "__main__":     t1 = time()     for i in getNums(3):         processNum(i)     print "cost time is: {:.2f}s".format(time() - t1)

結果:

<_MainThread(MainThread, started 4436)>: 0 → 1 <_MainThread(MainThread, started 4436)>: 1 → 2 <_MainThread(MainThread, started 4436)>: 2 → 3 cost time is: 3.00s

在我的筆記本上,這個腳本花了19.4秒去下載91張圖片。請注意這些數字在不同的網絡上也會有所不同。19.4秒並不是非常的長,但是如果我們要下載更多的圖片怎么辦呢?或許是900張而不是90張。平均下載一張圖片要0.2秒,900張的話大概需要3分鍾。那么9000張圖片將會花掉30分鍾。好消息是使用了並發或者並行后,我們可以將這個速度顯著地提高。

接下來的代碼示例將只會顯示導入特有模塊和新模塊的import語句。所有相關的Python腳本都可以在這方便地找到this GitHub repository

2、使用線程

線程是最出名的實現並發和並行的方式之一。操作系統一般提供了線程的特性。線程比進程要小,而且共享同一塊內存空間。

在這里,我們將寫一個替代“single.py”的新模塊。它將創建一個有八個線程的池,加上主線程的話總共就是九個線程。之所以是八個線程,是因為我的電腦有8個CPU內核,而一個工作線程對應一個內核看起來還不錯。在實踐中,線程的數量是仔細考究的,需要考慮到其他的因素,比如在同一台機器上跑的的其他應用和服務。

下面的腳本幾乎跟之前的一樣,除了我們現在有個新的類,DownloadWorker,一個Thread類的子類。運行無限循環的run方法已經被重寫。在每次迭代時,它調用“self.queue.get()”試圖從一個線程安全的隊列里獲取一個URL。它將會一直堵塞,直到隊列中出現一個要處理元素。一旦工作線程從隊列中得到一個元素,它將會調用之前腳本中用來下載圖片到目錄中所用到的“download_link”方法。下載完成之后,工作線程向隊列發送任務完成的信號。這非常重要,因為隊列一直在跟蹤隊列中的任務數。如果工作線程沒有發出任務完成的信號,“queue.join()”的調用將會令整個主線程都在阻塞狀態。

from queue import Queue from threading import Thread   class DownloadWorker(Thread):    def __init__(self, queue):        Thread.__init__(self)        self.queue = queue      def run(self):        while True:            # Get the work from the queue and expand the tuple            # 從隊列中獲取任務並擴展tuple            directory, link = self.queue.get()            download_link(directory, link)            self.queue.task_done()   def main():    ts = time()    client_id = os.getenv('IMGUR_CLIENT_ID')    if not client_id:        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")    download_dir = setup_download_dir()    links = [l for l in get_links(client_id) if l.endswith('.jpg')]    # Create a queue to communicate with the worker threads    queue = Queue()    # Create 8 worker threads    # 創建八個工作線程    for x in range(8):        worker = DownloadWorker(queue)        # Setting daemon to True will let the main thread exit even though the workers are blocking        # 將daemon設置為True將會使主線程退出,即使worker都阻塞了        worker.daemon = True        worker.start()    # Put the tasks into the queue as a tuple    # 將任務以tuple的形式放入隊列中    for link in links:        logger.info('Queueing {}'.format(link))        queue.put((download_dir, link))    # Causes the main thread to wait for the queue to finish processing all the tasks    # 讓主線程等待隊列完成所有的任務    queue.join()    print('Took {}'.format(time() - ts))

注:為了測試方便,上面的代碼可以用如下代碼替代演示:

# coding=utf-8 #測試utf-8編碼 from Queue import Queue from threading import Thread from single import * import sys reload(sys) sys.setdefaultencoding('utf-8') class ProcessWorker(Thread):     def __init__(self, queue):         Thread.__init__(self)         self.queue = queue     def run(self):         while True:             # Get the work from the queue             num = self.queue.get()             processNum(num)             self.queue.task_done() def main():     ts = time()     nums = getNums(4)     # Create a queue to communicate with the worker threads     queue = Queue()     # Create 4 worker threads     # 創建四個工作線程     for x in range(4):         worker = ProcessWorker(queue)         # Setting daemon to True will let the main thread exit even though the workers are blocking         # 將daemon設置為True將會使主線程退出,即使worker都阻塞了         worker.daemon = True         worker.start()     # Put the tasks into the queue     for num in nums:         queue.put(num)     # Causes the main thread to wait for the queue to finish processing all the tasks     # 讓主線程等待隊列完成所有的任務     queue.join()     print("cost time is: {:.2f}s".format(time() - ts)) if __name__ == "__main__":     main()

結果:

<ProcessWorker(Thread-4, started daemon 3900)>: 3 → 4<ProcessWorker(Thread-1, started daemon 3436)>: 2 → 3<ProcessWorker(Thread-3, started daemon 4576)>: 1 → 2   <ProcessWorker(Thread-2, started daemon 396)>: 0 → 1 cost time is: 1.01s

在同一個機器上運行這個腳本,下載時間變成了4.1秒!即比之前的例子快4.7倍。雖然這快了很多,但還是要提一下,由於GIL的緣故,在這個進程中同一時間只有一個線程在運行。因此,這段代碼是並發的但不是並行的。而它仍然變快的原因是這是一個IO密集型的任務。進程下載圖片時根本毫不費力,而主要的時間都花在了等待網絡上。這就是為什么線程可以提供很大的速度提升。每當線程中的一個准備工作時,進程可以不斷轉換線程。使用Python或其他有GIL的解釋型語言中的線程模塊實際上會降低性能。如果你的代碼執行的是CPU密集型的任務,例如解壓gzip文件,使用線程模塊將會導致執行時間變長。對於CPU密集型任務和真正的並行執行,我們可以使用多進程(multiprocessing)模塊。

官方的Python實現——CPython——帶有GIL,但不是所有的Python實現都是這樣的。比如,IronPython,使用.NET框架實現的Python就沒有GIL,基於Java實現的Jython也同樣沒有。你可以點這查看現有的Python實現。

3、生成多進程

多進程模塊比線程模塊更易使用,因為我們不需要像線程示例那樣新增一個類。我們唯一需要做的改變在主函數中。

為了使用多進程,我們得建立一個多進程池。通過它提供的map方法,我們把URL列表傳給池,然后8個新進程就會生成,它們將並行地去下載圖片。這就是真正的並行,不過這是有代價的。整個腳本的內存將會被拷貝到各個子進程中。在我們的例子中這不算什么,但是在大型程序中它很容易導致嚴重的問題。

from functools import partial from multiprocessing.pool import Pool   def main():    ts = time()    client_id = os.getenv('IMGUR_CLIENT_ID')    if not client_id:        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")    download_dir = setup_download_dir()    links = [l for l in get_links(client_id) if l.endswith('.jpg')]    download = partial(download_link, download_dir)    with Pool(8) as p:        p.map(download, links)    print('Took {}s'.format(time() - ts))

注:為了測試方便,上面的代碼可以用如下代碼替代演示:

# coding=utf-8 #測試utf-8編碼 from functools import partial from multiprocessing.pool import Pool from single import * from time import time import sys reload(sys) sys.setdefaultencoding('utf-8') def main():     ts = time()     nums = getNums(4)     p = Pool(4)     p.map(processNum, nums)     print("cost time is: {:.2f}s".format(time() - ts)) if __name__ == "__main__":     main()

結果:

<_MainThread(MainThread, started 6188)>: 0 → 1 <_MainThread(MainThread, started 3584)>: 1 → 2 <_MainThread(MainThread, started 2572)>: 3 → 4<_MainThread(MainThread, started 4692)>: 2 → 3 cost time is: 1.21s

4、分布式任務

你已經知道了線程和多進程模塊可以給你自己的電腦跑腳本時提供很大的幫助,那么在你想要在不同的機器上執行任務,或者在你需要擴大規模而超過一台機器的的能力范圍時,你該怎么辦呢?一個很好的使用案例是網絡應用的長時間后台任務。如果你有一些很耗時的任務,你不會希望在同一台機器上占用一些其他的應用代碼所需要的子進程或線程。這將會使你的應用的性能下降,影響到你的用戶們。如果能在另外一台甚至很多台其他的機器上跑這些任務就好了。

Python庫RQ非常適用於這類任務。它是一個簡單卻很強大的庫。首先將一個函數和它的參數放入隊列中。它將函數調用的表示序列化(pickle),然后將這些表示添加到一個Redis列表中。任務進入隊列只是第一步,什么都還沒有做。我們至少還需要一個能去監聽任務隊列的worker(工作線程)。

第一步是在你的電腦上安裝和使用Redis服務器,或是擁有一台能正常的使用的Redis服務器的使用權。接着,對於現有的代碼只需要一些小小的改動。先創建一個RQ隊列的實例並通過redis-py 庫傳給一台Redis服務器。然后,我們執行“q.enqueue(download_link, download_dir, link)”,而不只是調用“download_link” 。enqueue方法的第一個參數是一個函數,當任務真正執行時,其他的參數或關鍵字參數將會傳給該函數。

最后一步是啟動一些worker。RQ提供了方便的腳本,可以在默認隊列上運行起worker。只要在終端窗口中執行“rqworker”,就可以開始監聽默認隊列了。請確認你當前的工作目錄與腳本所在的是同一個。如果你想監聽別的隊列,你可以執行“rqworker queue_name”,然后將會開始執行名為queue_name的隊列。RQ的一個很好的點就是,只要你可以連接到Redis,你就可以在任意數量上的機器上跑起任意數量的worker;因此,它可以讓你的應用擴展性得到提升。下面是RQ版本的代碼:

from redis import Redis from rq import Queue   def main():    client_id = os.getenv('IMGUR_CLIENT_ID')    if not client_id:        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")    download_dir = setup_download_dir()    links = [l for l in get_links(client_id) if l.endswith('.jpg')]    q = Queue(connection=Redis(host='localhost', port=6379))    for link in links:        q.enqueue(download_link, download_dir, link)

然而RQ並不是Python任務隊列的唯一解決方案。RQ確實易用並且能在簡單的案例中起到很大的作用,但是如果有更高級的需求,我們可以使用其他的解決方案(例如 Celery)。

5、總結

如果你的代碼是IO密集型的,線程和多進程可以幫到你。多進程比線程更易用,但是消耗更多的內存。如果你的代碼是CPU密集型的,多進程就明顯是更好的選擇——特別是所使用的機器是多核或多CPU的。對於網絡應用,在你需要擴展到多台機器上執行任務,RQ是更好的選擇。

6、注:關於並發、並行區別與聯系

  • 並發是指,程序在運行的過程中存在多於一個的執行上下文。這些執行上下文一般對應着不同的調用棧。

在單處理器上,並發程序雖然有多個上下文運行環境,但某一個時刻只有一個任務在運行。

但在多處理器上,因為有了多個執行單元,就可以同時有數個任務在跑。

  • 這種物理上同一時刻有多個任務同時運行的方式就是並行。

和並發相比,並行更加強調多個任務同時在運行。

而且並行還有一個層次問題,比如是指令間的並行還是任務間的並行。

7、Refer:

[1] Python Multithreading Tutorial: Concurrency and Parallelism

http://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python

[2] 串行(Sequential)、並發(Concurrent)、並行(parallel)與分布式(distributed)

http://www.lingcc.com/2011/12/28/11918/

[3] 說說這篇「我為什么從 Python 轉向 Go」

http://t.cn/R2L0lyu

[4] Python 中的進程、線程、協程、同步、異步、回調

http://python.jobbole.com/81692/

[5] 異步等待的 Python 協程

http://segmentfault.com/a/1190000003076472

[6] Python多進程編程

http://python.jobbole.com/82045/

[7] Python線程指南

http://python.jobbole.com/82105/

[8] 使用Python進行並發編程

http://bit.ly/29hd8dq

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM