對於IO密集型任務,很適合用線程池來處理消息,具體思路如下:
1、創建一個Queue隊列
import Queue
queue = Queue.Queue()
2、寫一個類,繼承線程類,重寫run方法處理隊列中方法和參數,由於queue是線程安全的,因此這塊不必加鎖;同時,創建一個線程池:
from threading import Thread
for i in range(thread_num): debug_logger.info("開啟第%s個處理線程" % i) thread = ThreadManger(queue) thread.start()
class ThreadManger(Thread):
def __init__(self, queue):
super(ThreadManger, self).__init__()
self.queue = queue
def run(self):
while True:
if self.queue.qsize() > 0:
method, para = self.queue.get(timeout=5.0)
method(para)
self.queue.task_done()
3、將要處理的參數和處理這個參數的方法放進這個隊列里,注意將參數和方法組成個元組放進去,這塊也可以做成多線程,使用上面創建的queue就行
queue.put((method, parament))
4、這個時候一個簡單的線程池就做好了,當隊列是空的時候,線程池從queue取不到東西但線程不會結束,一旦有新的內容放到了queue中,線程池會立馬取出來並處理掉
5、線程池大小,對於IO密集型的任務,假如cpu處理時間是0.5s,IO時間是1.5s,那線程池的個數為((1.5+0.5)/0.5)*核數+1
6、對於計算密集型的任務,由於GIL導致任何時候只能有一個線程在執行,所以就不要搞並發了,由於線程的切換耗費的時間會導致執行起來比單線程還慢
7、對於分布式任務,可以考慮使用mq,比如kafka或者rabbitmq