最近由於項目需要一個與linux shell交互的多線程程序,需要用python實現,之前從沒接觸過python,這次匆匆忙忙的使用python,發現python確實語法非常簡單,功能非常強大,因為自己是從零開始使用python,連語法都是現學的,所以將一些使用記錄下來,希望能幫到大家。
使用python的需求簡單的說是調用liunux下的ffmpeg獲取音頻的一些信息,需要用多線程實現
一、subprocess
因為是多線程,首先想到的是subprocess模塊(官方文檔說明此模塊將會代替os模塊和Popen2模塊和command模塊),這是python提供的開啟子線程的標准庫。可以通過pipe將子線程的stdin、stdout和stderr與主線程交互。
subprocess.call(["ls", "-l"]) subprocess.check_call(["ls", "-l"])
這是兩個非常簡單的例子,主線程都會等待子線程命令的完成,然后獲取返回值,兩個方法的唯一區別就是check_call會檢查返回值,如果返回值不為0(即正確執行),則會拋出CalledProcessError異常。
Popen
那么我用的是Popen方法,實際上subprocess模塊中其它的方法都是對Popen的封裝,為了更方便的使用,如果我們自己需要定制某些功能,最后還是會回到Popen。
Popen具體的參數使用可以參考Python document
Popen接受元組為參數
child = subprocess.Popen(["ping","-c","5","www.google.com"])
與上述方法不同的是,使用Popen,主線程不會等待子線程完成,如果要等待,需要使用wait()方法。
先上我使用的代碼:
command = ["ffmpeg","-i",songPath]; stdoutData,stderrData = subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate();
上面也提到了,使用Popen可以自定義標准輸入、標准輸出和標准錯誤輸出。
那么在這行代碼中,我定義了stdoutData和stderrData分別接受命令行程序的標准輸出和標准錯誤輸出(即stdout=subprocess.PIPE,stderr=subprocess.PIPE),都是通過管道(Pipe)實現。大家要注意,如果我們要在接下來的主線程中使用Popen方法執行命令行程序后的輸出數據,如print(stdoutData),那么記住使用Popen.wait(),因為主線程不會等待Popen執行完成。為什么我這里沒有用呢?因為我這里使用了communicate方法,communicate方法代表子線程與主線程之間的通信,是阻塞式的,如果使用了communicate方法,主線程會等待子線程的完成。
其實從另一個層面上來說,也是linux標准輸入輸出的管道重定向,只不過是把標准輸入輸出重定向到程序而已。
按照這種方法,我就用ffmpeg獲取到了音頻的輸出。
那么問題來了,說好的多線程呢?因為communicate方法是阻塞式的,並不能開啟多線程。所以在思考過后,我決定用在主線程中開啟多個子線程,分別調用subprocess模塊去獲取音頻的信息。既然都是用到了多線程,考慮到效率問題,自然聯想到了線程池。
二、線程池
為什么需要線程池呢?
設想一下,如果我們使用有任務就開啟一個子線程處理,處理完成后,銷毀子線程或等得子線程自然死亡,那么如果我們的任務所需時間比較短,但是任務數量比較多,那么更多的時間是花在線程的創建和結束上面,效率肯定就低了。
線程池的原理:
既然是線程池(Thread pool),其實名字很形象,就是把指定數量的可用子線程放進一個"池里",有任務時取出一個線程執行,任務執行完后,並不立即銷毀線程,而是放進線程池中,等待接收下一個任務。這樣內存和cpu的開銷也比較小,並且我們可以控制線程的數量。
線程池的實現:
線程池有很多種實現方式,在python中,已經給我們提供了一個很好的實現方式:Queue-隊列。因為python中Queue本身就是同步的,所以也就是線程安全的,所以我們可以放心的讓多個線程共享一個Queue。
那么說到線程池,那么理應也得有一個任務池,任務池中存放着待執行的任務,各個線程到任務池中取任務執行,那么用Queue來實現任務池是最好不過的。
先上代碼:
class TaskManager(): def __init__(self,maxTasks,maxThreads): #最大任務書,也就是Queue的容量 self._maxTasks = maxTasks; #線程池中線程數量 self._maxThreads = maxThreads; #業務代碼 …. …. #任務池 self._taskQueue = Queue.Queue(maxTasks); #線程池,使用列表實現 self._threads = []; #在__init__中調用方法 self.initThreads(); self.initTaskQueue(); #初始化任務池 def initTaskQueue(self): while True: #業務代碼 if not self._taskQueue.full(): getTasks(self._maxTasks - self._taskQueue.qsize()); for task in taskMap["tasks"]: self._taskQueue.put(task); time.sleep(1); #初始化線程池 def initThreads(self): for i in range(self._maxThreads): #調用每個線程執行的具體任務 self._threads.append(Work(self,self._reportUrl)); def getTask(self): return self._taskQueue.get(); #具體執行的任務 class Work(threading.Thread): def __init__(self,taskmgr): threading.Thread.__init__(self); self._logger = logging.getLogger(""); self.start(); def run(self): while True: try: #取出任務並執行相關操作 self._taskmgr.getTask(); …… …… time.sleep(1); except Exception,e: self._logger.exception(e);
線程池的實現主要分兩部分,一部分是TaskMagager,即任務管理類,用來調度任務,一部分是Work,即具體需要執行的業務代碼。線程池的這種設計模式在很多地方都可以借鑒。
TaskManager
先來看TaskManager,主要包含四個方法,一個構造方法,接受傳進來的參數,執行任務池和線程池的大小等初始化信息,然后調用initTaskQueue和initThread方法初始化任務池和線程池。
最后一個方法getTask返回TaskManager類的一個實例。
Work
執行具體的業務
過程分析
- TaskManager的__init__方法初始化線程池和任務池
- initTaskQueue方法,初始化任務池,將任務填充到任務隊列。
- initThreads方法,初始化線程池,調用Work類執行任務。
- getTask方法,返回TaskManager實例,主要作用是傳給Work類,讓子線程從任務隊列中取出任務執行。
- Work類的__init__方法初始化線程,並啟動線程。
- Run方法,執行任務,並且從任務隊列中取出任務。
關鍵點:
- 在主線程,也即TaskManager的initTaskQueue方法中獲取任務並填充任務池
-
在各個子線程中,也即Work類的run方法中獲取任務池中的任務並執行。
這里需要注意的是,前面提到過,Python中的Queue是線程安全的,Queue的get方法是阻塞式,也即,如果Queue為空,子線程取不到任務,會進行等待,直到Queue中有任務可取。
三、在TaskManager的__init__方法中,最好先啟動線程,在啟動任務池。
self.initThreads();
self.initTaskQueue();
否則在initTaskQueue(主線程)中的while循環會一直執行,將會阻塞線程池的執行。在第二點中說明過,先啟動線程池,就算任務池沒有任務,子線程也會阻塞等待任務池中出現新任務。
就寫到這里,如果錯誤,請大家指正:)