Python 線程池的原理和實現及subprocess模塊


最近由於項目需要一個與linux shell交互的多線程程序,需要用python實現,之前從沒接觸過python,這次匆匆忙忙的使用python,發現python確實語法非常簡單,功能非常強大,因為自己是從零開始使用python,連語法都是現學的,所以將一些使用記錄下來,希望能幫到大家。

使用python的需求簡單的說是調用liunux下的ffmpeg獲取音頻的一些信息,需要用多線程實現

一、subprocess

因為是多線程,首先想到的是subprocess模塊(官方文檔說明此模塊將會代替os模塊和Popen2模塊和command模塊),這是python提供的開啟子線程的標准庫。可以通過pipe將子線程的stdin、stdout和stderr與主線程交互。

subprocess.call(["ls", "-l"])

subprocess.check_call(["ls", "-l"])

 

這是兩個非常簡單的例子,主線程都會等待子線程命令的完成,然后獲取返回值,兩個方法的唯一區別就是check_call會檢查返回值,如果返回值不為0(即正確執行),則會拋出CalledProcessError異常。

 

Popen

那么我用的是Popen方法,實際上subprocess模塊中其它的方法都是對Popen的封裝,為了更方便的使用,如果我們自己需要定制某些功能,最后還是會回到Popen。

Popen具體的參數使用可以參考Python document

Popen接受元組為參數

child = subprocess.Popen(["ping","-c","5","www.google.com"])

 

與上述方法不同的是,使用Popen,主線程不會等待子線程完成,如果要等待,需要使用wait()方法。

先上我使用的代碼:

command = ["ffmpeg","-i",songPath];

stdoutData,stderrData = subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate();

 

    上面也提到了,使用Popen可以自定義標准輸入、標准輸出和標准錯誤輸出。

那么在這行代碼中,我定義了stdoutData和stderrData分別接受命令行程序的標准輸出和標准錯誤輸出(即stdout=subprocess.PIPE,stderr=subprocess.PIPE),都是通過管道(Pipe)實現。大家要注意,如果我們要在接下來的主線程中使用Popen方法執行命令行程序后的輸出數據,如print(stdoutData),那么記住使用Popen.wait(),因為主線程不會等待Popen執行完成。為什么我這里沒有用呢?因為我這里使用了communicate方法,communicate方法代表子線程與主線程之間的通信,是阻塞式的,如果使用了communicate方法,主線程會等待子線程的完成

    其實從另一個層面上來說,也是linux標准輸入輸出的管道重定向,只不過是把標准輸入輸出重定向到程序而已。

    按照這種方法,我就用ffmpeg獲取到了音頻的輸出。

 

那么問題來了,說好的多線程呢?因為communicate方法是阻塞式的,並不能開啟多線程。所以在思考過后,我決定用在主線程中開啟多個子線程,分別調用subprocess模塊去獲取音頻的信息。既然都是用到了多線程,考慮到效率問題,自然聯想到了線程池。

 

二、線程池

    為什么需要線程池呢?

        設想一下,如果我們使用有任務就開啟一個子線程處理,處理完成后,銷毀子線程或等得子線程自然死亡,那么如果我們的任務所需時間比較短,但是任務數量比較多,那么更多的時間是花在線程的創建和結束上面,效率肯定就低了。

 

    線程池的原理:

        既然是線程池(Thread pool),其實名字很形象,就是把指定數量的可用子線程放進一個"池里",有任務時取出一個線程執行,任務執行完后,並不立即銷毀線程,而是放進線程池中,等待接收下一個任務。這樣內存和cpu的開銷也比較小,並且我們可以控制線程的數量。

 

    線程池的實現:

        線程池有很多種實現方式,在python中,已經給我們提供了一個很好的實現方式:Queue-隊列。因為python中Queue本身就是同步的,所以也就是線程安全的,所以我們可以放心的讓多個線程共享一個Queue。

        那么說到線程池,那么理應也得有一個任務池,任務池中存放着待執行的任務,各個線程到任務池中取任務執行,那么用Queue來實現任務池是最好不過的。

 

先上代碼:

class TaskManager():

    def __init__(self,maxTasks,maxThreads):
        #最大任務書,也就是Queue的容量
        self._maxTasks = maxTasks;
        #線程池中線程數量    
        self._maxThreads = maxThreads;
        #業務代碼
        ….
        ….

        #任務池
        self._taskQueue = Queue.Queue(maxTasks);
        #線程池,使用列表實現
        self._threads = [];

        #在__init__中調用方法
        self.initThreads();
        self.initTaskQueue();

    #初始化任務池
    def initTaskQueue(self):
        while True:
        #業務代碼
            if not self._taskQueue.full():
                getTasks(self._maxTasks - self._taskQueue.qsize());
                for task in taskMap["tasks"]:
                self._taskQueue.put(task);
                time.sleep(1);

    #初始化線程池
    def initThreads(self):
        for i in range(self._maxThreads):
        #調用每個線程執行的具體任務
        self._threads.append(Work(self,self._reportUrl));

    def getTask(self):
        return self._taskQueue.get();

#具體執行的任務
class Work(threading.Thread):
    def __init__(self,taskmgr):
    threading.Thread.__init__(self);
    self._logger = logging.getLogger("");
    self.start();

    def run(self):
        while True:
            try:
                #取出任務並執行相關操作
                self._taskmgr.getTask();
                ……
                ……

                time.sleep(1);
            except Exception,e:
                self._logger.exception(e);            

 

 

線程池的實現主要分兩部分,一部分是TaskMagager,即任務管理類,用來調度任務,一部分是Work,即具體需要執行的業務代碼。線程池的這種設計模式在很多地方都可以借鑒

 

TaskManager

    先來看TaskManager,主要包含四個方法,一個構造方法,接受傳進來的參數,執行任務池和線程池的大小等初始化信息,然后調用initTaskQueue和initThread方法初始化任務池和線程池。

    最后一個方法getTask返回TaskManager類的一個實例。

 

Work

    執行具體的業務

 

過程分析

  1. TaskManager的__init__方法初始化線程池和任務池
  2. initTaskQueue方法,初始化任務池,將任務填充到任務隊列。
  3. initThreads方法,初始化線程池,調用Work類執行任務。
  4. getTask方法,返回TaskManager實例,主要作用是傳給Work類,讓子線程從任務隊列中取出任務執行。
  5. Work類的__init__方法初始化線程,並啟動線程。
  6. Run方法,執行任務,並且從任務隊列中取出任務。

 

關鍵點:

  1. 在主線程,也即TaskManager的initTaskQueue方法中獲取任務並填充任務池
  2. 在各個子線程中,也即Work類的run方法中獲取任務池中的任務並執行。

    這里需要注意的是,前面提到過,Python中的Queue是線程安全的,Queue的get方法是阻塞式,也即,如果Queue為空,子線程取不到任務,會進行等待,直到Queue中有任務可取

    三、在TaskManager的__init__方法中,最好先啟動線程,在啟動任務池

self.initThreads();

self.initTaskQueue();

     

否則在initTaskQueue(主線程)中的while循環會一直執行,將會阻塞線程池的執行。在第二點中說明過,先啟動線程池,就算任務池沒有任務,子線程也會阻塞等待任務池中出現新任務。

 

就寫到這里,如果錯誤,請大家指正:)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM