如何優雅地實現Python通用多線程/進程並行模塊


當單線程性能不足時,我們通常會使用多線程/多進程去加速運行。而這些代碼往往多得令人絕望,需要考慮:

  • 如何創建線程執行的函數?
  • 如何收集結果?若希望結果從子線程返回主線程,則還要使用隊列
  • 如何取消執行? 直接kill掉所有線程?信號如何傳遞?
  • 是否需要線程池? 否則反復創建線程的成本過高了

不僅如此,若改為多進程或協程,代碼還要繼續修改。若多處使用並行,則這些代碼還會重復很多遍,非常痛苦。

於是,我們考慮將並行的所有邏輯封裝到一個模塊之內,向外部提供像串行執行一樣的編程體驗,還能徹底解決上面所述的疑難問題。所有代碼不足180行。

GitHub地址:

https://github.com/ferventdesert/multi_yielder

使用時非常簡潔:

def xprint(x): 
    time.sleep(1)  # mock a long time task 
    yield x*x   
i=0
for item in multi_yield(xrange(100)),xprint, process_mode,3:
    i+=1
    print(item)
    if i>10:
        break

上面的代碼會使用三個進程,並行地打印1-10的平方。當打印完10之后,進程自動回收釋放。就像串行程序一樣簡單。

1. 先實現串行任務

我們通常會將任務分割為很多個子塊,從而方便並行。因此可以將任務抽象為生成器。類似下面的操作,每個seed都是任務的種子。

def get_generator():
    for seed in 100:
        yield seed

任務本身的定義,則可以通過一個接受種子的函數來實現:

def worker(seed):
    # some long time task
    return seed*seed # just example

那么實現串行任務就像這樣:

for seed in get_generator(n):
    print worker(seed)

進一步地,可以將其抽象為下面的函數:

def serial_yield(genenator,worker):
    for seed in generator():
        yield worker(seed)

該函數通過傳入生成器函數(generator)和任務的定義(worker函數),即可再返回一個生成器。消費時:

for result in serial_yield(your_genenator, your_worker):
    print(result)

我們看到,通過定義高階函數,serial_yield就像map函數,對seed進行加工后輸出。

2. 定義並行任務

考慮如下場景: boss負責分發任務到任務隊列,多個worker從任務隊列撈數據,處理完之后,再寫入結果隊列。主線程從結果隊列中取結果即可。

我們定義如下幾種執行模式:

  • async: 異步/多協程
  • thread: 多線程
  • process: 多進程

使用Python創建worker的代碼如下,func是任務的定義(是個函數)

    def factory(func, args=None, name='task'):
        if args is None:
            args = ()
        if mode == process_mode:
            return multiprocessing.Process(name=name, target=func, args=args)
        if mode == thread_mode:
            import threading
            t = threading.Thread(name=name, target=func, args=args)
            t.daemon = True
            return t
        if mode == async_mode:
            import gevent
            return gevent.spawn(func, *args)

創建隊列的代碼如下,注意seeds可能是無窮流,因此需要限定隊列的長度,當入隊列發現隊列已滿時,則任務需要阻塞。

  def queue_factory(size):
        if mode == process_mode:
            return multiprocessing.Queue(size)
        elif mode == thread_mode:
            return Queue(size)
        elif mode == async_mode:
            from gevent import queue
            return queue.Queue(size)

什么時候任務可以終止? 我們羅列如下幾種情況:

  • 所有的seed都已經被消費完了
  • 外部傳入了結束請求

對第一種情況,我們讓boss在seed消費完之后,在隊列里放入多個Empty標志,worker收到Empty之后,就會自動退出,下面是boss的實現邏輯:

    def _boss(task_generator, task_queue, worker_count):
        for task in task_generator:
            task_queue.put(task)
        for i in range(worker_count):
            task_queue.put(Empty)
        print('worker boss finished')

再定義worker的邏輯:

    def _worker(task_queue, result_queue, gene_func):
        import time
        try:
            while not stop_wrapper.is_stop():
                if task_queue.empty():
                    time.sleep(0.01)
                    continue
                task = task.get()
                if task == Empty:
                    result_queue.put(Empty)
                    break
                if task == Stop:
                    break
                for item in gene_func(task):
                    result_queue.put(item)
            print ('worker worker is stop')
        except Exception as e:
            logging.exception(e)
            print ('worker exception, quit')

簡單吧?但是這樣會有問題,這個后面再說,我們把剩余的代碼寫完。

再定義multi_yield的主要代碼。 代碼非常好理解,創建任務和結果隊列,再創建boss和worker線程(或進程/協程)並啟動,之后不停地從結果隊列里取數據就可以了。

 def multi_yield(customer_func, mode=thread_mode, worker_count=1, generator=None, queue_size=10):
        workers = []
        result_queue = queue_factory(queue_size)
        task_queue = queue_factory(queue_size)

        main = factory(_boss, args=(generator, task_queue, worker_count), name='_boss')
        for process_id in range(0, worker_count):
            name = 'worker_%s' % (process_id)
            p = factory(_worker, args=(task_queue, result_queue, customer_func), name=name)
            workers.append(p)
        main.start()

        for r in workers:
            r.start()
        count = 0
        while not should_stop():
            data = result_queue.get()
            if data is Empty:
                count += 1
                if count == worker_count:
                    break
                continue
            if data is Stop:
                break
            else:
                yield data

這樣從外部消費時,即可:

def xprint(x):
    time.sleep(1)
    yield x

i=0
for item in multi_yield(xprint, process_mode,3,xrange(100)):
    i+=1
    print(item)
    if i>10:
        break

這樣我們就實現了一個與serial_yield功能類似的multi_yield。可以定義多個worker,從隊列中領任務,而不需重復地創建和銷毀,更不需要線程池。當然,代碼不完全,運行時可能出問題。但以上代碼已經說明了核心的功能。完整的代碼可以在文末找到。

但是你也會發現很嚴重的問題:

  • 當從外部break時,內部的線程並不會自動停止
  • 我們無法判斷隊列的長度,若隊列滿,那么put操作會永遠卡死在那里,任務都不會結束。

3. 改進任務停止邏輯

最開始想到的,是通過在multi_yield函數參數中添加一個返回bool的函數,這樣當外部break時,同時將該函數的返回值置為True,內部檢測到該標志位后強制退出。偽代碼如下:

_stop=False
def can_stop():
    return _stop

for item in multi_yield(xprint, process_mode,3,xrange(100),can_stop):
    i+=1
    print(item)
    if i>10:
        _stop=True
        break

但這樣並不優雅,引入了更多的函數作為參數,還必須手工控制變量值,非常繁瑣。在多進程模式下,stop標志位還如何解決?

我們希望外部在循環時執行了break后,會自動通知內部的生成器。實現方法似乎就是with語句,即contextmanager.

我們實現以下的包裝類:

class Yielder(object):
    def __init__(self, dispose):
        self.dispose = dispose

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.dispose()

它實現了with的原語,參數是dispose函數,作用是退出with代碼塊后的回收邏輯。

由於值類型的標志位無法在多進程環境中傳遞,我們再創建StopWrapper類,用於管理停止標志和回收資源:

   class Stop_Wrapper():
        def __init__(self):
            self.stop_flag = False
            self.workers=[]

        def is_stop(self):
            return self.stop_flag

        def stop(self):
            self.stop_flag = True
            for process in self.workers:
                if isinstance(process,multiprocessing.Process):
                    process.terminate()
                    

最后的問題是,如何解決隊列滿或空時,put/get的無限等待問題呢?考慮包裝一下put/get:包裝在while True之中,每隔兩秒get/put,這樣即使阻塞時,也能保證可以檢查退出標志位。所有線程在主線程結束后,最遲也能在2s內自動退出。

def safe_queue_get(queue, is_stop_func=None, timeout=2):
    while True:
        if is_stop_func is not None and is_stop_func():
            return Stop
        try:
            data = queue.get(timeout=timeout)
            return data
        except:
            continue


def safe_queue_put(queue, item, is_stop_func=None, timeout=2):
    while True:
        if is_stop_func is not None and is_stop_func():
            return Stop
        try:
            queue.put(item, timeout=timeout)
            return item
        except:
            continue
            

如何使用呢?我們只需在multi_yield的yield語句之外加上一行就可以了:

    with Yielder(stop_wrapper.stop):
        # create queue,boss,worker, then start all
        # ignore repeat code
        while not should_stop():
            data = safe_queue_get(result_queue, should_stop)
            if data is Empty:
                count += 1
                if count == worker_count:
                    break
                continue
            if data is Stop:
                break
            else:
                yield data

仔細閱讀上面的代碼, 外部循環時退出循環,則會自動觸發stop_wrapper的stop操作,回收全部資源,而不需通過外部的標志位傳遞!這樣調用方在心智完全不需有額外的負擔。

實現生成器和上下文管理器的編程語言,都可以通過上述方式實現自動協程資源回收。筆者也實現了一個C#版本的,有興趣歡迎交流。

這樣,我們就能像文章開頭那樣,實現並行的迭代器操作了。

4. 結語

完整代碼在:

https://github.com/ferventdesert/multi_yielder/blob/master/src/multi_yielder.py

一些實現的細節很有趣,我們借助在函數中定義函數,可以不用復雜的類去承擔職責,而僅僅只需函數。而類似的思想,在函數式編程中非常常見。

該工具已經被筆者的流式語言etlpy所集成。但是依然有較多改進的空間,如沒有集成分布式執行模式。

歡迎留言交流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM