Python並發編程之消息隊列補充及如何創建線程池(六)


大家好,並發編程 進入第六篇。

在第四章,講消息通信時,我們學到了Queue消息隊列的一些基本使用。昨天我在准備如何創建線程池這一章節的時候,發現對Queue消息隊列的講解有一些遺漏的知識點,而這些知識點,也並不是無關緊要的,所以在今天的章節里,我要先對Queue先做一些補充以防大家對消息隊列有一些知識盲區。

再次提醒
本系列所有的代碼均在Python3下編寫,也建議大家盡快投入到Python3的懷抱中來。

本文目錄


  • 消息隊列的先進先出
  • 創建多線程的兩種方式

. 消息隊列的先進先出

首先,要告訴大家的事,消息隊列可不是只有queue.Queue這一個類,除它之外,還有queue.LifoQueuequeue.PriorityQueue這兩個類。

從名字上,對於他們之間的區別,你大概也能猜到一二吧。

queue.Queue:先進先出隊列
queue.LifoQueue:后進先出隊列
queue.PriorityQueue:優先級隊列

先來看看,我們的老朋友,queue.Queue
所謂的先進先出(FIFO,First in First Out),就是先進入隊列的消息,將優先被消費。
這和我們日常排隊買菜是一樣的,先排隊的人肯定是先買到菜。

用代碼來說明一下

import queue

q = queue.Queue()

for i in range(5):
q.put(i)

while not q.empty():
print q.get()

看看輸出,符合我們先進先出的預期。存入隊列的順序是01234,被消費的順序也是01234

0
1
2
3
4

再來看看Queue.LifoQueue,后進先出,就是后進入消息隊列的,將優先被消費。

這和我們羽毛球筒是一樣的,最后放進羽毛球筒的球,會被第一個取出使用。

用代碼來看下

import queue

q = queue.LifoQueue()

for i in range(5):
q.put(i)

while not q.empty():
print q.get()

來看看輸出,符合我們后進后出的預期。存入隊列的順序是01234,被消費的順序也是43210

4
3
2
1
0

最后來看看Queue.PriorityQueue,優先級隊列。
這和我們日常生活中的會員機制有些類似,辦了金卡的人比銀卡的服務優先,辦了銀卡的人比不辦卡的人服務優先。

來用代碼看一下

from queue import PriorityQueue

# 重新定義一個類,繼承自PriorityQueue
class MyPriorityQueue(PriorityQueue):
def __init__(self):
PriorityQueue.__init__(self)
self.counter = 0

def put(self, item, priority):
PriorityQueue.put(self, (priority, self.counter, item))
self.counter += 1

def get(self, *args, **kwargs):
_, _, item = PriorityQueue.get(self, *args, **kwargs)
return item


queue = MyPriorityQueue()
queue.put('item2', 2)
queue.put('item5', 5)
queue.put('item3', 3)
queue.put('item4', 4)
queue.put('item1', 1)

while True:
print(queue.get())

來看看輸出,符合我們的預期。我們存入入隊列的順序是25341,對應的優先級也是25341,可是被消費的順序絲毫不受傳入順序的影響,而是根據指定的優先級來消費。

item1
item2
item3
item4
item5

. 創建多線程的兩種方式

在使用多線程處理任務時也不是線程越多越好,由於在切換線程的時候,需要切換上下文環境,依然會造成cpu的大量開銷。為解決這個問題,線程池的概念被提出來了。預先創建好一個較為優化的數量的線程,讓過來的任務立刻能夠使用,就形成了線程池。

在Python3中,創建線程池是通過concurrent.futures函數庫中的ThreadPoolExecutor類來實現的。

import time
import threading
from concurrent.futures import ThreadPoolExecutor


def target():
for i in range(5):
print('running thread-{}:{}'.format(threading.get_ident(), i))
time.sleep(1)

#: 生成線程池最大線程為5個
pool = ThreadPoolExecutor(5)

for i in range(100):
pool.submit(target) # 往線程中提交,並運行

從結果來看,前面設置線程池最大線程數5個,有生效。

running thread-11308:0
running thread-12504:0
running thread-5656:0
running thread-12640:0
running thread-7948:0

running thread-11308:1
running thread-5656:1
running thread-7948:1
running thread-12640:1
running thread-12504:1

...
...

除了使用上述第三方模塊的方法之外,我們還可以自己結合前面所學的消息隊列來自定義線程池。

這里我們就使用queue來實現一個上面同樣效果的例子,大家感受一下。

import time
import threading
from queue import Queue

def target(q):
while True:
msg = q.get()
for i in range(5):
print('running thread-{}:{}'.format(threading.get_ident(), i))
time.sleep(1)

def pool(workers,queue):
for n in range(workers):
t = threading.Thread(target=target, args=(queue,))
t.daemon = True
t.start()

queue = Queue()
# 創建一個線程池:並設置線程數為5
pool(5, queue)

for i in range(100):
queue.put("start")

# 消息都被消費才能結束
queue.join()

輸出是和上面是完全一樣的效果

running thread-11308:0
running thread-12504:0
running thread-5656:0
running thread-12640:0
running thread-7948:0

running thread-11308:1
running thread-5656:1
running thread-7948:1
running thread-12640:1
running thread-12504:1

...
...

構建線程池的方法,是可以很靈活的,大家有舉可以自己多研究。但是建議只要掌握一種自己熟悉的,能快速上手的就好了。

好了,今天的內容就是這些了。



 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM