python進程池剖析(三)


  之前文章對python中進程池的原理、數據流以及應用從代碼角度做了簡單的剖析,現在讓我們回頭看看標准庫中對進程池的實現都有哪些值得我們學習的地方。我們知道,進程池內部由多個線程互相協作,向客戶端提供可靠的服務,那么這些線程之間是怎樣做到數據共享與同步的呢?在客戶端使用apply/map函數向進程池分配任務時,使用self._taskqueue來存放任務元素,_taskqueue定義為Queue.Queue(),這是一個python標准庫中的線程安全的同步隊列,它保證通知時刻只有一個線程向隊列添加或從隊列獲取元素。這樣,主線程向進程池中分配任務(taskqueue.put),進程池中_handle_tasks線程讀取_taskqueue隊列中的元素,兩個線程同時操作taskqueue,互不影響。進程池中有N個worker進程在等待任務下發,那么進程池中的_handle_tasks線程讀取出任務后,又如何保證一個任務不被多個worker進程獲取到呢?我們來看下_handle_tasks線程將任務讀取出來之后如何交給worker進程的:

for taskseq, set_length in iter(taskqueue.get, None): i = -1
    for i, task in enumerate(taskseq): if thread._state: debug('task handler found thread._state != RUN') break
        try: put(task) except Exception as e: job, ind = task[:2] try: cache[job]._set(ind, (False, e)) except KeyError: pass else: if set_length: debug('doing set_length()') set_length(i+1) continue
    break
else: debug('task handler got sentinel') 在從taskqueue中get到任務之后,對任務中的每個task,調用了put函數,這個put函數實際上是將task放入了管道,而主進程與worker進程的交互,正是通過管道來完成的。 再來看看worker進程的定義: w = self.Process(target=worker, args=(self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild) ) 其中self._inqueue和self._outqueue為SimpleQueue()對象,實際是帶鎖的管道,上述_handle_task線程調用的put函數,即為SimpleQueue對象的方法。我們看到,這里worker進程定義均相同,所以進程池中的worker進程共享self._inqueue和self._outqueue對象,那么當一個task元素被put到共享的_inqueue管道中時,如何確保只有一個worker獲取到呢,答案同樣是加鎖,在SimpleQueue()類的定義中,put以及get方法都帶有鎖,進行同步,唯一不同的是,這里的鎖是用於進程間同步的。這樣就保證了多個worker之間能夠確保任務的同步。與分配任務類似,在worker進程運行完之后,會將結果put會_outqueue,_outqueue同樣是SimpleQueue類對象,可以在多個進程之間進行互斥。

  在worker進程運行結束之后,會將執行結果通過管道傳回,進程池中有_handle_result線程來負責接收result,取出之后,通過調用_set方法將結果寫回ApplyResult/MapResult對象,客戶端可以通過get方法取出結果,這里通過使用條件變量進行同步,當_set函數執行之后,通過條件變量喚醒阻塞在get函數的主進程。

  進程池終止工作通過調用Pool.terminate()來實現,這里的實現很巧妙,用了一個可調用對象,將終止Pool時的需要執行的回調函數先注冊好,等到需要終止時,直接調用對象即可。

self._terminate = Finalize(     self, self._terminate_pool,     args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, self._worker_handler, self._task_handler, self._result_handler, self._cache),     exitpriority=15 ) 在Finalize類的實現了__call__方法,在運行self._terminate()時,就會調用構造self._terminate時傳入的self._terminate_pool對象。

  使用map/map_async函數向進程池中批量分配任務時,使用了生成器表達式:

self._taskqueue.put((((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)), None))
生成器表達式很簡單,只需把列表解析的的[]換成()即可,上述表達的列表解析表示為:
[(result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)]
這里使用生成器表達式的好處是,它相當於列表解析的擴展,是對內存有好的,因為它只是生成了一個生成器,當我們需要使用該生成器對應的邏輯目標數據時,它才會通過既定邏輯去生成該數據,所以不會大量占用內存。

  在Pool中,_worker_handler線程負責監控、創建新的工作進程,在監控工作進程退出時,同時將退出的進程從進程池中刪除掉。這類似於,一邊遍歷一邊刪除列表。我們來看下下面代碼的實現:

>>> l = [1, 2, 3, 3, 4, 4, 4, 5] >>> for i in l: if i in [3, 4, 5]: l.remove(i) >>> l [1, 2, 3, 4, 5]

我們看到l沒有將所有的3和4都刪除掉,這是因為remove改變了l的大小。再看下面的實現:

>>> l = [1, 2, 3, 3, 4, 4, 4, 5] >>> for i in range(len(l)): if l[i] in [3, 4]: del l[i] Traceback (most recent call last): File "<pyshell#37>", line 2, in <module>
    if l[i] in [3, 4]: IndexError: list index out of range >>> 

同樣因為del l[i]時,l的大小改變,繼續訪問下去導致訪問越界。而標准庫中的進程池給出了遍歷刪除的一個正確示例:

for i in reversed(range(len(self._pool))): worker = self._pool[i] if worker.exitcode is not None: worker.join() cleaned = True del self._pool[i]

使用reversed,從后向前刪除list中的元素,這樣會保證所有符合刪除條件的元素被刪除掉:

>>> l = [1, 2, 3, 3, 4, 4, 4, 5] >>> for i in reversed(range(len(l))): if l[i] in [3, 4, 5]: del l[i] >>> l [1, 2]

  可以看出,一個篇幅並不算大的Pool模塊,就有很多值得學習的地方。對於python亦或者其他語言,技能的提升,多閱讀標准庫中代碼,是一個很不錯的選擇。對於我們經常使用,而不知其中實現奧秘的源碼,多閱讀源碼,了解其技術實現,就像侯捷那本《STL源碼剖析》中講到的,源碼之前,了無秘密。更重要的是,將這些漂亮而又高效的編碼方式,運用在自己的工作中,讓自己的代碼也可以像標准庫中的代碼一樣優雅,這可以說是每一個開發人員的追求。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM