python進程池剖析(二)


  之前文章中介紹了python中multiprocessing模塊中自帶的進程池Pool,並對進程池中的數據結構和各個線程之間的合作關系進行了簡單分析,這節來看下客戶端如何對向進程池分配任務,並獲取結果的。

  我們知道,當進程池中任務隊列非空時,才會觸發worker進程去工作,那么如何向進程池中的任務隊列中添加任務呢,進程池類有兩組關鍵方法來創建任務,分別是apply/apply_async和map/map_async,實際上進程池類的apply和map方法與python內建的兩個同名方法類似,apply_async和map_async分別為它們的非阻塞版本。

  首先來看apply_async方法,源碼如下:

def apply_async(self, func, args=(), kwds={}, callback=None): assert self._state == RUN result = ApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result
func表示執行此任務的方法
args、kwds分別表func的位置參數和關鍵字參數
callback表示一個單參數的方法,當有結果返回時,callback方法會被調用,參數即為任務執行后的結果

  每調用一次apply_result方法,實際上就向_taskqueue中添加了一條任務,注意這里采用了非阻塞(異步)的調用方式,即apply_async方法中新建的任務只是被添加到任務隊列中,還並未執行,不需要等待,直接返回創建的ApplyResult對象,注意在創建ApplyResult對象時,將它放入進程池的緩存_cache中。

  任務隊列中有了新創建的任務,那么根據上節分析的處理流程,進程池的_task_handler線程,將任務從taskqueue中獲取出來,放入_inqueue中,觸發worker進程根據args和kwds調用func,運行結束后,將結果放入_outqueue,再由進程池中的_handle_results線程,將運行結果從_outqueue中取出,並找到_cache緩存中的ApplyResult對象,_set其運行結果,等待調用端獲取。

  apply_async方法既然是異步的,那么它如何知道任務結束,並獲取結果呢?這里需要了解ApplyResult類中的兩個主要方法:

def get(self, timeout=None): self.wait(timeout) if not self._ready: raise TimeoutError if self._success: return self._value else: raise self._value def _set(self, i, obj): self._success, self._value = obj if self._callback and self._success: self._callback(self._value) self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() del self._cache[self._job]

從這兩個方法名可以看出,get方法是提供給客戶端獲取worker進程運行結果的,而運行的結果是通過_handle_result線程調用_set方法,存放在ApplyResult對象中。
_set方法將運行結果保存在ApplyResult._value中,喚醒阻塞在條件變量上的get方法。客戶端通過調用get方法,返回運行結果。

  apply方法是以阻塞的方式運行獲取進程結果,它的實現很簡單,同樣是調用apply_async,只不過不返回ApplyResult,而是直接返回worker進程運行的結果:

def apply(self, func, args=(), kwds={}): assert self._state == RUN return self.apply_async(func, args, kwds).get()

  以上的apply/apply_async方法,每次只能向進程池分配一個任務,那如果想一次分配多個任務到進程池中,可以使用map/map_async方法。首先來看下map_async方法是如何定義的:

def map_async(self, func, iterable, chunksize=None, callback=None):
    assert self._state == RUN if not hasattr(iterable, '__len__'): iterable = list(iterable) if chunksize is None: chunksize, extra = divmod(len(iterable), len(self._pool) * 4) if extra: chunksize += 1
        if len(iterable) == 0: chunksize = 0 task_batches = Pool._get_tasks(func, iterable, chunksize) result = MapResult(self._cache, chunksize, len(iterable), callback) self._taskqueue.put((((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)), None)) return result func表示執行此任務的方法 iterable表示任務參數序列 chunksize表示將iterable序列按每組chunksize的大小進行分割,每個分割后的序列提交給進程池中的一個任務進行處理 callback表示一個單參數的方法,當有結果返回時,callback方法會被調用,參數即為任務執行后的結果

   從源碼可以看出,map_async要比apply_async復雜,首先它會根據chunksize對任務參數序列進行分組,chunksize表示每組中的任務個數,當默認chunksize=None時,根據任務參數序列和進程池中進程數計算分組數:chunk, extra = divmod(len(iterable), len(self._pool) * 4)。假設進程池中進程數為len(self._pool)=4,任務參數序列iterable=range(123),那么chunk=7, extra=11,向下執行,得出chunksize=8,表示將任務參數序列分為8組。任務實際分組:

task_batches = Pool._get_tasks(func, iterable, chunksize)
def
_get_tasks(func, it, size): it = iter(it) while 1: x = tuple(itertools.islice(it, size)) if not x: return yield (func, x)
這里使用yield將_get_tasks方法編譯成生成器。實際上對於range(
123)這樣的序列,按照chunksize=8進行分組后,一共16組每組的元素如下: (func, (0, 1, 2, 3, 4, 5, 6, 7)) (func, (8, 9, 10, 11, 12, 13, 14, 15)) (func, (16, 17, 18, 19, 20, 21, 22, 23)) ... (func, (112, 113, 114, 115, 116, 117, 118, 119)) (func, (120, 121, 122))

  分組之后,這里定義了一個MapResult對象:result = MapResult(self._cache, chunksize, len(iterable), callback)它繼承自AppyResult類,同樣提供get和_set方法接口。將分組后的任務放入任務隊列中,然后就返回剛剛創建的result對象。

self._taskqueue.put((((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)), None)) 以任務參數序列=range(123)為例,實際上這里向任務隊列中put了一個16組元組元素的集合,元組依次為: (result._job, 0, mapstar, ((func, (0, 1,   2,   3,   4,   5,   6,   7)),), {}, None) (result._job, 1, mapstar, ((func, (8,   9,   10,  11,  12,  13,  14,  15)),), {}, None) …… (result._job, 15, mapstar, ((func, (120, 121, 122)),), {}, None)
注意每一個元組中的 i,它表示當前元組在整個任務元組集合中的位置,通過它,_handle_result線程才能將worker進程運行的結果,以正確的順序填入到MapResult對象中。

  注意這里只調用了一次put方法,將16組元組作為一個整體序列放入任務隊列,那么這個任務是否_task_handler線程是否也會像apply_async方法一樣,將整個任務序列傳遞給_inqueue,這樣就會導致進程池中的只有一個worker進程獲取到任務序列,而並非起到多進程的處理方式。我們來看下_task_handler線程是怎樣處理的:

def _handle_tasks(taskqueue, put, outqueue, pool, cache): thread = threading.current_thread() for taskseq, set_length in iter(taskqueue.get, None): i = -1
        for i, task in enumerate(taskseq): if thread._state: debug('task handler found thread._state != RUN') break
            try: put(task) except Exception as e: job, ind = task[:2] try: cache[job]._set(ind, (False, e)) except KeyError: pass
        else: if set_length: debug('doing set_length()') set_length(i+1) continue
        break
    else: debug('task handler got sentinel')

  注意到語句 for i, task in enumerate(taskseq),原來_task_handler線程在通過taskqueue獲取到任務序列后,並不是直接放入_inqueue中的,而是將序列中任務按照之前分好的組,依次放入_inqueue中的,而循環中的task即上述的每個任務元組:(result._job, 0, mapstar, ((func, (0,   1,   2,   3,   4,   5,   6,   7)),), {}, None)。接着觸發worker進程。worker進程獲取出每組任務,進行任務的處理:

job, i, func, args, kwds = task try:   
result
= (True, func(*args, **kwds)) except Exception, e:
result
= (False, e) try:
    put((job, i, result))
except Exception as e:
    wrapped = MaybeEncodingError(e, result[1])
    debug("Possible encoding error while sending result: %s" % (
        wrapped))
    put((job, i, (False, wrapped)))
根據之前放入_inqueue的順序對應關系:
(result._job, 0, mapstar, ((func, (0, 1, 2, 3, 4, 5, 6, 7)),), {}, None)
job, i, func, args, kwds = task
可以看出,元組中 mapstar 表示這里的回調函數func,((func, (0, 1, 2, 3, 4, 5, 6, 7)),)和{}分別表示args和kwds參數。
執行result = (True, func(*args, **kwds))
再來看下mapstar是如何定義的:
def mapstar(args):
return map(*args)
這里mapstar表示回調函數func,它的定義只有一個參數,而在worker進程執行回調時,使用的是func(*args, **kwds)語句,這里多一個參數能夠正確執行嗎?答案時肯定的,在調用mapstar時,如果kwds為空字典,那么傳入第二個參數不會影響函數的調用,而一個無參函數func_with_none_params,在調用時使用func_with_none_params(*(), **{})也是沒有問題的,python會自動忽視傳入的兩個空參數。
看到這里,我們明白了,實際上對任務參數分組后,每一組的任務是通過內建的map方法來進行調用的。
運行之后調用put(job, i, result)將結果放入_outqueue中,_handle_result線程會從_outqueue中將結果取出,並找到_cache緩存中的MapResult對象,_set其運行結果

  現在來我們來總結下,進程池的map_async方法是如何運行的,我們將range(123)這個任務序列,將它傳入map_async方法,假設不指定chunksize,並且cpu為四核,那么方法內部會分為16個組(0~14組每組8個元素,最后一組3個元素)。將分組后的任務放入任務隊列,一共16組,那么每個進程需要運行4次來處理,每次通過內建的map方法,順序將組中8個任務執行,再將結果放入_outqueue,找到_cache緩存中的MapResult對象,_set其運行結果,等待客戶端獲取。使用map_async方法會調用多個worker進程處理任務,每個worler進程運行結束,會將結果傳入_outqueue,再有_handle_result線程將結果寫入MapResult對象,那如何保證結果序列的順序與調用map_async時傳入的任務參數序列一致呢,我們來看看MapResult的構造函數和_set方法的實現。

def __init__(self, cache, chunksize, length, callback):
    ApplyResult.__init__(self, cache, callback)
    self._success = True
    self._value = [None] * length
    self._chunksize = chunksize
    if chunksize <= 0:
        self._number_left = 0
        self._ready = True
        del cache[self._job]
    else:
        self._number_left = length//chunksize + bool(length % chunksize)

def
_set(self, i, success_result): success, result = success_result if success: self._value[i*self._chunksize:(i+1)*self._chunksize] = result self._number_left -= 1 if self._number_left == 0: if self._callback: self._callback(self._value) del self._cache[self._job] self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() else: self._success = False self._value = result del self._cache[self._job] self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release()

   MapResult類中,_value保存map_async的運行結果,初始化時為一個元素為None的list,list的長度與任務參數序列的長度相同,_chunksize表示將任務分組后,每組有多少個任務,_number_left表示整個任務序列被分為多少個組。_handle_result線程會通過_set方法將worker進程的運行結果保存到_value中,那么如何將worker進程運行的結果填入到_value中正確的位置呢,還記得在map_async在向task_queue填入任務時,每組中的 i嗎,i表示的就是當前任務組的組號,_set方法會根據當前任務的組號即參數 i,並且遞減_number_left,當_number_left遞減為0時,表示任務參數序列中的所有任務都已被woker進程處理,_value全部被計算出,喚醒阻塞在get方法上的條件變量,是客戶端可以獲取運行結果。

  map函數為map_async的阻塞版本,它在map_async的基礎上,調用get方法,直接阻塞到結果全部返回:

def map(self, func, iterable, chunksize=None): assert self._state == RUN return self.map_async(func, iterable, chunksize).get()

  這節我們主要分析了兩組向進程池分配任務的接口:apply/apply_async和map/map_async。apply方法每次處理一個任務,不同任務的執行方法(回調函數)、參數可以不同,而map方法每次可以處理一個任務序列,每個任務的執行方法相同。

  未完待續……

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM