Streams:深入理解Redis5.0新特性


概述

相較於Redis4.0,Redis5.0增加了很多新的特性,而streams是其中最重要的特性之一。streams是redis 的一種基本數據結構,它是一個新的強大的支持多播的可持久化的消息隊列,在設計上借鑒了kafaka。streams的數據類型本身非常簡單,有點類似於hash結構,但是它的額外特性異常強大且復雜:

  • 支持持久化。streams能持久化存儲數據,不同於 pub/sub 機制和 list  消息被消費后就會被刪除,streams消費過的數據會被持久化的保存在歷史中。
  • 支持多播。 這一點跟  pub/sub 有些類似。
  • 支持消費者組。streams 允許同一消費組內的消費者競爭消息,並提供了一系列機制允許消費者查看自己的歷史消費消息。並允許監控streams的消費者組信息,消費者組內消費者信息,也可以監控streams內消息的狀態。

基礎內容

數據 ID

streams 提供了默認的id模式用來唯一標識streams中的每一條數據,由兩部分組成:
 <millisecondsTime>-<sequenceNumber> 
millisecondsTime是redis服務所在機器的時間,sequenceNumber用於同一毫秒創建的數據。需要注意的一點是streams的id總是單調增長的,即使redis服務所在的服務器時間異常。如果當前的毫秒數小於以前的毫秒數,就會使用歷史記錄中最大的毫秒數,然后序列號遞增。而這樣做的原因是因為streams的機制允許根據時間區間或者某一個時間節點或者某一id查找數據。

向streams插入數據

streams 的基礎寫命令為 XADD ,其語法為 XADD key ID field value [field value ...] 

127.0.0.1:6379> XADD mystream * name dwj age 18
"1574925508730-0"
127.0.0.1:6379>

上面的例子使用 XADD 向名為 mystream 的streams中添加了一條數據,ID使用*表示使用streams使用默認的ID,在本例中redis返回的 1574925508730-0 就是redis為我們插入的數據生成的ID。

另外streams 查看streams長度的命令為 XLEN 

127.0.0.1:6379> XLEN mystream
(integer) 3
127.0.0.1:6379>

從streams中讀取數據

從streams中讀取數據會比寫數據復雜很多,用日志文件進行對比,我們可以查看歷史日志,可以根據范圍查詢日志,我們可以通過unix的命令 tail -f 來監聽日志,可以多個用戶查看到同一份日志,也可以多個用戶只能查看到自己有權限查看的那一部分日志。

按范圍查詢: XRANGE 和 XREVRANGE

首先來介紹一下 根據范圍查詢,這兩種操作都比較簡單,以 XRANGE 為例,它的語法格式為 XRANGE key start end [COUNT count] , 我們只需要提供兩個id, start 和 end ,返回的將是一個包含 start 和 end 的閉區間。兩個特殊的ID - 和 + 分別表示可能的最小ID和最大ID。

127.0.0.1:6379> XRANGE mystream - +
1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
2) 1) "1574925508730-0"
   2) 1) "name"
      2) "dwj"
      3) "age"
      4) "18"
127.0.0.1:6379>

  

我們前邊提到過數據id中包含了創建數據的時間信息,這意味着我們可以根據時間范圍查詢數據,為了根據時間范圍查詢,我們省略掉ID的序列號部分,如果省略,對於start ID會使用0作為默認的序列號,對於end ID會使用最大序列號作為默認值,這樣的話我們使用兩個unix時間戳去查詢數據就可以得到那個時間區間內所有的數據。

1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
127.0.0.1:6379>

 

可能還會有同學注意到語法的最后邊還有 count 參數,這個參數允許我們一次只返回固定數量的數據,然后根據返回數據的last_id,作為下一次查詢的start,這樣就允許我們在一個量非常大的streams里批量返回數據。
XREVRANGE命令與XRANGE相同,但是以相反的順序返回元素,就不重復介紹了。

通過XREAD讀取數據

XREAD允許我們從某一結點開始從streams中讀取數據,它的語法為 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] ,我們在這里主要將的是通過 XREAD 來訂閱到達streams新的數據。這種操作可能跟REDIS中原有的 pub/sub 機制或者 阻塞隊列 的概念有些類似,都是等待一個key然后獲取到新的數據,但是跟這兩種有着本質的差別:

  • streams跟 pub/sub 和 阻塞隊列 允許多個客戶端一起等待數據,默認情況下,streams會把消息推送給所有等待streams數據的客戶端,這個能力跟 pub/sub 有點類似,但是streams也允許把消息通過競爭機制推送給其中的一個客戶端(這種模式需要用到消費者組的概念,會在后邊講到)。
  •  pub/sub 的消息是fire and forget並且從不存儲,你只可以訂閱到在你訂閱時間之后產生的消息,並且消息只會推送給客戶端一次,不能查看歷史記錄。以及使用 阻塞隊列 時,當客戶端收到消息時,這個元素會從隊列中彈出,換句話說,不能查看某個消費者消費消息的歷史。而在streams中所有的消息會被無限期的加入到streams中(消息可以被顯式的刪除並且存在淘汰機制),客戶端需要記住收到的最后一條消息,用於獲取到節點之后的新消息。
  • Streams 消費者組提供了一種Pub/Sub或者阻塞列表都不能實現的控制級別,同一個Stream不同的群組,顯式地確認已經處理的項目,檢查待處理的項目的能力,申明未處理的消息,以及每個消費者擁有連貫歷史可見性,單個客戶端只能查看自己過去的消息歷史記錄。
    從streams中讀取數據
    127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0
    1) 1) "mystream"
     2) 1) 1) "1574835253335-0"
           2) 1) "name"
              2) "bob"
              3) "age"
              4) "23"
        2) 1) "1574925508730-0"
           2) 1) "name"
              2) "dwj"
              3) "age"
              4) "18"
    127.0.0.1:6379>

     

    同list結構一樣,streams也提供了阻塞讀取的命令
    XREAD BLOCK 0 STREAMS mystream
    在上邊的命令中指定了BLOCK選項,超時時間為0毫秒(意味着永不會過期)。此外,這個地方使用了特殊的id  $ ,這個特殊的id代表着當前streams中最大的id,這就意味着你只會讀取streams中在你監聽時間以后的消息。有點類似於Unix的 tail -f 。另外XREAD可以同時監聽多個流中的數據。

消費者組

如果我們想要的不是多個客戶端處理相同的消息,而是多個客戶端從streams中獲取到不同的消息進行處理。也就是我們常用的生產者-消費者模型。假如想象我們具有兩個生產者p1,p2,三個消費者c1,c2,c3以及7個商品。我們想按照下面的效果進行處理

p1 =>item1 => c1
p2 =>item2 => c2
p1 =>item3 => c3
p2 =>item4 => c1
p1 =>item5 => c2
p2 =>item6 => c3
p1 =>item7 => c1

為了解決這種場景,redis使用了一個名為消費者的概念,有點類似於kafka,但只是表現上。消費者組就像是一個偽消費者,它從流內讀取數據,然后分發給組內的消費者,並記錄該消費者組消費了哪些數據,處理了那些數據,並提供了一系列功能。

  1. 每條消息都提供給不同的消費者,因此不可能將相同的消息傳遞給多個消費者。
  2. 消費者在消費者組中通過名稱來識別,該名稱是實施消費者的客戶必須選擇的區分大小寫的字符串。這意味着即便斷開連接過后,消費者組仍然保留了所有的狀態,因為客戶端會重新申請成為相同的消費者。 然而,這也意味着由客戶端提供唯一的標識符。
  3. 每一個消費者組都有一個第一個ID永遠不會被消費的概念,這樣一來,當消費者請求新消息時,它能提供以前從未傳遞過的消息。
  4. 消費消息需要使用特定的命令進行顯式確認,表示:這條消息已經被正確處理了,所以可以從消費者組中逐出。
  5. 消費者組跟蹤所有當前所有待處理的消息,也就是,消息被傳遞到消費者組的一些消費者,但是還沒有被確認為已處理。由於這個特性,當訪問一個Stream的歷史消息的時候,每個消費者將只能看到傳遞給它的消息。

它的模型類似於如下

| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |

從上邊的模型中我們可以看出消費者組記錄處理的最后一條消息,將消息分發給不同的消費者,每個消費者只能看到自己的消息。如果把消費者組看做streams的輔助數據結構,我們可以看出一個streams可以擁有多個消費者組,一個消費者組內可以擁有多個消費者。實際上,一個streams允許客戶端使用XREAD讀取的同時另一個客戶端通過消費者群組讀取數據。

創建一個消費者群組

我們首先創建一個包含了一些數據的streams

127.0.0.1:6379> XADD fruit * message apple
"1574935311149-0"
127.0.0.1:6379> XADD fruit * message banada
"1574935315886-0"
127.0.0.1:6379> XADD fruit * message pomelo
"1574935323628-0"

 

然后創建一個消費者組

127.0.0.1:6379> XGROUP CREATE fruit mygroup $
OK

注意我們需要指定一個id,這里我們使用的是特殊id $ ,我們也可以使用0或者一個unix時間戳,這樣,消費者組只會讀取這個節點之后的消息。

現在消費者組創建好了,我們可以使用XREADGROUP命令立即開始嘗試通過消費者組讀取消息。
 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] ,與 XREAD 類似,提供了BLOCK選項。假設指定消費者分別是Alice和Bob,來看看系統會怎樣返回不同消息給Alice和Bob。

127.0.0.1:6379> XREADGROUP GROUP  mygroup Alice COUNT 1 STREAMS fruit >
1) 1) "fruit"
   2) 1) 1) "1574936034258-0"
         2) 1) "message"
            2) "apple"
127.0.0.1:6379>

上邊命令代表的信息是:我要通過 mygroup 讀取streams  fruit 中的數據,我在群組中的身份是 Alice ,請給我一條數據。  > 操作符只在消費者組的上線文中有效,代表消息到目前為止沒有交給其它消費者處理過。
我們也可以使用一個有效的id,在這種情況下,消費者組會告訴我們的歷史待處理消息,而不會告訴我們新的消息。這個特性也是很有用的,當消費者因為某些原因重新啟動后,我們可以查看自己的歷史待處理消息,處理完待處理消息后再去處理新的消息。
我們可以通過 XACK 命令告訴消費者組某條消息已經被正確處理,不要顯示在我的歷史待處理消息列表中。 XACK 的語法為 XACK key group ID [ID ...] 

127.0.0.1:6379> XACK fruit mygroup 1574936034258-0
(integer) 1

有幾件事需要記住:

  1. 消費者是在他們第一次被提及的時候自動創建的,不需要顯式創建。
  2. 即使使用XREADGROUP,你也可以同時從多個key中讀取,但是要讓其工作,你需要給每一個Stream創建一個名稱相同的消費者組。這並不是一個常見的需求,但是需要說明的是,這個功能在技術上是可以實現的。
  3. XREADGROUP命令是一個寫命令,因為當它從Stream中讀取消息時,消費者組被修改了,所以這個命令只能在master節點調用。

從永久失敗中恢復

在一個消費者群組中可能存在多個消費者消費消息,但是也可能會存在某一個消費者永久退出消費者群組的情況,這樣我們就需要一種機制,把該消費者的待處理消息分配給消費者群組的另一個消費者。這就需要我們具有查看待處理消息的能力以及把某個消息分配給指定消費者的能力。前者是通過一個叫 XPENDING 的命令,它的語法為 XPENDING key group [start end count] [consumer] 

127.0.0.1:6379> XPENDING fruit mygroup
1) (integer) 1
2) "1574936042937-0"
3) "1574936042937-0"
4) 1) 1) "Alice"
      2) "1"

上述返回結果代表的是消費者群組有1條待處理命令,待處理消息的起始id為 1574936042937-0 ,結束id為 1574936042937-0 ,名為 Alice 的消費者有一個待處理命令,可能有人會好奇我們在前邊往 fruit 放入了3個水果,使用 XACK 處理了一個水果,消費者待處理列表中應該有兩個水果,而事實上消費者群組的待處理列表為該群組下消費者待處理消息的合集,當有消費者通過群組獲取消息的時候會改變消費者群組的狀態,這也是前邊提到的為什么 XREADGROUP 必須在master節點進行調用。
我們可以使用start end count 參數來查看某個范圍內消息的狀態

127.0.0.1:6379> XPENDING fruit mygroup - + 10 Alice
1) 1) "1574936042937-0"
   2) "Alice"
   3) (integer) 903655
   4) (integer) 1
2) 1) "1574936052018-0"
   2) "Alice"
   3) (integer) 491035
   4) (integer) 1

 

這樣我們就看到了一條消息的詳細信息,id為 1574936042937-0 的消息的消費者為 Alice ,它的pending時間為 903655 ,這個消息被分配了1次。
我們會發現第一條消息的處理時間有點長,我們懷疑 Alice 已經不能處理這條消息了,於是我們想把這條消息分配給 Bob ,這種場景下就需要用到了 XCLAIM 命令,它的語法為 XCLAIM ... ,其中min-idle-time為消息的最小空閑時間,只有消息的空閑時間大於這個值消息才會被分配,因為消息被分配的時候會重置消息的空閑時間,如果有同時把一條消息分配給兩個客戶端,只會第一條命令生效,因為當消息分配給第一個客戶端的時候重置空閑時間,第二條命令則會失效。
我們也可以使用一個獨立的進程來不斷尋找超時的消息,並把它分配給活躍的消費者,不過需要注意的是,如果消息的分配次數達到某個闕值,不應該把消息再分配出去,而是應該放到別的地方。

streams的可觀察性

streams具有不錯的可觀察性,前邊的 XPENDING 命令允許我們查看streams在某個消費者群組內待處理消息的狀態。但是我們想看的更多,比如在這個streams下有多少個group, 在這個group下有多少消費者。這就要用到 XINFO 命令:
查看 streams 信息:

127.0.0.1:6379> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1574925508730-0"
11) "first-entry"
12) 1) "1574835253335-0"
    2) 1) "name"
       2) "bob"
       3) "age"
       4) "23"
13) "last-entry"
14) 1) "1574925508730-0"
    2) 1) "name"
       2) "dwj"
       3) "age"
       4) "18"

 

輸出中會告訴我們streams的長度,群組數量,第一條和最后一條信息的詳情。下面看一下streams下群組的信息:

127.0.0.1:6379> XINFO GROUPS fruit
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1574936052018-0"
2) 1) "name"
   2) "mygroup-1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"

 

我們可以從輸出中看到 fruit 下有兩個群組,群組的名稱以及待處理消息的數量,處理的最后一條消息。我們可以在詳細的查看下消費者群組內消費者的狀態。

127.0.0.1:6379> XINFO CONSUMERS fruit mygroup
1) 1) "name"
   2) "Alice"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 1990242
2) 1) "name"
   2) "Bob"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 9178

 

從輸出中可以看到消費者待處理消息的數量以及消費者的閑置時間。

設置streams上限

如果從streams可以查看到歷史記錄,我們可能會有疑惑,如果streams無限期的加入內存會不會夠用,一旦消息數量達到上限,將消息永久刪除或者持久化到數據庫都是有必要的,redis也提供了諸如此類場景的支持。
一種方法是我們使用 XADD 的時候指定streams的最大長度, XADD mystream MAXLEN ~ 1000 其中的數值前可以加上 ~ 標識不需要精確的將長度保持在1000,比1000多一些也可以接受。如果不使用該標識,性能會差一些。另一種方法是使用 XTRIM ,該命令也是使用 MAXLEN 選項, > XTRIM mystream MAXLEN ~ 10 

一些特殊的id

前面提到了在streams API里邊存在一些特殊的id。
首先是 - 和 + ,這兩個ID在 XRANGE 命令中使用,分別代表最小的id和最大的id。 - 代表 0-1 , + 代表 18446744073709551615-18446744073709551615 ,從使用上方便了很多。在 XPENDING 等范圍查詢中都可以使用。
 $ 代表streams中當前存在的最大的id,在 XREAD 和 XGROUP 中代表只獲取新到的消息。需要注意的是 $ 跟 + 的含義並不一致。
還有一個特殊的id是 > ,這個id只能夠在 XREADGROUP 命令中使用,意味着在這個消費者群組中,從來沒有分配給其他的消費者,所以總是使用 > 作為群組中的 last delivered ID 。

持久化,復制和消息安全性

與redis的其它數據結構一樣,streams會異步復制到從節點,並持久化到AOF和RDB文件中,並且消費者群組的狀態也會按照此機制進行持久化。
需要注意的幾點是:

  • 如果消息的持久化以及狀態很重要,則AOF必須使用強fsync配合(AOF記錄每一條更改redis數據的命令,有很多種持久化機制,在這個地方要用到的是 appendfsync always  這樣會嚴重降低Redis的速度)
  • 默認情況下,異步復制不能保證從節點的數據與主節點保持一致,在故障轉移以后可能會丟失一些內容,這跟從節點從主節點接受數據的能力有關。
  •  WAIT 命令可以用於強制將更改傳輸到一組從節點上。雖然這使得數據不太可能會丟失,但是redis的Sentinel和cluster在進行故障轉移的時候不一定會使用具有最新數據的從節點,在一些特殊故障下,反而會使用缺少一些數據的從節點。
    因此在使用redis streams和消費者群組在設計程序的時候,確保了解你的應用程序在故障期間的應對策略,並進行相應地配置,評估它對你的程序是否足夠安全。

從streams中刪除數據

刪除streams中的數據使用 XDEL 命令,其語法為 XDEL key ID [ID ...] ,需要注意的是在當前的實現中,在宏節點完全為空之前,內存並沒有真正回收,所以你不應該濫用這個特性。

streams的性能

streams的不阻塞命令,比如 XRANGE 或者不使用BLOCK選項的 XREAD 和 XREADGROUP 跟redis普通命令一致,所以沒有必要討論。如果有興趣的話可以在redis的文檔中查看到對應命令的時間復雜度。streams命令的速度在一定范圍內跟 set 是一致的, XADD 命令的速度非常快,在一個普通的機器上,一秒鍾可以插入50w~100w條數據。
我們感興趣的是在消費者群組的阻塞場景下,從通過 XADD 命令向streams中插入一條數據,到消費者通過群組讀取到這條消息的性能。
為了測試消息從產生到消費間的延遲,我們使用ruby程序進行測試,將消息的產生時間作為消息的一個字段,然后把消息推送到streams中,客戶端收到消息后使用當前時間跟生產時間進行對比,從而計算出消息的延遲時間。這個程序未進行性能優化,運行在一個雙核的機器上,同時redis也運行在這台機器上,以此來模擬不是理想條件下的場景。消息每秒鍾產生1w條,群組內有10個消費者消費數據。測試結果如下:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

99.9%的請求的延遲小於等於2毫秒,而且異常值非常接近平均值。另外需要注意的兩點:

  1. 消費者每次處理1w條消息,這樣增加了一些延遲,這樣做是為了消費速度較慢的消費者能夠保持保持消息流。
  2. 用來做測試的系統相比於現在的系統非常慢。

 

 

原文鏈接: https://redis.io/topics/streams-intro

Worktile官網:www.worktile.com 

本文作者:Worktile 工程師 杜文傑

文章首發於「Worktile官方博客」,轉載請注明來源。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM