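Original blog post; please credit the source when reposting.
Today, while studying processes and threads in Python, I came across the thread pool module threadpool; see the official documentation.
The module is very easy to use, as long as you understand how a thread pool works.
Handling a task means creating and destroying objects for every request. When a large number of concurrent tasks must be processed, traditional multithreading (one new thread per task) spends so much on creating and destroying threads that server throughput drops. This is where a thread pool helps: it addresses both the cost of creating and destroying threads and the problem of running out of system resources.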
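Advantages:
(1) It controls how many threads exist. A fixed number of worker threads is created in advance and capped, which bounds the memory used by thread objects.
(2) It lowers system overhead and resource consumption. Because threads are reused across many requests, the cost of creating and destroying a thread is spread over all of them. Capping the number of threads also reduces the runtime's garbage-collection overhead.
(3) It improves response time. The threads already exist when a request arrives, so the request is handled immediately without waiting for a thread to be created, and several threads can process requests concurrently.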
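To make advantages (1) and (2) concrete, the sketch below uses the standard library's `concurrent.futures.ThreadPoolExecutor` (not the threadpool module discussed in this post) and counts how many distinct threads actually run 20 small tasks. With one thread per task, 20 threads are created and destroyed; with a pool of 4 workers, at most 4 threads do all the work. The task function `record_thread` and the numbers 20 and 4 are illustrative assumptions.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def record_thread(seen, lock):
    # Hypothetical task: remember which Thread object ran it.
    # Storing the object (not its ident) keeps it alive, so two
    # different threads can never collide in the set.
    with lock:
        seen.add(threading.current_thread())


def count_threads_per_task(n_tasks):
    """Traditional approach: create, start and discard one thread per task."""
    seen, lock = set(), threading.Lock()
    threads = [threading.Thread(target=record_thread, args=(seen, lock))
               for _ in range(n_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(seen)


def count_threads_pooled(n_tasks, n_workers):
    """Thread pool: a bounded set of workers is reused for every task."""
    seen, lock = set(), threading.Lock()
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        for _ in range(n_tasks):
            pool.submit(record_thread, seen, lock)
    # leaving the with-block waits until every submitted task has run
    return len(seen)


print(count_threads_per_task(20))   # 20: one thread per task
print(count_threads_pooled(20, 4))  # at most 4 threads handled all 20 tasks
```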
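The basic building blocks of a thread pool:
(1) The thread pool manager. It creates and maintains the pool, resizes it as needed, and watches for thread leaks.
(2) Worker threads. Each one executes tasks in a loop; it sits in a wait state when there is no task and is woken up when a new one arrives.
(3) The task queue. It is a buffer that temporarily holds pending tasks, and it also serves as the monitor object that the concurrent threads synchronize on.
(4) The task interface. Every task must implement it, and worker threads run tasks through it.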
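The four components above can be sketched in a few dozen lines of Python. This is a simplified illustration, not the threadpool module's code: the names `Task`, `Worker`, `SimplePool` and `Square` are hypothetical, `queue.Queue` plays the role of the task queue (it does its own locking internally), and putting a `None` sentinel on the queue is an assumed way of telling workers to stop.

```python
import queue
import threading
from abc import ABC, abstractmethod


class Task(ABC):
    """(4) Task interface: every task implements run()."""

    @abstractmethod
    def run(self):
        ...


class Worker(threading.Thread):
    """(2) Worker thread: loops forever, blocking while the queue is empty."""

    def __init__(self, tasks):
        super().__init__(daemon=True)
        self.tasks = tasks

    def run(self):
        while True:
            task = self.tasks.get()      # waits until a task arrives
            if task is None:             # sentinel: leave the loop
                return
            task.run()                   # dispatch through the task interface


class SimplePool:
    """(1) Pool manager: owns the task queue (3) and the worker threads."""

    def __init__(self, n_workers):
        self.tasks = queue.Queue()
        self.workers = [Worker(self.tasks) for _ in range(n_workers)]
        for w in self.workers:
            w.start()

    def add_task(self, task):
        self.tasks.put(task)

    def shutdown(self):
        for _ in self.workers:
            self.tasks.put(None)         # one sentinel per worker, after all tasks
        for w in self.workers:
            w.join()


class Square(Task):
    """Hypothetical example task: appends n * n to a shared list."""

    def __init__(self, n, out):
        self.n, self.out = n, out

    def run(self):
        self.out.append(self.n * self.n)  # list.append is atomic in CPython


results = []
pool = SimplePool(3)
for i in range(5):
    pool.add_task(Square(i, results))
pool.shutdown()
print(sorted(results))  # [0, 1, 4, 9, 16]
```

Because the queue is FIFO and the sentinels are queued after every task, all tasks are taken before any worker sees its sentinel. The threadpool module stops workers differently, with a `dismiss()` flag that is checked every `poll_timeout` seconds.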
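When the pool manager is built, it first initializes the task queue (Queue); at run time, tasks are added to that queue by calling the add-task method. The manager then creates and starts a number of worker threads and keeps them in a thread list; while running, it can add or remove workers as needed. A running worker first locks the task queue so that the threads access it safely. If a task is waiting, the worker takes it and releases the lock so that other threads can use the queue, then runs the task through the task interface. When the queue is empty, the worker joins the queue's list of waiting threads and enters the wait state, in which it uses almost no CPU. As soon as a new task arrives, calling the queue object's notify method wakes one worker from the waiting list to handle it. This cooperation avoids the cost of creating and destroying threads while still processing tasks concurrently, which improves the system's response time.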
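The lock / wait / notify protocol in the paragraph above is exactly what a condition variable provides. The sketch below uses a hypothetical `MonitorQueue` class (not part of threadpool or the standard library, although Python's own `queue.Queue` is also built on `threading.Condition`) to show the three steps: a worker locks the queue, waits while it is empty (releasing the lock, so it uses no CPU), and a producer's `put` calls `notify()` to wake exactly one waiting worker. Tasks are plain callables here, and `None` is an assumed stop sentinel.

```python
import threading
from collections import deque


class MonitorQueue:
    """Task queue that doubles as the monitor object for its workers."""

    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()   # a lock plus a list of waiters

    def put(self, item):
        with self._cond:                     # lock the queue
            self._items.append(item)
            self._cond.notify()              # wake one waiting worker

    def get(self):
        with self._cond:                     # lock the queue
            while not self._items:           # empty: join the waiting list
                self._cond.wait()            # releases the lock while waiting
            return self._items.popleft()     # lock is released on return


def worker(tasks, results, lock):
    while True:
        task = tasks.get()
        if task is None:                     # sentinel: stop this worker
            return
        value = task()                       # the "task interface" is a call
        with lock:
            results.append(value)


tasks, results, lock = MonitorQueue(), [], threading.Lock()
threads = [threading.Thread(target=worker, args=(tasks, results, lock))
           for _ in range(3)]
for t in threads:
    t.start()
for i in range(10):
    tasks.put(lambda i=i: i * 10)
for _ in threads:
    tasks.put(None)
for t in threads:
    t.join()
print(sorted(results))  # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
```

The `while` loop around `wait()` (rather than an `if`) re-checks the queue after waking, which guards against spurious wakeups and against another worker grabbing the task first.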
In short: instead of starting a new thread for every concurrent task, you hand the tasks to a thread pool. Whenever the pool has an idle thread, a task is assigned to it.
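```python
from threadpool import ThreadPool, makeRequests

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
for req in requests:
    pool.putRequest(req)
pool.wait()
```
`ThreadPool(poolsize)` creates a thread pool holding poolsize worker threads.
`makeRequests` creates the work requests. some_callable is the function to run in multiple threads, list_of_args holds its arguments (one item per call), and callback is an optional callback that defaults to None.
The loop puts each request into the thread pool's request queue.
Finally, `pool.wait()` blocks until every request has been processed and its result collected, then returns.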
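For comparison, here is roughly the same four-step pattern written with the standard library's `concurrent.futures` module, which ships with Python 3 and needs no extra install. `some_callable`, `list_of_args` and `callback` are placeholders as in the snippet above, filled in here with a toy function. One difference worth knowing: threadpool calls `callback(request, result)` in whichever thread calls `poll()`/`wait()`, whereas `Future.add_done_callback` passes only the `Future` and typically runs the callback in the worker thread that finished the task (or immediately in the calling thread if the future is already done). That is why the callback below takes a lock and gets its argument bound with `functools.partial`.

```python
import functools
import threading
from concurrent.futures import ThreadPoolExecutor


def some_callable(x):                         # placeholder task
    return x * x


results = {}
lock = threading.Lock()


def callback(arg, future):                    # arg is bound via functools.partial
    with lock:                                # may run in a worker thread
        results[arg] = future.result()


list_of_args = [1, 2, 3, 4]
with ThreadPoolExecutor(max_workers=2) as pool:          # ~ ThreadPool(poolsize)
    for arg in list_of_args:                             # ~ makeRequests + putRequest
        future = pool.submit(some_callable, arg)
        future.add_done_callback(functools.partial(callback, arg))
# leaving the with-block joins the workers, so every callback has run  (~ pool.wait())
print(dict(sorted(results.items())))  # {1: 1, 2: 4, 3: 9, 4: 16}
```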
Reading through the source code shows that there is actually very little to it:
```python
import sys
import threading
try:
    import Queue              # Python 2
except ImportError:
    import queue as Queue     # Python 3
import traceback


# exceptions
class NoResultsPending(Exception):
    """All work requests have been processed."""
    pass

class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
    pass


# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.

    """
    traceback.print_exception(*exc_info)


# utility functions
# makeRequests creates several work requests at once; callback processes
# each result, exc_callback handles any exception raised by callable_
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Create several work requests for same callable with different arguments.

    Convenience function for creating several work requests for the same
    callable where each invocation of the callable receives different values
    for its arguments.

    ``args_list`` contains the parameters for each invocation of callable.
    Each item in ``args_list`` should be either a 2-item tuple of the list of
    positional arguments and a dictionary of keyword arguments or a single,
    non-tuple argument.

    See docstring for ``WorkRequest`` for info on ``callback`` and
    ``exc_callback``.

    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            requests.append(
                WorkRequest(callable_, item[0], item[1], callback=callback,
                    exc_callback=exc_callback)
            )
        else:
            requests.append(
                WorkRequest(callable_, [item], None, callback=callback,
                    exc_callback=exc_callback)
            )
    return requests


# classes
class WorkerThread(threading.Thread):  # worker thread
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.

    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
        worker thread.

        """
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self._requests_queue = requests_queue  # request (task) queue
        self._results_queue = results_queue    # results queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.is_set():
                # the dismissed flag is set: we have been told to exit
                break
            # get next work request. If we don't get a new request from the
            # queue after self._poll_timeout seconds, we jump to the start of
            # the while loop again, to give the thread a chance to exit.
            try:
                # block=True: wait for a request, but for at most
                # poll_timeout seconds
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            else:
                # check again: the thread may have been dismissed while it
                # was waiting for a request
                if self._dismissed.is_set():
                    # we are dismissed, put the request back in the queue
                    # (so another worker can take it) and exit the loop
                    self._requests_queue.put(request)
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    # catch everything so the worker survives; the exception
                    # info goes to the results queue in place of a result
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()


class WorkRequest:  # a single work request
    """A request to execute a callable for putting in the request queue later.

    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.

    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.

        A work request consists of the a callable to be executed by a
        worker thread, a list of positional arguments, a dictionary
        of keyword arguments.

        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass additional
        information to the callback, just stick it on the request object.

        You can also give custom callback for when an exception occurs with
        the ``exc_callback`` keyword parameter. It should also accept two
        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
        details as returned by ``sys.exc_info()``. The default implementation
        of this callback just prints the exception info via
        ``traceback.print_exception``. If you want no exception handler
        callback, just pass in ``None``.

        ``requestID``, if given, must be hashable since it is used by
        ``ThreadPool`` object to store the results of that work request in a
        dictionary. It defaults to the return value of ``id(self)``.

        """
        if requestID is None:
            # id() returns the object's identity (its memory address in CPython)
            self.requestID = id(self)
        else:
            try:
                self.requestID = hash(requestID)  # store the hash of the given ID
            except TypeError:
                raise TypeError("requestID must be hashable.")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)

class ThreadPool:  # the thread pool manager
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.

        .. warning:
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

        """
        self._requests_queue = Queue.Queue(q_size)    # request (task) queue
        self._results_queue = Queue.Queue(resq_size)  # results queue
        self.workers = []           # active worker threads
        self.dismissedWorkers = []  # dismissed workers not yet joined
        self.workRequests = {}      # dict mapping requestID -> WorkRequest
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting for
        requests.

        """
        for i in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task."""
        dismiss_list = []
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later."""
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        # one-to-one mapping: each requestID refers to exactly one request
        self.workRequests[request.requestID] = request

    def poll(self, block=False):  # process results
        """Process any new results in the queue."""
        while True:
            # still results pending?
            if not self.workRequests:  # no outstanding requests
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:  # no worker threads left
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
                # has an exception occurred?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any
                if request.callback and not \
                       (request.exception and request.exc_callback):
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Queue.Empty:
                break

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
```
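The `isinstance(item, tuple)` branch in `makeRequests` is easy to misread, so here is a small stand-alone sketch that mirrors it. The helper name `split_args` and the `greet` function are hypothetical; only the tuple test and the `or []` / `or {}` defaults come from the source above. A tuple item is unpacked as `(positional_args, keyword_args)`; anything else becomes a single positional argument. One consequence: a plain tuple that you meant as a single argument is misread, so wrap it as `((my_tuple,), None)`.

```python
def split_args(item):
    """Mirror of the makeRequests/WorkRequest rule: returns (args, kwds)."""
    if isinstance(item, tuple):
        args, kwds = item[0], item[1]         # (positional list, keyword dict)
    else:
        args, kwds = [item], None             # single non-tuple argument
    # WorkRequest replaces None with [] and {}; list()/dict() are only
    # here to make the printed result easier to read
    return list(args or []), dict(kwds or {})


def greet(name, punctuation="!"):
    return "Hello, " + name + punctuation


args_list = [
    "Alice",                                  # -> greet("Alice")
    (["Bob"], {"punctuation": "?"}),          # -> greet("Bob", punctuation="?")
    (["Carol", "."], None),                   # -> greet("Carol", ".")
]
for item in args_list:
    args, kwds = split_args(item)
    print(greet(*args, **kwds))
# Hello, Alice!
# Hello, Bob?
# Hello, Carol.

point = (3, 4)                                # a tuple meant as ONE argument
print(split_args(((point,), None)))           # ([(3, 4)], {})
```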
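There are three classes: ThreadPool, WorkRequest and WorkerThread.
First we create a ThreadPool instance, which starts several worker threads (kept in `workers`) according to its arguments. Next, makeRequests creates one WorkRequest per set of arguments, and putRequest puts each request into the pool's request queue. A WorkerThread then picks up a request, calls its callable, and puts the result into the results queue. Finally, poll() (called repeatedly by wait()) takes results off the results queue in the calling thread and, if the request has a callback, passes the result to it; if the callable raised an exception, exc_callback receives the exception info instead.
Note: each element of the results queue is a tuple (request, result), so every result stays paired with the request that produced it.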
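To see why the `(request, result)` pairing matters, here is a stripped-down sketch of the producer/consumer loop between `WorkerThread.run` and `ThreadPool.poll`. It uses plain `queue.Queue` objects, a hypothetical `Request` class instead of `WorkRequest`, a `pending` counter instead of the `workRequests` dict, and a `None` stop sentinel instead of `dismiss()`. Workers push tuples onto the results queue; the main thread pops them and calls each request's own callback, so the callbacks run in the polling thread rather than in a worker.

```python
import queue
import threading


class Request:
    """Hypothetical stand-in for WorkRequest."""

    def __init__(self, func, arg, callback):
        self.func, self.arg, self.callback = func, arg, callback


def worker(requests_q, results_q):
    while True:
        req = requests_q.get()
        if req is None:                            # sentinel: stop
            return
        results_q.put((req, req.func(req.arg)))    # pair the result with its request


requests_q, results_q = queue.Queue(), queue.Queue()
workers = [threading.Thread(target=worker, args=(requests_q, results_q))
           for _ in range(3)]
for w in workers:
    w.start()

collected = {}
callback_threads = set()


def on_result(req, result):
    collected[req.arg] = result
    callback_threads.add(threading.current_thread())


pending = 0
for n in range(6):
    requests_q.put(Request(lambda x: x + 100, n, on_result))
    pending += 1

while pending:                                     # roughly what ThreadPool.wait() does
    req, result = results_q.get()                  # blocks until a result arrives
    req.callback(req, result)                      # the tuple tells us whose result it is
    pending -= 1

for _ in workers:
    requests_q.put(None)
for w in workers:
    w.join()
print(dict(sorted(collected.items())))  # {0: 100, 1: 101, 2: 102, 3: 103, 4: 104, 5: 105}
print(callback_threads.isdisjoint(workers))  # True: no callback ran in a worker
```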
In my next post, about web crawlers, I will try using a thread pool to make crawling more efficient.