Thread Pools in Python


Original post; please credit the source when reposting.

Today, while studying processes and threads in Python, I came across the thread pool module threadpool; see its official documentation.

The module is very easy to use, provided you understand how a thread pool works.

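When a system handles tasks, it has to create and destroy objects for every request. When a large number of concurrent tasks must be processed, traditional multithreading (a new thread for every task) creates and destroys a large number of resources, which degrades server efficiency. This is where a thread pool helps: it addresses both the overhead of creating and destroying threads and the problem of insufficient system resources.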

Advantages:

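  (1) It controls how many threads are created. Creating a fixed number of worker threads in advance and capping that number limits the memory consumed by thread objects.
  (2) It reduces system overhead and resource consumption. Threads are reused across many requests, so the cost of creating and destroying a thread is amortized over those requests. Limiting the number of threads also reduces the runtime's garbage-collection overhead.
  (3) It improves responsiveness. The threads already exist when a request arrives, so the request can be handled immediately without waiting for a thread to be created, and several threads can process requests concurrently.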
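Points (1) and (2) can be observed directly. The sketch below uses the standard library's concurrent.futures.ThreadPoolExecutor (Python 3) rather than the threadpool module discussed in this post, and handle is a hypothetical stand-in for a real request handler. Twenty requests are processed, yet no more than three distinct threads ever run them, because the pool caps the thread count and reuses its workers.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def handle(n):
    """Hypothetical request handler: simulate some work, report which thread ran it."""
    time.sleep(0.01)
    return threading.get_ident()


# 20 requests, but the pool never creates more than 3 threads and reuses them.
with ThreadPoolExecutor(max_workers=3) as pool:
    thread_ids = list(pool.map(handle, range(20)))

distinct_threads = set(thread_ids)
print(len(thread_ids), "requests handled by", len(distinct_threads), "threads")
```

With thread-per-task code the same workload would start and tear down 20 threads; here the creation cost is paid at most three times.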

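Basic components of a thread pool implementation:
  (1) Thread pool manager. Creates and maintains the pool, resizes it as needed, and watches for thread leaks.
  (2) Worker thread. A thread that executes tasks in a loop; when there is no task it sits in the wait state, and it is woken up when a new task arrives.
  (3) Task queue. Provides a buffer that temporarily holds pending tasks, and also serves as the monitor object shared by the concurrent threads.
  (4) Task interface. The interface every task must implement; worker threads use it to run each task.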
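When the thread pool manager is constructed, it first initializes the task queue (Queue); at run time, tasks are added to this queue by calling an add-task method. The manager then creates and starts a number of worker threads and keeps them in a thread list, and at run time it can add or remove workers as needed. A running worker first locks the task queue so that concurrent access by several threads stays correct. If the queue holds a pending task, the worker takes one and releases the lock so that other threads can reach the queue, then runs the task through the task interface. When the queue is empty, the worker joins the queue's list of waiting threads; it is then in the wait state and uses almost no CPU. As soon as a new task arrives, the queue object's notify method wakes one worker from the waiting list to handle it. This cooperation avoids the cost of creating and destroying threads while still processing tasks concurrently, which improves the system's responsiveness.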
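The lock/wait/notify cycle described above maps closely onto Python's threading.Condition. Below is a minimal sketch, assuming a fixed number of workers (no resizing or leak monitoring). SimplePool, add_task, and shutdown are hypothetical names, not part of the threadpool module. The Condition plays the role of the queue's monitor: workers wait on it while the deque is empty, and add_task notifies one of them.

```python
import threading
import traceback
from collections import deque


class SimplePool:
    """Fixed-size pool: a task deque guarded by a Condition acting as its monitor."""

    def __init__(self, num_workers):
        self._tasks = deque()                 # task queue (buffer of pending tasks)
        self._cond = threading.Condition()    # lock + wait/notify for the queue
        self._closing = False
        self._workers = [threading.Thread(target=self._work) for _ in range(num_workers)]
        for w in self._workers:
            w.start()

    def add_task(self, func, *args):
        with self._cond:                      # lock the queue
            self._tasks.append((func, args))
            self._cond.notify()               # wake one waiting worker

    def _work(self):
        while True:
            with self._cond:                  # lock the queue before looking at it
                while not self._tasks and not self._closing:
                    self._cond.wait()         # releases the lock; no CPU used while idle
                if not self._tasks:           # closing and nothing left to do
                    return
                func, args = self._tasks.popleft()
            # lock released here: other workers can take tasks while this one runs
            try:
                func(*args)
            except Exception:
                traceback.print_exc()         # keep the worker alive after a failing task

    def shutdown(self):
        """Let workers finish the queued tasks, then stop and join them."""
        with self._cond:
            self._closing = True
            self._cond.notify_all()
        for w in self._workers:
            w.join()


results = []
results_lock = threading.Lock()


def square(x):
    with results_lock:
        results.append(x * x)


pool = SimplePool(4)
for i in range(10):
    pool.add_task(square, i)
pool.shutdown()
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The task is popped while the lock is held but executed after it is released. That is what lets several workers run tasks at the same time instead of serializing on the queue.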

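In short: instead of starting a new thread for every concurrent task, you hand the tasks to a thread pool. Whenever the pool has an idle thread, a task is assigned to it for execution.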

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

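The first line creates a thread pool and immediately starts poolsize worker threads.

The second line calls makeRequests to build the work requests. some_callable is the function the worker threads will run; list_of_args holds its arguments, one item per call (each item is either a single non-tuple argument or a 2-tuple of a positional-argument list and a keyword-argument dict); callback is an optional function, None by default, that is called with each request and its result.

The third line puts each request into the pool's request queue, where an idle worker thread will pick it up.

The last line blocks until every submitted request has been processed and its callback has run. The worker threads are not stopped: they are daemon threads and keep waiting for more work.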
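The threadpool package is not in the standard library. For comparison, the same four steps can be approximated with concurrent.futures, as in the sketch below. This is a swapped-in technique, not the threadpool API. The helpers to_call_args and run_requests are hypothetical names, and to_call_args copies the args_list convention from makeRequests. There are some differences. The callback receives the args_list item rather than a WorkRequest. Callbacks run in submission order once every call has finished. A failing call re-raises its exception instead of going to an exc_callback.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def to_call_args(item):
    """Interpret one args_list item the way makeRequests does."""
    if isinstance(item, tuple):          # (positional-arg list, keyword-arg dict)
        return list(item[0] or []), dict(item[1] or {})
    return [item], {}                    # any non-tuple item is a single argument


def run_requests(func, args_list, callback=None, poolsize=4):
    """Submit one call per args_list item, wait for all, then run callback(item, result)."""
    with ThreadPoolExecutor(max_workers=poolsize) as pool:        # ~ ThreadPool(poolsize)
        futures = []
        for item in args_list:                                    # ~ makeRequests(...)
            args, kwargs = to_call_args(item)
            futures.append(pool.submit(func, *args, **kwargs))    # ~ pool.putRequest(req)
        wait(futures)                                             # ~ pool.wait()
    results = []
    for item, fut in zip(args_list, futures):
        result = fut.result()            # re-raises here if the call raised
        results.append(result)
        if callback is not None:
            callback(item, result)
    return results


def add(a, b=0):
    return a + b


collected = []
out = run_requests(add, [1, ([2, 3], None), ([4], {"b": 5})],
                   callback=lambda item, res: collected.append(res))
print(out)  # [1, 5, 9]
```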

Reading the source code shows that the module is actually quite simple.

import sys
import threading
import traceback

try:
    import Queue            # Python 2
except ImportError:
    import queue as Queue   # Python 3 renamed the module to queue


# exceptions
class NoResultsPending(Exception):
    """All work requests have been processed."""
    pass

class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
    pass


# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.

    """
    traceback.print_exception(*exc_info)


# utility functions
# makeRequests creates several work requests at once: callback processes
# each result, exc_callback handles any exception raised by callable_.
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Create several work requests for same callable with different arguments.

    Convenience function for creating several work requests for the same
    callable where each invocation of the callable receives different values
    for its arguments.

    ``args_list`` contains the parameters for each invocation of callable.
    Each item in ``args_list`` should be either a 2-item tuple of the list of
    positional arguments and a dictionary of keyword arguments or a single,
    non-tuple argument.

    See docstring for ``WorkRequest`` for info on ``callback`` and
    ``exc_callback``.

    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            requests.append(
                WorkRequest(callable_, item[0], item[1], callback=callback,
                    exc_callback=exc_callback)
            )
        else:
            requests.append(
                WorkRequest(callable_, [item], None, callback=callback,
                    exc_callback=exc_callback)
            )
    return requests


# classes
class WorkerThread(threading.Thread):     # worker thread
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.

    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
        worker thread.

        """
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self._requests_queue = requests_queue     # request (task) queue
        self._results_queue = results_queue       # results queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()       # set by dismiss()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.is_set():  # dismiss() has been called
                # we are dismissed, break out of loop
                break
            # get next work request. If we don't get a new request from the
            # queue after self._poll_timeout seconds, we jump to the start of
            # the while loop again, to give the thread a chance to exit.
            try:
                # block until a request is available, for at most poll_timeout seconds
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            else:
                # check again: dismiss() may have been called while we were blocked in get()
                if self._dismissed.is_set():
                    # we are dismissed, put back request in queue and exit loop
                    self._requests_queue.put(request)  # return the request to the queue
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()


class WorkRequest:      # a single work request
    """A request to execute a callable for putting in the request queue later.

    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.

    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.

        A work request consists of a callable to be executed by a
        worker thread, a list of positional arguments, a dictionary
        of keyword arguments.

        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass additional
        information to the callback, just stick it on the request object.

        You can also give custom callback for when an exception occurs with
        the ``exc_callback`` keyword parameter. It should also accept two
        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
        details as returned by ``sys.exc_info()``. The default implementation
        of this callback just prints the exception info via
        ``traceback.print_exception``. If you want no exception handler
        callback, just pass in ``None``.

        ``requestID``, if given, must be hashable since it is used by
        ``ThreadPool`` object to store the results of that work request in a
        dictionary. It defaults to the return value of ``id(self)``.

        """
        if requestID is None:
            self.requestID = id(self)  # id() returns a unique identifier (the memory address in CPython)
        else:
            try:
                self.requestID = hash(requestID)  # requestID must be hashable
            except TypeError:
                raise TypeError("requestID must be hashable.")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)

class ThreadPool:  # thread pool manager
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.

        .. warning:
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

        """
        self._requests_queue = Queue.Queue(q_size)   # request (task) queue
        self._results_queue = Queue.Queue(resq_size) # results queue
        self.workers = []           # active worker threads
        self.dismissedWorkers = []  # dismissed workers not yet joined
        self.workRequests = {}      # dict: requestID -> WorkRequest
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting for
        requests.

        """
        for i in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task."""
        dismiss_list = []
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later."""
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request  # one-to-one mapping: each ID maps to one request

    def poll(self, block=False):  # collect results and run callbacks
        """Process any new results in the queue."""
        while True:
            # still results pending?
            if not self.workRequests:  # no outstanding requests
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:  # no worker threads left
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
                # has an exception occurred?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any
                if request.callback and not \
                       (request.exception and request.exc_callback):
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Queue.Empty:
                break

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while 1:
            try:
                self.poll(True)
            except NoResultsPending:
                break

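There are three classes: ThreadPool, WorkRequest, and WorkerThread.

First we create a ThreadPool instance, which starts the number of worker threads given by its argument and keeps them in workers. Then makeRequests creates WorkRequest objects, one for each set of arguments, and putRequest places them in the pool's request queue. A WorkerThread takes a request from that queue, calls its callable, and puts the outcome in the results queue. When poll() (which wait() calls repeatedly) takes a result off the results queue, it passes it to the request's callback, if one was given; on an exception it calls exc_callback instead. Callbacks therefore run in the thread that calls poll() or wait(), not in the worker threads.

Note: each element of the results queue is a (request, result) tuple, so every result stays paired with the request that produced it.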
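The pairing of results with requests is what lets a single results queue serve many workers. Below is a stripped-down sketch of that two-queue design in Python 3. The function names and the (request_id, outcome, failed) tuple layout are hypothetical. For brevity, workers here are stopped with a None sentinel rather than threadpool's dismiss() Event plus poll_timeout.

```python
import queue
import sys
import threading


def worker(requests_q, results_q):
    """Take (request_id, func, args) jobs; put (request_id, outcome, failed) tuples."""
    while True:
        job = requests_q.get()
        if job is None:                  # sentinel value: stop this worker
            break
        request_id, func, args = job
        try:
            results_q.put((request_id, func(*args), False))
        except Exception:
            # like WorkerThread.run(): ship sys.exc_info() instead of a result
            results_q.put((request_id, sys.exc_info(), True))


def divide_eight(v):
    return 8 // v


requests_q, results_q = queue.Queue(), queue.Queue()
workers = [threading.Thread(target=worker, args=(requests_q, results_q))
           for _ in range(3)]
for w in workers:
    w.start()

pending = {}                             # request_id -> argument, like ThreadPool.workRequests
for request_id, x in enumerate([4, 0, 2]):
    pending[request_id] = x
    requests_q.put((request_id, divide_eight, (x,)))

# Like ThreadPool.wait()/poll(): consume results in the calling thread until none are pending.
outcomes = {}
while pending:
    request_id, value, failed = results_q.get()
    x = pending.pop(request_id)
    outcomes[x] = value[0].__name__ if failed else value

for _ in workers:                        # one sentinel per worker
    requests_q.put(None)
for w in workers:
    w.join()
print(outcomes)
```

Results arrive in completion order, not submission order, so the request ID carried in each tuple is the only reliable way to match a result (or a ZeroDivisionError, for the input 0) back to its request.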

In my next post, which is about web crawlers, I will try using a thread pool to make crawling more efficient.

