python中兩個常用來處理進程的模塊分別是subprocess和multiprocessing,其中subprocess通常用於執行外部程序,比如一些第三方應用程序,而不是Python程序。如果需要實現調用外部程序的功能,python的psutil模塊是更好的選擇,它不僅支持subprocess提供的功能,而且還能對當前主機或者啟動的外部程序進行監控,比如獲取網絡、cpu、內存等信息使用情況,在做一些自動化運維工作時支持的更加全面。multiprocessing是python的多進程模塊,主要通過啟動python進程,調用target回調函數來處理任務,與之對應的是python的多線程模塊threading,它們擁有類似的接口,通過定義multiprocessing.Process、threading.Thread,指定target方法,調用start()運行進程或者線程。
在python中由於全局解釋鎖(GIL)的存在,使用多線程,並不能大大提高程序的運行效率【1】。因此,用python處理並發問題時,盡量使用多進程而非多線程。並發編程中,最簡單的模式是,主進程等待任務,當有新任務到來時,啟動一個新的進程來處理當前任務。這種每個任務一個進程的處理方式,每處理一個任務都會伴隨着一個進程的創建、運行、銷毀,如果進程的運行時間越短,創建和銷毀的時間所占的比重就越大,顯然,我們應該盡量避免創建和銷毀進程本身的額外開銷,提高進程的運行效率。我們可以用進程池來減少進程的創建和開銷,提高進程對象的復用。
實際上,python中已經實現了一個功能強大的進程池(multiprocessing.Pool),這里我們來簡單剖析下python自帶的進程池是如何實現的。
要創建進程池對象,需要調用Pool函數,函數的聲明如下:
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None) Returns a process pool object processes表示工作進程的個數,默認為None,表示worker進程數為cpu_count() initializer表示工作進程start時調用的初始化函數,initargs表示initializer函數的參數,如果initializer不為None,在每個工作進程start之前會調用initializer(*initargs) maxtaskperchild表示每個工作進程在退出/被其他新的進程替代前,需要完成的工作任務數,默認為None,表示工作進程存活時間與pool相同,即不會自動退出/被替換。 函數返回一個進程池(Pool)對象
Pool函數返回的進程池對象中有下面一些數據結構:
self._inqueue 接收任務隊列(SimpleQueue),用於主進程將任務發送給worker進程 self._outqueue 發送結果隊列(SimpleQueue),用於worker進程將結果發送給主進程 self._taskqueue 同步的任務隊列,保存線程池分配給主進程的任務 self._cache = {} 任務緩存 self._processes worker進程個數 self._pool = [] woker進程隊列
進程池工作時,任務的接收、分配。結果的返回,均由進程池內部的各個線程合作完成,來看看進程池內部由那些線程:
- _work_handler線程,負責保證進程池中的worker進程在有退出的情況下,創建出新的worker進程,並添加到進程隊列(pools)中,保持進程池中的worker進程數始終為processes個。_worker_handler線程回調函數為Pool._handler_workers方法,在進程池state==RUN時,循環調用_maintain_pool方法,監控是否有進程退出,並創建新的進程,append到進程池pools中,保持進程池中的worker進程數始終為processes個。
self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self, ) )
Pool._handle_workers方法在_worker_handler線程狀態為運行時(status==RUN),循環調用_maintain_pool方法: def _maintain_pool(self): if self._join_exited_workers(): self._repopulate_pool()
_join_exited_workers()監控pools隊列中的進程是否有結束的,有則等待其結束,並從pools中刪除,當有進程結束時,調用_repopulate_pool(),創建新的進程: w = self.Process(target=worker, args=(self._inqueue, self._outqueue, self._initializer, self._initargs,
self._maxtasksperchild) ) self._pool.append(w)
w是新創建的進程,它是用來處理實際任務的進程,worker是它的回調函數: def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): inqueue._writer.close() outqueue._reader.close() if initializer is not None: initializer(*initargs) completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, IOError): debug('worker got EOFError or IOError -- exiting') break if task is None: debug('worker got sentinel -- exiting') break job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception, e: result = (False, e) try: put((job, i, result)) except Exception as e: wrapped = MaybeEncodingError(e, result[1]) debug("Possible encoding error while sending result: %s" % ( wrapped)) put((job, i, (False, wrapped))) completed += 1 debug('worker exiting after %d tasks' % completed) 所有worker進程都使用worker回調函數對任務進行統一的處理,從源碼中可以看出:
它的功能是從接入任務隊列中(inqueue)讀取出task任務,然后根據任務的函數、參數進行調用(result = (True, func(*args, **kwds),
再將結果放入結果隊列中(outqueue),如果有最大處理上限的限制maxtasks,那么當進程處理到任務數上限時退出。 - _task_handler線程,負責從進程池中的task_queue中,將任務取出,放入接收任務隊列(Pipe),
self._task_handler = threading.Thread( target=Pool._handle_tasks, args=(self._taskqueue, self._quick_put, self._outqueue, self._pool) )
Pool._handle_tasks方法不斷從task_queue中獲取任務,並放入接受任務隊列(in_queue),以此觸發worker進程進行任務處理。當從task_queue讀取到None元素時,
表示進程池將要被終止(terminate),不再處理之后的任務請求,同時向接受任務隊列和結果任務隊列put None元素,通知其他線程結束。 - _handle_results線程,負責將處理完的任務結果,從outqueue(Pipe)中讀取出來,放在任務緩存cache中,
self._result_handler = threading.Thread(
target=Pool._handle_results,
args=(self._outqueue, self._quick_get, self._cache)
) -
_terminate,這里的_terminate並不是一個線程,而是一個Finalize對象
self._terminate = Finalize( self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, self._worker_handler, self._task_handler, self._result_handler, self._cache), exitpriority=15 )
Finalize類的構造函數與線程構造函數類似,_terminate_pool是它的回調函數,args回調函數的參數。
_terminate_pool函數負責終止進程池的工作:終止上述的三個線程,終止進程池中的worker進程,清除隊列中的數據。
_terminate是個對象而非線程,那么它如何像線程調用start()方法一樣,來執行回調函數_terminate_pool呢?查看Pool源碼,發現進程池的終止函數:
def terminate(self):
debug('terminating pool')
self._state = TERMINATE
self._worker_handler._state = TERMINATE
self._terminate()
函數中最后將_terminate對象當做一個方法來執行,而_terminate本身是一個Finalize對象,我們看一下Finalize類的定義,發現它實現了__call__方法:
def __call__(self, wr=None):
try:
del _finalizer_registry[self._key]
except KeyError:
sub_debug('finalizer no longer registered')
else:
if self._pid != os.getpid():
res = None
else:
res = self._callback(*self._args, **self._kwargs)
self._weakref = self._callback = self._args = \
self._kwargs = self._key = None
return res
而方法中 self._callback(*self._args, **self._kwargs) 這條語句,就執行了_terminate_pool函數,進而將進程池終止。
進程池中的數據結構、各個線程之間的合作關系如下圖所示:
【1】這里針對的是CPU密集型程序,多線程並不能帶來效率上的提升,相反還可能會因為線程的頻繁切換,導致效率下降;如果是IO密集型,多線程進程可以利用IO阻塞等待時的空閑時間執行其他線程,提升效率。
未完待續……