嗯,很久沒有寫博客了,最近的工作都是偏開發性質的,以至於沒有時間對自己感興趣的領域進行探索,感覺個人的成長停滯了一些。如何在枯燥的工作中,提取出有助於自己成長的養分,對於每個人來說都是不小的考驗。
這次,帶來的是之前編寫的一下挺簡單的庫,用來簡化流水線作業的小框架。
Github: https://github.com/miaoerduo/easy-pipeline 歡迎Star和提交MR。
起因是這樣的,組內有一個需求,需要挖掘視頻中的檢測難樣本,這樣可以極大地減少標注的量,從而降低成本。難樣本挖掘的策略,簡單來說就是如果視頻的前幾幀和后幾幀都能檢測到目標,而就只有當前幀沒有檢測到,就說明當前幀很可能存在漏檢(沒有檢測本到該檢測到的目標);反之,如果前后都沒有檢測到目標,而當前幀檢測到了,那就很可能是誤檢(檢測到不是目標的東西)。
初步的方案是這樣的,我們先把視頻抽幀,直接用FFMpeg就可以方便的完成。然后調用現在的檢測器,進行逐幀的檢測,把檢測結果存下來。最后寫個腳本,分析檢測的結果,然后輸出可能有問題的幀,然后這些幀就會進行送標(發給標注員進行標注)。最終我們就只需要標注一些比較hard的樣本就行了。
但是這樣會帶來很多的問題,最顯著的兩個:1. 需要保存大量的中間結果(圖片幀);2. 必須依次完成每一步之后,才能得到最終的結果。
這時候,相比大家都知道了該如何去解決。對的,我們應該用流水線作業的方式去進行。
首先我們可以將每部分任務並行的去處理。抽幀之后的結果送入隊列;之后檢測模塊從隊列取幀,檢測之后將結果送入下一個隊列;最后一個隊列得到檢測結果,再做最終的分析。相比於之前的方式,這樣可以盡量的減少中間的結果。
實現該方案,只需要使用最簡單的生產者消費者隊列即可以完成。所以說,相信大家都十分了解了。對於上面的邏輯,我們需要的隊列的數目和我們的模塊數是正相關的。如果單純的進行實現的話,實在的太麻煩了,給隊列命名都要我們絞盡腦汁了。所以,為了更優雅的編寫代碼,這里就推出本文標題中的Easy Pipeline框架。
首先,我們舉個最簡單的例子來說明該框架的工作模式。輸入一個數字的序列,按要求對他們進行加減乘除的操作(這里的每個操作,其實可以等價於前面的抽幀或是檢測的更復雜的操作 ),並且支持每個操作的進程數。
from easy_pipeline import SimplePipeline, PipelineItem, Task, StopTask, EmptyTask
import multiprocessing as mp
# define our Task
class NumTask(Task):
def __init__(self, x):
super(NumTask, self).__init__()
self.val = x
# init function, here we use closure to get different function
def get_init_fn(x):
def init():
return x
return init
# operations
def plus(res, task):
return NumTask(task.val + res)
def mul(res, task):
return NumTask(task.val * res)
def minus(res, task):
return NumTask(task.val - res)
def div(res, task):
return NumTask(task.val / res)
if __name__ == '__main__':
# job queue
manager = mp.Manager()
job_queue = manager.Queue(1000)
# define pipeline and start
# x = ((x + 1) * 2 - 3)/ 5
pipeline_items = [
PipelineItem(plus, get_init_fn(1), 1, 10), # plus 1
PipelineItem(mul, get_init_fn(2), 2, 10), # mul 2
PipelineItem(minus, get_init_fn(3), 3, 10), # minus 3
PipelineItem(div, get_init_fn(5.), 4, 10), # div 5
]
pipeline = SimplePipeline(pipeline_items, job_queue)
pipeline.start()
result_queue = pipeline.get_result_queue()
# Feed jobs anytime (before StopTask)
for i in range(10):
job_queue.put(NumTask(i))
# get partial output
print('Get Output Start')
for i in range(5):
result = result_queue.get()
if isinstance(result, StopTask):
print("get stop task")
break
if isinstance(result, EmptyTask):
continue
print(result.val)
print('Get Output End')
# Feed jobs anytime (before StopTask)
for i in range(10, 20):
job_queue.put(NumTask(i))
# Stop pipeline, means no more job will be added then.
# Every process will exit when it has done all current jobs in job_queue
pipeline.stop()
# get all output
print('Get Output Start')
while True:
result = result_queue.get()
if isinstance(result, StopTask):
print("Output Queue Empty")
break
if isinstance(result, EmptyTask):
continue
print(result.val)
print('Get Output End')
下面,我們來簡單的說明一下工作邏輯。
- 首先,我們需要定義自己的任務Task。只需要繼承Task這個類即可,內部可以存放自己喜歡的任何數據。這里只是為了計算,所以就只存放了一個數字。
- 定義我們的初始化函數和工作函數。初始化函數的作用是給每個進程初始化一些資源,如果不需要也可以不要。這里的初始化函數就是返回了一個值,表示操作數。工作函數是最重要的函數,他會處理接收到的Task,處理並返回新的Task(新的Task可以理解為處理的結果)。工作函數有兩個輸入,一個是資源,即初始化函數的返回值,另一個就是Task本身。
- 構建Pipeline。每個工作模塊都只需要用PipelineItem這個對象進行封裝即可。器參數分別是:工作函數、初始化函數、進程數、結果隊列的長度(-1表示不限長度)。結果隊列的長度,通常設置為較大的值即可。因為不能的模塊的處理速度可能不同,因此很容易出現結果堆積的現象,如果不支持隊列長度,會導致內存的大量的占用。最后將PipelineItem的數組和輸入的對壘傳給SimplePipeline對象即可構建完我們的整套Pipeline程序了!
- 啟動Pipeline程序,並輸入數據。
- 得到結果!完事了,優秀。
上面這是一個最簡單的例子,可以比較直觀的感受到這個框架的便捷之處。完全屏蔽掉對隊列,並發等的操作。
在我推薦給同事之后,確實一定程度地減小他的工作量,但同時,他也向我反饋了一些問題:這個框架在某些地方有些比較靈活的設計,應該給出足夠多的實例,才能方便實用。關於該框架的設計思路和實例,將會在下一篇博客中進行詳細介紹。
最后,歡迎大家Star和提交MR。願與你們一同進步。