最近在做一個東西的時候發現需要用到循環隊列,實現先進先出(FIFO),不斷往里面添加數據,當達到某個限定值時,最先進去的出去,然后再添加。之后需要對隊列里面的內容進行一個篩選,作其他處理。首先我想到了python的Queue模塊,先簡單的介紹一下,具體的可以參考Queue。
一、Queue模塊
Python queue模塊有三種隊列及構造函數:
1、Python queue模塊的FIFO隊列先進先出。 class queue.queue(maxsize)
2、LIFO類似於堆棧,即先進后出。 class queue.Lifoqueue(maxsize)
3、還有一種是優先級隊列級別越低越先出來。 class queue.Priorityqueue(maxsize)
此包中的常用方法(q =queue.queue()):
q.qsize() 返回隊列的大小
q.empty() 如果隊列為空,返回True,反之False
q.full() 如果隊列滿了,返回True,反之False
q.get(block=True, timeout=None]) 從隊列中返回並刪除一個元素,timeout等待時間
q.get_nowait() 相當q.get(False)
q.put(item, block=True, timeout=None)非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之后,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列為空,再執行別的操作
這里引入Queue模塊就可以實現FIFO了,當我要提取隊列里面的數據的時候,我得利用get()方法先把數據提取出來,然后存入一個新的數組,由於我要隔一段時間對里面的數據進行提取,而get()方法提取的時候會刪除對應的元素,這樣有點兒不方便。因此我自己寫了一個類(寫的不好的地方,大神們可以告訴我,不喜勿噴hh)
二、自定義一個類(circular_queue.py)
# 定義隊列類 class MyQueue(object): def __init__(self, size): self.size = size # 定義隊列長度 self.queue = [] # 存儲隊列 列表 def __str__(self): # 返回對象的字符串表達式,方便查看 return str(self.queue) def inQueue(self, n): # 入隊 if self.isFull(): return -1 self.queue.append(n) # 列表末尾添加新的對象 def outQueue(self): # 出隊 if self.isEmpty(): return -1 firstelement = self.queue[0] # 刪除隊頭元素 self.queue.remove(firstelement) # 刪除隊操作 return firstelement def delete(self, n): # 刪除某元素 element = self.queue[n] self.queue.remove(element) def inPut(self, n, m): # 插入某元素 n代表列表當前的第n位元素 m代表傳入的值 self.queue[n] = m def getSize(self): # 獲取當前長度 return len(self.queue) def getnumber(self, n): # 獲取某個元素 element = self.queue[n] return element def isEmpty(self): # 判斷是否為空 if len(self.queue) == 0: return True return False def isFull(self): # 判斷隊列是否滿 if len(self.queue) == self.size: return True return False
三、測試
在文件circular_queue.py中類的下頭繼續添加如下代碼
queue = MyQueue(5) # 定義一個大小為5的隊列 for i in range(8): # 先判斷隊列是否為滿 if not queue.isFull(): queue.inQueue(i) else: # 先出隊再添加 queue.outQueue() queue.inQueue(i) print(queue)
運行結果如下
可以看出已經實現了,當然你也可以試試其他幾種方法,比如提取元素,獲取隊列大小等等。接下來就可以像操作列表對元素進行提取,並且不會刪除元素。
for i in range(queue.getSize()): item =queue.getnumber(i) print(item)
當然也可以把circular_queue。py文件單獨保存,然后到其他文件中引入,放入同一文件夾下,新建test.py,然后運行試試:
from circular_queue import * import time def fun2(num): num += 1 return num def fun1(num, res): while True: num = fun2(num) # 先判斷隊列是否為滿 if not queue.isFull(): queue.inQueue([num, res]) else: # 先出隊再添加 queue.outQueue() queue.inQueue([num, res]) print(queue) time.sleep(2) if __name__ == "__main__": queue = MyQueue(5) fun1(0, '-aaa-')