本文鏈接:https://www.cnblogs.com/tujia/p/13565799.html
背景:單線程處理任務是阻塞式,一個一個任務處理的,在處理大量任務的時候,消耗時間長;同時如果服務器配置還不錯的話,光跑一個單線程的話,也有點浪費了配置了
多線程:多線程是異步、並發的,可以大大提高程序的IO處理速度,更好的利用系統資源,更快完成任務
Talk is cheap. Show me the code。下面就直接上代碼了~
一、簡單多線程
# 簡單多線程 # 解釋: # 1)一個工人只做一個任務,做完就撤了; # 2)有多少個任務就得有多少個工人; # 3)這個方式處理任務需要快,但人員成本開銷高。 # @see https://docs.python.org/zh-cn/3/library/threading.html?highlight=threading#threading.Thread import threading # 任務 def task(taskId): thread_name = threading.current_thread().getName() print('工人【%s】正在處理任務【%d】:do something...' % (thread_name, taskId)) def main(): threads = [] # 這里弄5個線程(一個線程相當於一個工人) for i in range(5): # target 參數指定線程要處理的任務函數,args 參數傳遞參數到任務函數去 t = threading.Thread(target=task, args=(i+1,)) # 啟動線程 t.start() threads.append(t) # 阻塞線程 for t in threads: t.join() if __name__ == '__main__': main()
執行結果:
二、線程池
正如上面注釋里說的,線程雖好,但太多線程的話,資源(cpu、內存等)的消耗也挺大的;而且線程都是處理完一個任務就“死”掉了,不能復用,有點浪費
於是,這個“線程池”這個東西出現了。線程池就是管理線程的池子,池子如果設置容量,控制好線程的數量,也就控制好了資源的消耗~
# 線程池 # 解釋: # 1)一個工人同一時間只做一個任務,但做完一個任務可以接着做下一個任務; # 2)可以分配多個任務給少量工人,減少人員成本開銷。 # @see https://docs.python.org/zh-cn/3/library/concurrent.futures.html import threading from concurrent.futures import ThreadPoolExecutor # 任務 def task(taskId): thread_name = threading.current_thread().getName() print('工人【%s】正在處理任務【%d】:do something...' % (thread_name, taskId)) def main(): # 初始化線程池(商會),定義好池里最多有幾個工人 pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix='Thread') # 准備10個任務 for i in range(10): # 提交任務到池子(商會)里(它會自動分配給工人) pool.submit(task, i+1) if __name__ == '__main__': main()
注:這里把線程池比喻成了“商會”,線程比喻成“工人”,方便大家理解。
執行結果:
注:從上圖可以看到線程被復用了,而且 Thread_0 被復用了最多次,而 Thread_4 毫無用武之地,沒有使用到(你可以把任務數量調大,看看結果又會是怎么樣?)
三、線程池2
這里的示例和上面是一樣的,只是加了一點代碼來模擬任務耗時,方便大家觀察線程池是怎么分配任務的
# 線程池 # 解釋: # 1)一個工人同一時間只做一個任務,但做完一個任務可以接着做下一個任務; # 2)可以分配多個任務給少量工人,減少人員成本開銷。 # 3)任務按順序分配給空閑工人,但每個任務的耗時不一樣,任務不是按順序被完成的,后提交的任務可能會先被完成 import time import random import threading from concurrent.futures import ThreadPoolExecutor # 任務 def task(taskId, consuming): thread_name = threading.current_thread().getName() print('工人【%s】正在處理任務【%d】:do something...' % (thread_name, taskId)) # 模擬任務耗時(秒) time.sleep(consuming) print('任務【%d】:done' % taskId) def main(): # 5個工人 pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix='Thread') # 准備10個任務 for i in range(10): # 模擬任務耗時(秒) consuming = random.randint(1, 5) pool.submit(task, i+1, consuming) if __name__ == '__main__': main()
執行結果:
注:看執行結果來看,不難發現:多線程是異步的,且會並發
三、隊列
除了使用線程池,我們還可以使用隊列來處理任務。任務排好隊,工人(線程)按順序不斷從隊列里取任務,處理任務~
# 線程隊列 # 解釋: # 1)一個隊列有N個工人在排隊,按隊列排序給他們分配任務; # 2)做得再快,也要按排隊排序來接任務,不能插隊搶任務。 # @see https://docs.python.org/zh-cn/3/library/queue.html#queue-objects import time import random import threading from queue import Queue # 自定義線程 class CustomThread(threading.Thread): def __init__(self, queue, **kwargs): super(CustomThread, self).__init__(**kwargs) self.__queue = queue def run(self): while True: # (工人)獲取任務 item = self.__queue.get() # 執行任務 item[0](*item[1:]) # 告訴隊列,任務已完成 self.__queue.task_done() # 任務 def task(taskId, consuming): thread_name = threading.current_thread().getName() print('工人【%s】正在處理任務【%d】:do something...' % (thread_name, taskId)) # 模擬任務耗時(秒) time.sleep(consuming) print('任務【%d】:done' % taskId) def main(): q = Queue() # 招工,這里招了5個工人(啟動5個線程)
for i in range(5): t = CustomThread(q, daemon=True) # 工人已經准備好接活了 t.start() # 來活了(往隊列里塞任務) for i in range(10): taskId = i + 1 # 模擬任務耗時(秒) consuming = random.randint(1, 5) q.put((task, taskId, consuming)) # 阻塞隊列 q.join() if __name__ == '__main__': main()
執行結果:
注1:這里用了一個自定義線程類,具體可以看這里:https://docs.python.org/zh-cn/3/library/queue.html#queue-objects
注2:簡單來說,就是先招固定的N個工人(創建N個線程),讓它們盯着任務隊列;然后往隊列里塞任務;事先盯着隊列的工人們發現有任務,就開始接任務、處理任務了
注3:while True 的意思是循環(不斷)地從任務隊列里取任務,如果不用 while True的話,線程處理完一個任務就會結束,無法復用,任務也無法全部處理完
注4:這里的線程不能使用 Tread.join 方法,join(阻塞線程)方法會和 while True 沖突,使線程無法結束,一直阻塞着
注5:while True 最后是如何自動結束掉的,目前我還不太了解。希望知道原理的大大能告訴我一下~
本文鏈接:https://www.cnblogs.com/tujia/p/13565799.html
完。