為什么需要線程池
目前的大多數網絡服務器,包括Web服務器、Email服務器以及數據庫服務器等都具有一個共同點,就是單位時間內必須處理數目巨大的連接請求,但處理時間卻相對較短。
傳統多線程方案中我們采用的服務器模型則是一旦接受到請求之后,即創建一個新的線程,由該線程執行任務。任務執行完畢后,線程退出,這就是是“即時創建, 即時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處於不停的創建線程,銷毀線程的狀態。
我們將傳統方案中的線程執行過程分為三個過程:T1、T2、T3:
T1:線程創建時間
T2:線程執行時間,包括線程的同步等時間
T3:線程銷毀時間
那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執行的時間很短的話,這比開銷可能占到20%-50%左右。如果任務執行時間很頻繁的話,這筆開銷將是不可忽略的。
除此之外,線程池能夠減少創建的線程個數。通常線程池所允許的並發線程是有上界的,如果同時需要並發的線程數超過上界,那么一部分線程將會等待。而傳統方案中,如果同時請求數目為2000,那么最壞情況下,系統可能需要產生2000個線程。盡管這不是一個很大的數目,但是也有部分機器可能達不到這種要求。
因此線程池的出現正是着眼於減少線程池本身帶來的開銷。線程池采用預創建的技術,在應用程序啟動之后,將立即創建一定數量的線程(N1),放入空閑隊列 中。這些線程都是處於阻塞(Suspended)狀態,不消耗CPU,但占用較小的內存空間。當任務到來后,緩沖池選擇一個空閑線程,把任務傳入此線程中運行。當N1個線程都在處理任務后,緩沖池自動創建一定數量的新線程,用於處理更多的任務。在任務執行完畢后線程也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閑時,大部分線程都一直處於暫停狀態,線程池自動銷毀一部分線程,回收系統資源。
基於這種預創建技術,線程池將線程創建和銷毀本身所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每個任務所分擔到的線程本身開銷則越小,不過我們另外可能需要考慮進去線程之間同步所帶來的開銷。
構建線程池框架
一般線程池都必須具備下面幾個組成部分:
線程池管理器:用於創建並管理線程池
工作線程: 線程池中實際執行的線程
任務接口: 盡管線程池大多數情況下是用來支持網絡服務器,但是我們將線程執行的任務抽象出來,形成任務接口,從而是的線程池與具體的任務無關。
任務隊列:線程池的概念具體到實現則可能是隊列,鏈表之類的數據結構,其中保存執行線程。
我們把任務放進隊列中去,然后開N個線程,每個線程都去隊列中取一個任務,執行完了之后告訴系統說我執行完了,然后接着去隊列中取下一個任務,直至隊列中所有任務取空,退出線程。
這就是一般的線程池實現的原理,下面看一個實際的代碼:
線程池的python實現代碼:
1 # !/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import Queue 5 import threading 6 import time 7 8 class WorkManager(object): 9 def __init__(self, work_num=1000,thread_num=2): 10 self.work_queue = Queue.Queue() 11 self.threads = [] 12 self.__init_work_queue(work_num) 13 self.__init_thread_pool(thread_num) 14 15 """ 16 初始化線程 17 """ 18 def __init_thread_pool(self,thread_num): 19 for i in range(thread_num): 20 self.threads.append(Work(self.work_queue)) 21 22 """ 23 初始化工作隊列 24 """ 25 def __init_work_queue(self, jobs_num): 26 for i in range(jobs_num): 27 self.add_job(do_job, i) 28 29 """ 30 添加一項工作入隊 31 """ 32 def add_job(self, func, *args): 33 self.work_queue.put((func, list(args)))#任務入隊,Queue內部實現了同步機制 34 35 """ 36 等待所有線程運行完畢 37 """ 38 def wait_allcomplete(self): 39 for item in self.threads: 40 if item.isAlive():item.join() 41 42 class Work(threading.Thread): 43 def __init__(self, work_queue): 44 threading.Thread.__init__(self) 45 self.work_queue = work_queue 46 self.start() 47 48 def run(self): 49 #死循環,從而讓創建的線程在一定條件下關閉退出 50 while True: 51 try: 52 do, args = self.work_queue.get(block=False)#任務異步出隊,Queue內部實現了同步機制 53 do(args) 54 self.work_queue.task_done()#通知系統任務完成 55 except: 56 break 57 58 #具體要做的任務 59 def do_job(args): 60 time.sleep(0.1)#模擬處理時間 61 print threading.current_thread(), list(args) 62 63 if __name__ == '__main__': 64 start = time.time() 65 work_manager = WorkManager(10000, 10)#或者work_manager = WorkManager(10000, 20) 66 work_manager.wait_allcomplete() 67 end = time.time() 68 print "cost all time: %s" % (end-start)
Work類是一個Python線程池,不斷地從workQueue隊列中獲取需要執行的任務,執行之,並將結果寫入到resultQueue中。這里的workQueue和resultQueue都是線程安全的,其內部對各個線程的操作做了互斥。當從workQueue中獲取任務超時,則線程結束。
WorkerManager負責初始化Python線程池,提供將任務加入隊列和獲取結果的接口,並能等待所有任務完成。
在 Python 中使用線程時,這個模式是一種很常見的並且推薦使用的方式。具體工作步驟描述如下:
- 創建一個
Queue.Queue()
的實例,然后使用數據對它進行填充。 - 將經過填充數據的實例傳遞給線程類,后者是通過繼承
threading.Thread
的方式創建的。 - 生成守護線程池。
- 每次從隊列中取出一個項目,並使用該線程中的數據和 run 方法以執行相應的工作。
- 在完成這項工作之后,使用
queue.task_done()
函數向任務已經完成的隊列發送一個信號。 - 對隊列執行 join 操作,實際上意味着等到隊列為空,再退出主程序。
在使用這個模式時需要注意一點:通過將守護線程設置為 true,將允許主線程或者程序僅在守護線程處於活動狀態時才能夠退出。這種方式創建了一種簡單的方式以控制程序流程,因為在退出之前,您可以對隊列執行 join 操作、或者等到隊列為空。隊列模塊文檔詳細說明了實際的處理過程,請參見參考資料:
join()
保持阻塞狀態,直到處理了隊列中的所有項目為止。在將一個項目添加到該隊列時,未完成的任務的總數就會增加。當使用者線程調用 task_done() 以表示檢索了該項目、並完成了所有的工作時,那么未完成的任務的總數就會減少。當未完成的任務的總數減少到零時,join()
就會結束阻塞狀態。
參考:http://blog.csdn.net/yatere/article/details/7316487
http://blog.csdn.net/liu1pan2min3/article/details/8545979
http://www.ibm.com/developerworks/cn/aix/library/au-threadingpython/?ca=drs-tp3008