python3標准庫里自帶線程池ThreadPoolExecutor和進程池ProcessPoolExecutor。
如果你用的是python2,那可以下載一個模塊,叫threadpool,這是線程池。對於進程池可以使用python自帶的multiprocessing.Pool。
當然也可以自己寫一個threadpool。
# coding:utf-8
import Queue
import threading
import sys
import time
import math
class WorkThread(threading.Thread):
def __init__(self, task_queue):
threading.Thread.__init__(self)
self.setDaemon(True)
self.task_queue = task_queue
self.start()
self.idle = True
def run(self):
sleep_time = 0.01 # 第1次無任務可做時休息10毫秒
multiply = 0
while True:
try:
# 從隊列中取一個任務
func, args, kwargs = self.task_queue.get(block=False)
self.idle = False
multiply = 0
# 執行之
func(*args, **kwargs)
except Queue.Empty:
time.sleep(sleep_time * math.pow(2, multiply))
self.idle = True
multiply += 1
continue
except:
print sys.exc_info()
raise
class ThreadPool:
def __init__(self, thread_num=10, max_queue_len=1000):
self.max_queue_len = max_queue_len
self.task_queue = Queue.Queue(max_queue_len) # 任務等待隊列
self.threads = []
self.__create_pool(thread_num)
def __create_pool(self, thread_num):
for i in xrange(thread_num):
thread = WorkThread(self.task_queue)
self.threads.append(thread)
def add_task(self, func, *args, **kwargs):
'''添加一個任務,返回任務等待隊列的長度
調用該方法前最后先調用isSafe()判斷一下等待的任務是不是很多,以防止提交的任務被拒絕
'''
try:
self.task_queue.put((func, args, kwargs))
except Queue.Full:
raise # 隊列已滿時直接拋出異常,不給執行
return self.task_queue.qsize()
def isSafe(self):
'''等待的任務數量離警界線還比較遠
'''
return self.task_queue.qsize() < 0.9 * self.max_queue_len
def wait_for_complete(self):
'''等待提交到線程池的所有任務都執行完畢
'''
#首先任務等待隊列要變成空
while not self.task_queue.empty():
time.sleep(1)
# 其次,所以計算線程要變成idle狀態
while True:
all_idle = True
for th in self.threads:
if not th.idle:
all_idle = False
break
if all_idle:
break
else:
time.sleep(1)
if __name__ == '__main__':
def foo(a, b):
print a + b
time.sleep(0.01)
thread_pool = ThreadPool(10, 100)
'''在Windows上測試不通過,Windows上Queue.Queue不是線程安全的'''
size = 0
for i in xrange(10000):
try:
size = thread_pool.add_task(foo, i, 2 * i)
except Queue.Full:
print 'queue full, queue size is ', size
time.sleep(2)
