線程池
簡單線程池
import queue import threading import time class ThreadPool(object): #創建線程池類 def __init__(self, max_num=20): #創建一個最大長度為20的隊列 self.queue = queue.Queue(max_num) #創建一個隊列 for i in range(max_num): #循環把線程對象加入到隊列中 self.queue.put(threading.Thread) #把線程的類名放進去,執行完這個Queue def get_thread(self): #定義方法從隊列里獲取線程 return self.queue.get() #在隊列中獲取值 def add_thread(self): #線程執行完任務后,在隊列里添加線程 self.queue.put(threading.Thread) def func(pool,a1): time.sleep(1) print(a1) pool.add_thread() #線程執行完任務后,隊列里再加一個線程 p = ThreadPool(10) #執行init方法; 一次最多執行10個線程 for i in range(100): thread = p.get_thread() #線程池10個線程,每一次循環拿走一個拿到類名,沒有就等待 t = thread(target=func, args=(p, i,)) #創建線程; 線程執行func函數的這個任務;args是給函數傳入參數 t.start() #激活線程
復雜線程池
線程池要點:
1,創建線程池時,是在需要執行線程的時候創建線程,而不是創建好最大隊列等待執行
2,創建一個回調函數,檢查出剩余隊列的任務,當線程執行完函數的時候通知線程池,
3,使用線程池時讓其循環獲取任務,並執行
4,線程池,讓其自行的去激活線程,執行完成后,關閉退出
import queue import threading import time import contextlib StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue() # 最多創建的線程數(線程池最大容量) self.max_num = max_num self.terminal = False #如果為True 終止所有線程,不在獲取新任務 self.generate_list = [] # 真實創建的線程列表 self.free_list = []# 空閑線程數量 def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數) :return: 如果線程池已經終止,則返回True否則None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() #創建線程 w = (func, args, callback,) #把參數封裝成元祖 self.q.put(w) #添加到任務隊列 def generate_thread(self): """ 創建一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread # 獲取當前線程 self.generate_list.append(current_thread) #添加到已經創建的線程里 event = self.q.get() # 取任務並執行 while event != StopEvent: # 是元組=》是任務;如果不為停止信號 執行任務 func, arguments, callback = event #解開任務包; 分別取出值 try: result = func(*arguments) #運行函數,把結果賦值給result status = True #運行結果是否正常 except Exception as e: status = False #表示運行不正常 result = e #結果為錯誤信息 if callback is not None: #是否存在回調函數 try: callback(status, result) #執行回調函數 except Exception as e: pass if self.terminal: # 默認為False,如果調用terminal方法 event = StopEvent #等於全局變量,表示停止信號 else: # self.free_list.append(current_thread) #執行完畢任務,添加到閑置列表 # event = self.q.get() #獲取任務 # self.free_list.remove(current_thread) # 獲取到任務之后,從閑置列表中刪除;不是元組,就不是任務 with self.worker_state(self.free_list, current_thread): event = self.q.get() else: self.generate_list.remove(current_thread) #如果收到終止信號,就從已經創建的線程列表中刪除 def close(self): #終止線程 num = len(self.generate_list) #獲取總共創建的線程數 while num: self.q.put(StopEvent) #添加停止信號,有多少線程添加多少表示終止的信號 num -= 1 def terminate(self): #終止線程(清空隊列) self.terminal = True #把默認的False更改成True while self.generate_list: #如果有已經創建線程存活 self.q.put(StopEvent) #有幾個線程就發幾個終止信號 self.q.empty() #清空隊列 @contextlib.contextmanager def worker_state(self, state_list, worker_thread): state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) def work(i): print(i) pool = ThreadPool(10) for item in range(50): pool.run(func=work, args=(item,)) # 將任務放在隊列中 # 着手開始處理任務 # - 創建線程 # - 有空閑線程,擇不再創建線程 # - 不能高於線程池的限制 # - 根據任務個數判斷 # - 線程去隊列中取任務 pool.terminate()
詳細參考:http://www.cnblogs.com/wupeiqi/articles/4839959.html
