Python的Queue模塊提供一種適用於多線程編程的FIFO實現。它可用於在生產者(producer)和消費者(consumer)之間線程安全(thread-safe)地傳遞消息或其它數據,因此多個線程可以共用同一個Queue實例。Queue的大小(元素的個數)可用來限制內存的使用。
Basic FIFO Queue
Queue
類實現了一個基本的先進先出(FIFO)
容器,使用put()
將元素添加到序列尾端,get()
從隊列尾部移除元素。
LIFO Queue
與標准FIFO實現Queue
不同的是,LifoQueue
使用后進先出序(會關聯一個棧數據結構)。
Priority Queue(優先隊列)
除了按元素入列順序外,有時需要根據隊列中元素的特性來決定元素的處理順序。例如,財務部門的打印任務可能比碼農的代碼打印任務優先級更高。PriorityQueue
依據隊列中內容的排序順序(sort order)來決定那個元素將被檢索。
Using Queues with Threads
queue介紹
- queue是python中的標准庫,俗稱隊列,可以直接import 引用,在python2.x中,模塊名為Queue
- 在python中,多個線程之間的數據是共享的,多個線程進行數據交換的時候,不能夠保證數據的安全性和一致性,所以當多個線程需要進行數據交換的時候,隊列就出現了,隊列可以完美解決線程間的數據交換,保證線程間數據的安全性和一致性
queue模塊有三種隊列及構造函數:
- Python queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize)
- LIFO類似於堆,即先進后出。 class queue.LifoQueue(maxsize)
- 還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize)
queue模塊中的常用方法:
- queue.qsize() 返回隊列的大小
- queue.empty() 如果隊列為空,返回True,反之False
- queue.full() 如果隊列滿了,返回True,反之False
- queue.full 與 maxsize 大小對應
- queue.get([block[, timeout]])獲取隊列,timeout等待時間
- queue.get_nowait() 相當queue.get(False)
- queue.put(item) 寫入隊列,timeout等待時間
- queue.put_nowait(item) 相當queue.put(item, False)
- queue.task_done() 在完成一項工作之后,queue.task_done()函數向任務已經完成的隊列發送一個信號
- queue.join() 實際上意味着等到隊列為空,再執行別的操作
代碼例子:
# coding: utf-8 from queue import Queue # Queue是python標准庫中的線程安全的隊列(FIFO)實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列,用來在生產者和消費者線程之間的信息傳遞 def test_queue(): q=Queue(10) for i in range(5): q.put(i) while not q.empty(): print(q.get()) def test_LifoQueue(): import queue # queue.LifoQueue() #后進先出->堆棧 q = queue.LifoQueue(3) q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) def test_PriorityQueue(): import queue # queue.PriorityQueue() #優先級 q = queue.PriorityQueue(3) # 優先級,優先級用數字表示,數字越小優先級越高 q.put((10, 'a')) q.put((-1, 'b')) q.put((100, 'c')) print(q.get()) print(q.get()) print(q.get()) # Python queue隊列,實現並發,在網站多線程推薦最后也一個例子,比這貨簡單,但是不夠規范 from queue import Queue # Queue在3.x中改成了queue import random import threading import time from threading import Thread class Producer(threading.Thread): """ Producer thread 制作線程 """ def __init__(self, t_name, queue): # 傳入線程名、實例化隊列 threading.Thread.__init__(self, name=t_name) # t_name即是threadName self.data = queue """ run方法 和start方法: 它們都是從Thread繼承而來的,run()方法將在線程開啟后執行, 可以把相關的邏輯寫到run方法中(通常把run方法稱為活動[Activity]); start()方法用於啟動線程。 """ def run(self): for i in range(5): # 生成0-4五條隊列 print("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i)) # 當前時間t生成編號d並加入隊列 self.data.put(i) # 寫入隊列編號 time.sleep(random.randrange(10) / 5) # 隨機休息一會 print("%s: %s producing finished!" % (time.ctime(), self.getName)) # 編號d隊列完成制作 class Consumer(threading.Thread): """ Consumer thread 消費線程,感覺來源於COOKBOOK """ def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data = queue def run(self): for i in range(5): val = self.data.get() print("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val)) # 編號d隊列已經被消費 time.sleep(random.randrange(10)) print("%s: %s consuming finished!" % (time.ctime(), self.getName())) # 編號d隊列完成消費 def main(): """ Main thread 主線程 """ queue = Queue() # 隊列實例化 producer = Producer('Pro.', queue) # 調用對象,並傳如參數線程名、實例化隊列 consumer = Consumer('Con.', queue) # 同上,在制造的同時進行消費 producer.start() # 開始制造 consumer.start() # 開始消費 """ join()的作用是,在子線程完成運行之前,這個子線程的父線程將一直被阻塞。 join()方法的位置是在for循環外的,也就是說必須等待for循環里的兩個進程都結束后,才去執行主進程。 """ producer.join() consumer.join() print('All threads terminate!') if __name__=="__main__": test_queue() print("=====后進先出=====") test_LifoQueue() print("=====優先級======") test_PriorityQueue() main()
線程模塊
- 多線程主要的內容:直接進行多線程操作,線程同步,帶隊列的多線程;
Python3 通過兩個標准庫 _thread 和 threading 提供對線程的支持。
_thread 提供了低級別的、原始的線程以及一個簡單的鎖,它相比於 threading 模塊的功能還是比較有限的。
threading 模塊除了包含 _thread 模塊中的所有方法外,還提供的其他方法:
- threading.currentThread(): 返回當前的線程變量。
- threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
- threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
除了使用方法外,線程模塊同樣提供了Thread類來處理線程,Thread類提供了以下方法:
- run(): 用以表示線程活動的方法。
- start():啟動線程活動。
- join([time]): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
- isAlive(): 返回線程是否活動的。
- getName(): 返回線程名。
- setName(): 設置線程名。
#queue 多線程各個線程的運算的值放到一個隊列中,到主線程的時候再拿出來,以此來代替 #return的功能,因為在線程是不能返回一個值的
# https://www.cnblogs.com/zephyr-1/p/6043785.html import time import threading from Queue import Queue def job(l,q): q.put([i**2 for i in l]) def multithreading(data): q = Queue() threads = [] for i in xrange(4): t = threading.Thread(target = job,args = (data[i],q)) t.start() threads.append(t) for thread in threads: thread.join() results = [] for _ in range(4): results.append(q.get()) print results if __name__ == "__main__": data = [[1,2,3],[4,5,6],[3,4,3],[5,5,5]] multithreading(data) [[1, 4, 9], [16, 25, 36], [9, 16, 9], [25, 25, 25]]