當單線程性能不足時,我們通常會使用多線程/多進程去加速運行。而這些代碼往往多得令人絕望,需要考慮:
- 如何創建線程執行的函數?
- 如何收集結果?若希望結果從子線程返回主線程,則還要使用隊列
- 如何取消執行? 直接kill掉所有線程?信號如何傳遞?
- 是否需要線程池? 否則反復創建線程的成本過高了
不僅如此,若改為多進程或協程,代碼還要繼續修改。若多處使用並行,則這些代碼還會重復很多遍,非常痛苦。
於是,我們考慮將並行的所有邏輯封裝到一個模塊之內,向外部提供像串行執行一樣的編程體驗,還能徹底解決上面所述的疑難問題。所有代碼不足180行。
GitHub地址:
使用時非常簡潔:
def xprint(x):
time.sleep(1) # mock a long time task
yield x*x
i=0
for item in multi_yield(xrange(100)),xprint, process_mode,3:
i+=1
print(item)
if i>10:
break
上面的代碼會使用三個進程,並行地打印1-10的平方。當打印完10之后,進程自動回收釋放。就像串行程序一樣簡單。
1. 先實現串行任務
我們通常會將任務分割為很多個子塊,從而方便並行。因此可以將任務抽象為生成器。類似下面的操作,每個seed都是任務的種子。
def get_generator():
for seed in 100:
yield seed
任務本身的定義,則可以通過一個接受種子的函數來實現:
def worker(seed):
# some long time task
return seed*seed # just example
那么實現串行任務就像這樣:
for seed in get_generator(n):
print worker(seed)
進一步地,可以將其抽象為下面的函數:
def serial_yield(genenator,worker):
for seed in generator():
yield worker(seed)
該函數通過傳入生成器函數(generator)和任務的定義(worker函數),即可再返回一個生成器。消費時:
for result in serial_yield(your_genenator, your_worker):
print(result)
我們看到,通過定義高階函數,serial_yield就像map函數,對seed進行加工后輸出。
2. 定義並行任務
考慮如下場景: boss負責分發任務到任務隊列,多個worker從任務隊列撈數據,處理完之后,再寫入結果隊列。主線程從結果隊列中取結果即可。
我們定義如下幾種執行模式:
- async: 異步/多協程
- thread: 多線程
- process: 多進程
使用Python創建worker的代碼如下,func是任務的定義(是個函數)
def factory(func, args=None, name='task'):
if args is None:
args = ()
if mode == process_mode:
return multiprocessing.Process(name=name, target=func, args=args)
if mode == thread_mode:
import threading
t = threading.Thread(name=name, target=func, args=args)
t.daemon = True
return t
if mode == async_mode:
import gevent
return gevent.spawn(func, *args)
創建隊列的代碼如下,注意seeds可能是無窮流,因此需要限定隊列的長度,當入隊列發現隊列已滿時,則任務需要阻塞。
def queue_factory(size):
if mode == process_mode:
return multiprocessing.Queue(size)
elif mode == thread_mode:
return Queue(size)
elif mode == async_mode:
from gevent import queue
return queue.Queue(size)
什么時候任務可以終止? 我們羅列如下幾種情況:
- 所有的seed都已經被消費完了
- 外部傳入了結束請求
對第一種情況,我們讓boss在seed消費完之后,在隊列里放入多個Empty標志,worker收到Empty之后,就會自動退出,下面是boss的實現邏輯:
def _boss(task_generator, task_queue, worker_count):
for task in task_generator:
task_queue.put(task)
for i in range(worker_count):
task_queue.put(Empty)
print('worker boss finished')
再定義worker的邏輯:
def _worker(task_queue, result_queue, gene_func):
import time
try:
while not stop_wrapper.is_stop():
if task_queue.empty():
time.sleep(0.01)
continue
task = task.get()
if task == Empty:
result_queue.put(Empty)
break
if task == Stop:
break
for item in gene_func(task):
result_queue.put(item)
print ('worker worker is stop')
except Exception as e:
logging.exception(e)
print ('worker exception, quit')
簡單吧?但是這樣會有問題,這個后面再說,我們把剩余的代碼寫完。
再定義multi_yield的主要代碼。 代碼非常好理解,創建任務和結果隊列,再創建boss和worker線程(或進程/協程)並啟動,之后不停地從結果隊列里取數據就可以了。
def multi_yield(customer_func, mode=thread_mode, worker_count=1, generator=None, queue_size=10):
workers = []
result_queue = queue_factory(queue_size)
task_queue = queue_factory(queue_size)
main = factory(_boss, args=(generator, task_queue, worker_count), name='_boss')
for process_id in range(0, worker_count):
name = 'worker_%s' % (process_id)
p = factory(_worker, args=(task_queue, result_queue, customer_func), name=name)
workers.append(p)
main.start()
for r in workers:
r.start()
count = 0
while not should_stop():
data = result_queue.get()
if data is Empty:
count += 1
if count == worker_count:
break
continue
if data is Stop:
break
else:
yield data
這樣從外部消費時,即可:
def xprint(x):
time.sleep(1)
yield x
i=0
for item in multi_yield(xprint, process_mode,3,xrange(100)):
i+=1
print(item)
if i>10:
break
這樣我們就實現了一個與serial_yield
功能類似的multi_yield
。可以定義多個worker,從隊列中領任務,而不需重復地創建和銷毀,更不需要線程池。當然,代碼不完全,運行時可能出問題。但以上代碼已經說明了核心的功能。完整的代碼可以在文末找到。
但是你也會發現很嚴重的問題:
- 當從外部break時,內部的線程並不會自動停止
- 我們無法判斷隊列的長度,若隊列滿,那么put操作會永遠卡死在那里,任務都不會結束。
3. 改進任務停止邏輯
最開始想到的,是通過在multi_yield
函數參數中添加一個返回bool的函數,這樣當外部break時,同時將該函數的返回值置為True,內部檢測到該標志位后強制退出。偽代碼如下:
_stop=False
def can_stop():
return _stop
for item in multi_yield(xprint, process_mode,3,xrange(100),can_stop):
i+=1
print(item)
if i>10:
_stop=True
break
但這樣並不優雅,引入了更多的函數作為參數,還必須手工控制變量值,非常繁瑣。在多進程模式下,stop標志位還如何解決?
我們希望外部在循環時執行了break后,會自動通知內部的生成器。實現方法似乎就是with語句,即contextmanager.
我們實現以下的包裝類:
class Yielder(object):
def __init__(self, dispose):
self.dispose = dispose
def __enter__(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
self.dispose()
它實現了with的原語,參數是dispose函數,作用是退出with代碼塊后的回收邏輯。
由於值類型的標志位無法在多進程環境中傳遞,我們再創建StopWrapper類,用於管理停止標志和回收資源:
class Stop_Wrapper():
def __init__(self):
self.stop_flag = False
self.workers=[]
def is_stop(self):
return self.stop_flag
def stop(self):
self.stop_flag = True
for process in self.workers:
if isinstance(process,multiprocessing.Process):
process.terminate()
最后的問題是,如何解決隊列滿或空時,put/get的無限等待問題呢?考慮包裝一下put/get:包裝在while True
之中,每隔兩秒get/put,這樣即使阻塞時,也能保證可以檢查退出標志位。所有線程在主線程結束后,最遲也能在2s內自動退出。
def safe_queue_get(queue, is_stop_func=None, timeout=2):
while True:
if is_stop_func is not None and is_stop_func():
return Stop
try:
data = queue.get(timeout=timeout)
return data
except:
continue
def safe_queue_put(queue, item, is_stop_func=None, timeout=2):
while True:
if is_stop_func is not None and is_stop_func():
return Stop
try:
queue.put(item, timeout=timeout)
return item
except:
continue
如何使用呢?我們只需在multi_yield的yield語句之外加上一行就可以了:
with Yielder(stop_wrapper.stop):
# create queue,boss,worker, then start all
# ignore repeat code
while not should_stop():
data = safe_queue_get(result_queue, should_stop)
if data is Empty:
count += 1
if count == worker_count:
break
continue
if data is Stop:
break
else:
yield data
仔細閱讀上面的代碼, 外部循環時退出循環,則會自動觸發stop_wrapper的stop操作,回收全部資源,而不需通過外部的標志位傳遞!這樣調用方在心智完全不需有額外的負擔。
實現生成器和上下文管理器的編程語言,都可以通過上述方式實現自動協程資源回收。筆者也實現了一個C#版本的,有興趣歡迎交流。
這樣,我們就能像文章開頭那樣,實現並行的迭代器操作了。
4. 結語
完整代碼在:
https://github.com/ferventdesert/multi_yielder/blob/master/src/multi_yielder.py
一些實現的細節很有趣,我們借助在函數中定義函數,可以不用復雜的類去承擔職責,而僅僅只需函數。而類似的思想,在函數式編程中非常常見。
該工具已經被筆者的流式語言etlpy
所集成。但是依然有較多改進的空間,如沒有集成分布式執行模式。
歡迎留言交流。