Python爬蟲進階六之多進程的用法


前言

在上一節中介紹了thread多線程庫。python中的多線程其實並不是真正的多線程,並不能做到充分利用多核CPU資源。

如果想要充分利用,在python中大部分情況需要使用多進程,那么這個包就叫做 multiprocessing。

借助它,可以輕松完成從單進程到並發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

那么本節要介紹的內容有:

  • Process
  • Lock
  • Semaphore
  • Queue
  • Pipe
  • Pool

Process

基本使用

在multiprocessing中,每一個進程都用一個Process類來表示。首先看下它的API

 

 

  • target表示調用對象,你可以傳入方法的名字
  • args表示被調用對象的位置參數元組,比如target是函數a,他有兩個參數m,n,那么args就傳入(m, n)即可
  • kwargs表示調用對象的字典
  • name是別名,相當於給這個進程取一個名字
  • group分組,實際上不使用

我們先用一個實例來感受一下:

 

 

最簡單的創建Process的過程如上所示,target傳入函數名,args是函數的參數,是元組的形式,如果只有一個參數,那就是長度為1的元組。

然后調用start()方法即可啟動多個進程了。

另外你還可以通過 cpu_count() 方法還有 active_children() 方法獲取當前機器的 CPU 核心數量以及得到目前所有的運行的進程。

通過一個實例來感受一下:

 

 

運行結果:

 

 

自定義類

另外你還可以繼承Process類,自定義進程類,實現run方法即可。

用一個實例來感受一下:

 

 

在上面的例子中,我們繼承了 Process 這個類,然后實現了run方法。打印出來了進程號和參數。

運行結果:

 

 

可以看到,三個進程分別打印出了2、3、4條結果。

我們可以把一些方法獨立的寫在每個類里封裝好,等用的時候直接初始化一個類運行即可。

deamon

在這里介紹一個屬性,叫做deamon。每個線程都可以單獨設置它的屬性,如果設置為True,當父進程結束后,子進程會自動被終止。

用一個實例來感受一下,還是原來的例子,增加了deamon屬性:

 

 

在這里,調用的時候增加了設置deamon,最后的主進程(即父進程)打印輸出了一句話。

運行結果:

 

 

結果很簡單,因為主進程沒有做任何事情,直接輸出一句話結束,所以在這時也直接終止了子進程的運行。

這樣可以有效防止無控制地生成子進程。如果這樣寫了,你在關閉這個主程序運行時,就無需額外擔心子進程有沒有被關閉了。

不過這樣並不是我們想要達到的效果呀,能不能讓所有子進程都執行完了然后再結束呢?那當然是可以的,只需要加入join()方法即可。

 

 

在這里,每個子進程都調用了join()方法,這樣父進程(主進程)就會等待子進程執行完畢。

運行結果:

 

 

發現所有子進程都執行完畢之后,父進程最后打印出了結束的結果。

Lock

在上面的一些小實例中,你可能會遇到如下的運行結果:

什么問題?有的輸出錯位了。這是由於並行導致的,兩個進程同時進行了輸出,結果第一個進程的換行沒有來得及輸出,第二個進程就輸出了結果。所以導致這種排版的問題。

那這歸根結底是因為線程同時資源(輸出操作)而導致的。

那怎么來避免這種問題?那自然是在某一時間,只能一個進程輸出,其他進程等待。等剛才那個進程輸出完畢之后,另一個進程再進行輸出。這種現象就叫做“互斥”。

我們可以通過 Lock 來實現,在一個進程輸出時,加鎖,其他進程等待。等此進程執行結束后,釋放鎖,其他進程可以進行輸出。

我們現用一個實例來感受一下:

 

 

首先看一下不加鎖的輸出結果:

 

 

可以看到有些輸出已經造成了影響。

然后我們對其加鎖:

 

 

我們在print方法的前后分別添加了獲得鎖和釋放鎖的操作。這樣就能保證在同一時間只有一個print操作。

看一下運行結果:

 

 

嗯,一切都沒問題了。

所以在訪問臨界資源時,使用Lock就可以避免進程同時占用資源而導致的一些問題。

Semaphore

信號量,是在進程同步過程中一個比較重要的角色。可以控制臨界資源的數量,保證各個進程之間的互斥和同步。

如果你學過操作系統,那么一定對這方面非常了解,如果你還不了解信號量是什么,可以參考

信號量解析

來了解一下它是做什么的。

那么接下來我們就用一個實例來演示一下進程之間利用Semaphore做到同步和互斥,以及控制臨界資源數量。

 

 

如上代碼實現了注明的生產者和消費者問題,定義了兩個進程類,一個是消費者,一個是生產者。

定義了一個共享隊列,利用了Queue數據結構,然后定義了兩個信號量,一個代表緩沖區空余數,一個表示緩沖區占用數。

生產者Producer使用empty.acquire()方法來占用一個緩沖區位置,然后緩沖區空閑區大小減小1,接下來進行加鎖,對緩沖區進行操作。然后釋放鎖,然后讓代表占用的緩沖區位置數量+1,消費者則相反。

運行結果如下:

 

 

可以發現兩個進程在交替運行,生產者先放入緩沖區物品,然后消費者取出,不停地進行循環。

通過上面的例子來體會一下信號量的用法。

Queue

在上面的例子中我們使用了Queue,可以作為進程通信的共享隊列使用。

在上面的程序中,如果你把Queue換成普通的list,是完全起不到效果的。即使在一個進程中改變了這個list,在另一個進程也不能獲取到它的狀態。

因此進程間的通信,隊列需要用Queue。當然這里的隊列指的是 multiprocessing.Queue

依然是用上面那個例子,我們一個進程向隊列中放入數據,然后另一個進程取出數據。

 

 

運行結果:

 

 

可以看到生產者放入隊列中數據,然后消費者將數據取出來。

get方法有兩個參數,blocked和timeout,意思為阻塞和超時時間。默認blocked是true,即阻塞式。

當一個隊列為空的時候如果再用get取則會阻塞,所以這時候就需要吧blocked設置為false,即非阻塞式,實際上它就會調用get_nowait()方法,此時還需要設置一個超時時間,在這么長的時間內還沒有取到隊列元素,那就拋出Queue.Empty異常。

當一個隊列為滿的時候如果再用put放則會阻塞,所以這時候就需要吧blocked設置為false,即非阻塞式,實際上它就會調用put_nowait()方法,此時還需要設置一個超時時間,在這么長的時間內還沒有放進去元素,那就拋出Queue.Full異常。

另外隊列中常用的方法

Queue.qsize() 返回隊列的大小 ,不過在 Mac OS 上沒法運行。

原因:

def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
return self._maxsize – self._sem._semlock._get_value()

Queue.empty() 如果隊列為空,返回True, 反之False

Queue.full() 如果隊列滿了,返回True,反之False

Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間

Queue.get_nowait() 相當Queue.get(False)

Queue.put(item) 阻塞式寫入隊列,timeout等待時間

Queue.put_nowait(item) 相當Queue.put(item, False)

Pipe

管道,顧名思義,一端發一端收。

Pipe可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創建單向管道 (默認為雙向)。一個進程從PIPE一端輸入對象,然后被PIPE另一端的進程接收,單向管道只允許管道一端的進程輸入,而雙向管道則允許從兩端輸入。

用一個實例來感受一下:

 

 

在這里聲明了一個默認為雙向的管道,然后將管道的兩端分別傳給兩個進程。兩個進程互相收發。觀察一下結果:

 

 

以上是對pipe的簡單介紹。

Pool

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

在這里需要了解阻塞和非阻塞的概念。

阻塞和非阻塞關注的是程序在等待調用結果(消息,返回值)時的狀態。

阻塞即要等到回調結果出來,在有結果之前,當前進程會被掛起。

Pool的用法有阻塞和非阻塞兩種方式。非阻塞即為添加進程后,不一定非要等到改進程執行完就添加其他進程運行,阻塞則相反。

現用一個實例感受一下非阻塞的用法:

 

 

在這里利用了apply_async方法,即非阻塞。

運行結果:

 

 

可以發現在這里添加三個進程進去后,立馬就開始執行,不用非要等到某個進程結束后再添加新的進程進去。

下面再看看阻塞的用法:

 

 

在這里只需要把apply_async改成apply即可。

運行結果如下:

 

 

這樣一來就好理解了吧?

下面對函數進行解釋:

apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的。

close() 關閉pool,使其不在接受新的任務。

terminate() 結束工作進程,不在處理未完成的任務。

join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate之后使用。

當然每個進程可以在各自的方法返回一個結果。apply或apply_async方法可以拿到這個結果並進一步進行處理。

 

 

運行結果:

 

 

另外還有一個非常好用的map方法。

如果你現在有一堆數據要處理,每一項都需要經過一個方法來處理,那么map非常適合。

比如現在你有一個數組,包含了所有的URL,而現在已經有了一個方法用來抓取每個URL內容並解析,那么可以直接在map的第一個參數傳入方法名,第二個參數傳入URL數組。

現在我們用一個實例來感受一下:

 

 

在這里初始化一個Pool,指定進程數為3,如果不指定,那么會自動根據CPU內核來分配進程數。

然后有一個鏈接列表,map函數可以遍歷每個URL,然后對其分別執行scrape方法。

運行結果:

 

 

可以看到遍歷就這么輕松地實現了。

結語

多進程multiprocessing相比多線程功能強大太多,而且使用范圍更廣,希望本文對大家有幫助!

本文參考

https://docs.python.org/2/library/multiprocessing.html

http://www.cnblogs.com/vamei/archive/2012/10/12/2721484.html

http://www.cnblogs.com/kaituorensheng/p/4445418.html

https://my.oschina.net/yangyanxing/blog/296052

轉載:靜覓 » Python爬蟲進階六之多進程的用法


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM