Easy Pipeline,一種輕量級的Python Pipeline庫


嗯,很久沒有寫博客了,最近的工作都是偏開發性質的,以至於沒有時間對自己感興趣的領域進行探索,感覺個人的成長停滯了一些。如何在枯燥的工作中,提取出有助於自己成長的養分,對於每個人來說都是不小的考驗。

這次,帶來的是之前編寫的一下挺簡單的庫,用來簡化流水線作業的小框架。

Github: https://github.com/miaoerduo/easy-pipeline 歡迎Star和提交MR。

起因是這樣的,組內有一個需求,需要挖掘視頻中的檢測難樣本,這樣可以極大地減少標注的量,從而降低成本。難樣本挖掘的策略,簡單來說就是如果視頻的前幾幀和后幾幀都能檢測到目標,而就只有當前幀沒有檢測到,就說明當前幀很可能存在漏檢(沒有檢測本到該檢測到的目標);反之,如果前后都沒有檢測到目標,而當前幀檢測到了,那就很可能是誤檢(檢測到不是目標的東西)。

初步的方案是這樣的,我們先把視頻抽幀,直接用FFMpeg就可以方便的完成。然后調用現在的檢測器,進行逐幀的檢測,把檢測結果存下來。最后寫個腳本,分析檢測的結果,然后輸出可能有問題的幀,然后這些幀就會進行送標(發給標注員進行標注)。最終我們就只需要標注一些比較hard的樣本就行了。

但是這樣會帶來很多的問題,最顯著的兩個:1. 需要保存大量的中間結果(圖片幀);2. 必須依次完成每一步之后,才能得到最終的結果。

這時候,相比大家都知道了該如何去解決。對的,我們應該用流水線作業的方式去進行。

首先我們可以將每部分任務並行的去處理。抽幀之后的結果送入隊列;之后檢測模塊從隊列取幀,檢測之后將結果送入下一個隊列;最后一個隊列得到檢測結果,再做最終的分析。相比於之前的方式,這樣可以盡量的減少中間的結果。

實現該方案,只需要使用最簡單的生產者消費者隊列即可以完成。所以說,相信大家都十分了解了。對於上面的邏輯,我們需要的隊列的數目和我們的模塊數是正相關的。如果單純的進行實現的話,實在的太麻煩了,給隊列命名都要我們絞盡腦汁了。所以,為了更優雅的編寫代碼,這里就推出本文標題中的Easy Pipeline框架。

首先,我們舉個最簡單的例子來說明該框架的工作模式。輸入一個數字的序列,按要求對他們進行加減乘除的操作(這里的每個操作,其實可以等價於前面的抽幀或是檢測的更復雜的操作 ),並且支持每個操作的進程數。

from easy_pipeline import SimplePipeline, PipelineItem, Task, StopTask, EmptyTask
import multiprocessing as mp

# define our Task
class NumTask(Task):
    def __init__(self, x):
        super(NumTask, self).__init__()
        self.val = x

# init function, here we use closure to get different function
def get_init_fn(x):
    def init():
        return x
    return init

# operations
def plus(res, task):
    return NumTask(task.val + res)

def mul(res, task):
    return NumTask(task.val * res)

def minus(res, task):
    return NumTask(task.val - res)

def div(res, task):
    return NumTask(task.val / res)

if __name__ == '__main__':

    # job queue
    manager = mp.Manager()
    job_queue = manager.Queue(1000)

    # define pipeline and start

    # x = ((x + 1) * 2 - 3)/ 5
    pipeline_items = [
        PipelineItem(plus, get_init_fn(1), 1, 10),      # plus 1
        PipelineItem(mul, get_init_fn(2), 2, 10),       # mul 2
        PipelineItem(minus, get_init_fn(3), 3, 10),     # minus 3
        PipelineItem(div, get_init_fn(5.), 4, 10),      # div 5
    ]

    pipeline = SimplePipeline(pipeline_items, job_queue)
    pipeline.start()
    result_queue = pipeline.get_result_queue()

    # Feed jobs anytime (before StopTask)
    for i in range(10):
        job_queue.put(NumTask(i))

    # get partial output
    print('Get Output Start')
    for i in range(5):
        result = result_queue.get()
        if isinstance(result, StopTask):
            print("get stop task")
            break
        if isinstance(result, EmptyTask):
            continue
        print(result.val)
    print('Get Output End')
    
    # Feed jobs anytime (before StopTask)
    for i in range(10, 20):
        job_queue.put(NumTask(i))

    # Stop pipeline, means no more job will be added then.
    # Every process will exit when it has done all current jobs in job_queue
    pipeline.stop()

    # get all output
    print('Get Output Start')
    while True:
        result = result_queue.get()
        if isinstance(result, StopTask):
            print("Output Queue Empty")
            break
        if isinstance(result, EmptyTask):
            continue
        print(result.val)
    print('Get Output End')

下面,我們來簡單的說明一下工作邏輯。

  1. 首先,我們需要定義自己的任務Task。只需要繼承Task這個類即可,內部可以存放自己喜歡的任何數據。這里只是為了計算,所以就只存放了一個數字。
  2. 定義我們的初始化函數和工作函數。初始化函數的作用是給每個進程初始化一些資源,如果不需要也可以不要。這里的初始化函數就是返回了一個值,表示操作數。工作函數是最重要的函數,他會處理接收到的Task,處理並返回新的Task(新的Task可以理解為處理的結果)。工作函數有兩個輸入,一個是資源,即初始化函數的返回值,另一個就是Task本身。
  3. 構建Pipeline。每個工作模塊都只需要用PipelineItem這個對象進行封裝即可。器參數分別是:工作函數、初始化函數、進程數、結果隊列的長度(-1表示不限長度)。結果隊列的長度,通常設置為較大的值即可。因為不能的模塊的處理速度可能不同,因此很容易出現結果堆積的現象,如果不支持隊列長度,會導致內存的大量的占用。最后將PipelineItem的數組和輸入的對壘傳給SimplePipeline對象即可構建完我們的整套Pipeline程序了!
  4. 啟動Pipeline程序,並輸入數據。
  5. 得到結果!完事了,優秀。

上面這是一個最簡單的例子,可以比較直觀的感受到這個框架的便捷之處。完全屏蔽掉對隊列,並發等的操作。

在我推薦給同事之后,確實一定程度地減小他的工作量,但同時,他也向我反饋了一些問題:這個框架在某些地方有些比較靈活的設計,應該給出足夠多的實例,才能方便實用。關於該框架的設計思路和實例,將會在下一篇博客中進行詳細介紹。

最后,歡迎大家Star和提交MR。願與你們一同進步。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM