Redis之Stream


【Stream簡介】
Redis5.0增加了一種新的數據結構:Stream,它是一個支持多播的可持久化消息隊列。
Stream的結構是一個鏈表,將所有的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟后,內容還在。
和其它的結構一樣,結構上的不同,都是value不同,key都是字符串形式的。key就是Stream這個結構的名稱。

 在使用xadd指令追加消息時,Stream也符合creates if not exists的規則,Redis會自動創建一個Stream結構。

【消費組-消費者】
每個Stream都可以掛多個消費組,每個消費組會有游標(last_delivered_id)在Stream上往前移動,表示當前消費組已經消費到哪條消息了。
每個消費組都有一個Stream內唯一的名稱,消費組不會自動創建,需要用xgroup create進行創建,需要指定Stream某個消息ID開始消費,這個ID用於初始化消費組的游標。
每個消費組的狀態相互獨立,同一份Stream內部的消息會被每個消費組都消費到。
同一個消費組可以掛多個消費者(Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使組的游標往前移動。每個消費者都有一個組內唯一名稱。
消費者內部會有一個狀態變量pending_ids,記錄了當前已經被客戶端讀取,但沒有ack的消息。一旦某個消息被ack,它就減掉這個消息。這個變量被稱為PEL,即Pending Entries List,這是一個核心的數據結構,用來確保客戶端至少消費了消息一次。
【消息】
消息ID的形式是毫秒時間戳-消息順序,如1527846880572-5,表示當前消息在1527846880572毫秒時產生,並且是毫秒內產生的第5條消息。
消息ID可以由服務器自動生成,也可以由客戶端自己指定,但是形式必須是“整數-整數”,且后面加入的消息ID必須要大於前面的消息ID。
消息內容就是hash結構的鍵值對。
【基本操作】
xadd,xdel,xrange,xlen表示對Stream內部消息的增,刪(標記),查(除標記外的),長度查詢操作。
del表示刪除Stream這個結構。

 

【獨立消費】
在不定義消費組的情況下,我們也可以用xread單獨消費Stream內的消息。

也可以阻塞讀消息,從尾部讀取消息,等待最新的消息到來。當有其它客戶端添加消息時,這里就會解除阻塞,顯示最新的消息:

如果客戶端想要使用xread進行順序消費,那么就要記住服務器返回的游標,即消息ID,下次繼續調用xread時,將上次的ID作為參數傳遞進去,就可以繼續消費后續的消息。
block 0表示用原阻塞,block 1000表示阻塞1S,如果時間內沒有消息,返回nil。
【創建消費組】
xgroup create可以創建消費組,創建時需要指定游標位置,也可以從頭或者尾創建,創建完可以用xinfo查看組信息:

【組內消費】
xreadgroup可以進行消費組的組內消費,需要提供消費組名稱,消費者名稱和起始消息ID。也可以阻塞讀。讀到新消息后,對應ID會進入消費者的PEL中,客戶端處理完畢后用xack指令通知服務器,本條消息已經處理完畢,消息就會從PEL中移除。
xreadgroup GROUP consumegroup-name consume-name count 1 streams stream-name
xack stream-name consumegroup-name messageid
【Stream消息廢棄策略】
xadd指令有一個定長參數maxlen,可以將老的消息干掉:
【一定要記得xack】
如果消費者收到了消息,但是讀取完並沒有xack,那么PEL就會不斷增多,消費組很多的時候,這個PEL占用的內存就會放大。
這個PEL的作用就是為了讓消費者處理完成后,再通過xack告訴服務器我已經處理完了,你可以不用管了,如果客戶端處理過程中發生異常,那么這個PEL就會記住這個ID,客戶端連上后會再次收到這個消息。此時客戶端可以設置xreadgroup讀取,起始消息必須是有效的消息ID,如果設置0-0,表示讀取所有PEL消息,以及游標之后的新消息(相當於增量讀取)。
【Stream的消息丟失】
主從復制時,如果failover發生,Redis可能會丟失從節點沒來得及處理的消息,所以Stream此時也有可能會丟失消息。
【分區】
Redis沒有原生支持分區能力,如果想分區,那就多建幾個Stream,然后在客戶端采用一定策略來生產消息到不同的Stream。
這里可以對比Kafka的HashStrategy,它通過客戶端的hash算法,來將消息放入了不同的分區。

【參考】

 

《Redis深度歷險 核心原理與應用實踐》
https://juejin.cn/post/7028439051308892167

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM