大家好,並發編程
進入第六篇。
在第四章,講消息通信時,我們學到了Queue消息隊列的一些基本使用。昨天我在准備如何創建線程池這一章節的時候,發現對Queue消息隊列的講解有一些遺漏的知識點,而這些知識點,也並不是無關緊要的,所以在今天的章節里,我要先對Queue先做一些補充以防大家對消息隊列有一些知識盲區。
再次提醒:
本系列所有的代碼均在Python3下編寫,也建議大家盡快投入到Python3的懷抱中來。
本文目錄
- 消息隊列的先進先出
- 創建多線程的兩種方式
. 消息隊列的先進先出
首先,要告訴大家的事,消息隊列可不是只有queue.Queue
這一個類,除它之外,還有queue.LifoQueue
和queue.PriorityQueue
這兩個類。
從名字上,對於他們之間的區別,你大概也能猜到一二吧。
queue.Queue
:先進先出隊列queue.LifoQueue
:后進先出隊列queue.PriorityQueue
:優先級隊列
先來看看,我們的老朋友,queue.Queue
。
所謂的先進先出
(FIFO,First in First Out),就是先進入隊列的消息,將優先被消費。
這和我們日常排隊買菜是一樣的,先排隊的人肯定是先買到菜。
用代碼來說明一下
import queue
q = queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
看看輸出,符合我們先進先出的預期。存入隊列的順序是01234
,被消費的順序也是01234
。
0
1
2
3
4
再來看看Queue.LifoQueue
,后進先出,就是后進入消息隊列的,將優先被消費。
這和我們羽毛球筒是一樣的,最后放進羽毛球筒的球,會被第一個取出使用。
用代碼來看下
import queue
q = queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
來看看輸出,符合我們后進后出的預期。存入隊列的順序是01234
,被消費的順序也是43210
。
4
3
2
1
0
最后來看看Queue.PriorityQueue
,優先級隊列。
這和我們日常生活中的會員機制有些類似,辦了金卡的人比銀卡的服務優先,辦了銀卡的人比不辦卡的人服務優先。
來用代碼看一下
from queue import PriorityQueue
# 重新定義一個類,繼承自PriorityQueue
class MyPriorityQueue(PriorityQueue):
def __init__(self):
PriorityQueue.__init__(self)
self.counter = 0
def put(self, item, priority):
PriorityQueue.put(self, (priority, self.counter, item))
self.counter += 1
def get(self, *args, **kwargs):
_, _, item = PriorityQueue.get(self, *args, **kwargs)
return item
queue = MyPriorityQueue()
queue.put('item2', 2)
queue.put('item5', 5)
queue.put('item3', 3)
queue.put('item4', 4)
queue.put('item1', 1)
while True:
print(queue.get())
來看看輸出,符合我們的預期。我們存入入隊列的順序是25341
,對應的優先級也是25341
,可是被消費的順序絲毫不受傳入順序的影響,而是根據指定的優先級來消費。
item1
item2
item3
item4
item5
. 創建多線程的兩種方式
在使用多線程處理任務時也不是線程越多越好,由於在切換線程的時候,需要切換上下文環境,依然會造成cpu的大量開銷。為解決這個問題,線程池的概念被提出來了。預先創建好一個較為優化的數量的線程,讓過來的任務立刻能夠使用,就形成了線程池。
在Python3中,創建線程池是通過concurrent.futures
函數庫中的ThreadPoolExecutor
類來實現的。
import time
import threading
from concurrent.futures import ThreadPoolExecutor
def target():
for i in range(5):
print('running thread-{}:{}'.format(threading.get_ident(), i))
time.sleep(1)
#: 生成線程池最大線程為5個
pool = ThreadPoolExecutor(5)
for i in range(100):
pool.submit(target) # 往線程中提交,並運行
從結果來看,前面設置線程池最大線程數5個,有生效。
running thread-11308:0
running thread-12504:0
running thread-5656:0
running thread-12640:0
running thread-7948:0
running thread-11308:1
running thread-5656:1
running thread-7948:1
running thread-12640:1
running thread-12504:1
...
...
除了使用上述第三方模塊的方法之外,我們還可以自己結合前面所學的消息隊列來自定義線程池。
這里我們就使用queue來實現一個上面同樣效果的例子,大家感受一下。
import time
import threading
from queue import Queue
def target(q):
while True:
msg = q.get()
for i in range(5):
print('running thread-{}:{}'.format(threading.get_ident(), i))
time.sleep(1)
def pool(workers,queue):
for n in range(workers):
t = threading.Thread(target=target, args=(queue,))
t.daemon = True
t.start()
queue = Queue()
# 創建一個線程池:並設置線程數為5
pool(5, queue)
for i in range(100):
queue.put("start")
# 消息都被消費才能結束
queue.join()
輸出是和上面是完全一樣的效果
running thread-11308:0
running thread-12504:0
running thread-5656:0
running thread-12640:0
running thread-7948:0
running thread-11308:1
running thread-5656:1
running thread-7948:1
running thread-12640:1
running thread-12504:1
...
...
構建線程池的方法,是可以很靈活的,大家有舉可以自己多研究。但是建議只要掌握一種自己熟悉的,能快速上手的就好了。
好了,今天的內容就是這些了。
