簡單實現一個線程池:
import queue
import threading
import time
class ThreadPool(object): #創建線程池類
def __init__(self, max_num=20): #創建一個最大長度為20的隊列
self.queue = queue.Queue(max_num) #創建一個隊列
for i in range(max_num): #循環把線程對象加入到隊列中
self.queue.put(threading.Thread) #把線程的類名放進去,執行完這個Queue
def get_thread(self): #定義方法從隊列里獲取線程
return self.queue.get() #在隊列中獲取值
def add_thread(self): #線程執行完任務后,在隊列里添加線程
self.queue.put(threading.Thread)
def func(pool,a1):
time.sleep(1)
print(a1)
pool.add_thread() #線程執行完任務后,隊列里再加一個線程
p = ThreadPool(10) #執行init方法; 一次最多執行10個線程
for i in range(100):
thread = p.get_thread() #線程池10個線程,每一次循環拿走一個拿到類名,沒有就等待
t = thread(target=func, args=(p, i,)) #創建線程; 線程執行func函數的這個任務;args是給函數傳入參數
t.start() #激活線程
復雜線程池
線程池要點:
1,創建線程池時,是在需要執行線程的時候創建線程,而不是創建好最大隊列等待執行
2,創建一個回調函數,檢查出剩余隊列的任務,當線程執行完函數的時候通知線程池,
3,使用線程池時讓其循環獲取任務,並執行
4,線程池,讓其自行的去激活線程,執行完成后,關閉退出
import queue
import threading
import time
import contextlib
StopEvent = object()
class ThreadPool(object):
def __init__(self, max_num):
self.q = queue.Queue() # 最多創建的線程數(線程池最大容量)
self.max_num = max_num
self.terminal = False #如果為True 終止所有線程,不在獲取新任務
self.generate_list = [] # 真實創建的線程列表
self.free_list = []# 空閑線程數量
def run(self, func, args, callback=None):
"""
線程池執行一個任務
:param func: 任務函數
:param args: 任務函數所需參數
:param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數)
:return: 如果線程池已經終止,則返回True否則None
"""
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread() #創建線程
w = (func, args, callback,) #把參數封裝成元祖
self.q.put(w) #添加到任務隊列
def generate_thread(self):
"""
創建一個線程
"""
t = threading.Thread(target=self.call)
t.start()
def call(self):
"""
循環去獲取任務函數並執行任務函數
"""
current_thread = threading.currentThread # 獲取當前線程
self.generate_list.append(current_thread) #添加到已經創建的線程里
event = self.q.get() # 取任務並執行
while event != StopEvent: # 是元組=》是任務;如果不為停止信號 執行任務
func, arguments, callback = event #解開任務包; 分別取出值
try:
result = func(*arguments) #運行函數,把結果賦值給result
status = True #運行結果是否正常
except Exception as e:
status = False #表示運行不正常
result = e #結果為錯誤信息
if callback is not None: #是否存在回調函數
try:
callback(status, result) #執行回調函數
except Exception as e:
pass
if self.terminal: # 默認為False,如果調用terminal方法
event = StopEvent #等於全局變量,表示停止信號
else:
# self.free_list.append(current_thread) #執行完畢任務,添加到閑置列表
# event = self.q.get() #獲取任務
# self.free_list.remove(current_thread) # 獲取到任務之后,從閑置列表中刪除;不是元組,就不是任務
with self.worker_state(self.free_list, current_thread):
event = self.q.get()
else:
self.generate_list.remove(current_thread) #如果收到終止信號,就從已經創建的線程列表中刪除
def close(self): #終止線程
num = len(self.generate_list) #獲取總共創建的線程數
while num:
self.q.put(StopEvent) #添加停止信號,有多少線程添加多少表示終止的信號
num -= 1
def terminate(self): #終止線程(清空隊列)
self.terminal = True #把默認的False更改成True
while self.generate_list: #如果有已經創建線程存活
self.q.put(StopEvent) #有幾個線程就發幾個終止信號
self.q.empty() #清空隊列
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)
def work(i):
print(i)
pool = ThreadPool(10)
for item in range(50):
pool.run(func=work, args=(item,))
# 將任務放在隊列中
# 着手開始處理任務
# - 創建線程
# - 有空閑線程,擇不再創建線程
# - 不能高於線程池的限制
# - 根據任務個數判斷
# - 線程去隊列中取任務
pool.terminate()
本文參考oldboy吳佩琦老師(詳細地址:)http://www.cnblogs.com/wupeiqi/articles/4839959.html
