【Redis】Redis Stream 介紹


一、添加數據(往名為mystream的Stream中添加了一個條目)
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
 
 
二、獲取一個Stream的條目數量
> XLEN mystream
(integer) 1
 
 
三、XRANGE范圍查詢
# 根據范圍查詢Stream,兩個特殊的ID- 和 +分別表示可能的最小ID和最大ID
> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"
 
# 查詢兩毫秒時間內產生的所有條目
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
 
# 在最后放一個可選的COUNT選項,只獲取前面N個項目
> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"
 
> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"
 
# XREVRANGE命令與XRANGE相同,但是以相反的順序返回元素
> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"
 
 
四、使用XREAD監聽新項目
# XREAD的非阻塞形式,獲取流 mystream中ID大於0-0的消息
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"
 
# 通過指定BLOCK參數XREAD變成一個阻塞命令
> XREAD BLOCK 0 STREAMS mystream $
 
# 通過傳入多個key來同時從不同的Stream中讀取數據
XREAD COUNT 2 STREAMS mystream otherstream 0 0
 
# 通過指定BLOCK參數,將XREAD變成一個阻塞命令(同樣支持多個steam)
> XREAD BLOCK 0 STREAMS mystream $
 
 
五、消費者組
  • XGROUP 用於創建,摧毀或者管理消費者組。
  • XREADGROUP 用於通過消費者組從一個Stream中讀取。
  • XACK 是允許消費者將待處理消息標記為已正確處理的命令。
 
# 創建一個消費者組
> XGROUP CREATE mystream mygroup $
OK
 
# 在開始從Stream中讀取之前,讓我們往里面放一些消息:
> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0
 
# 嘗試使用消費者組讀取(特殊的ID > : 消息到目前為止從未傳遞給其他消費者)
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"
 
# 歷史待處理的消息(使用特殊ID>獲取新消息后,會更新消費者組的最后ID,這些為歷史待處理消息)
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"
 
# 消息標記為已處理
> XACK mystream mygroup 1526569495631-0
(integer) 1
 
 
六、從永久性失敗中恢復
# XPENDING簡單使用
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"
 
# XPENDING命令可以傳遞更多的參數來獲取更多信息,完整的命令簽名如下
XPENDING <key> <groupname> [<start-id> <end-id> <count> [<conusmer-name>]]
 
# 待處理消息
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
 
# XCLAIM 改變消息的所有者
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
 
# XCLAIM提供了最小空閑時間,只有在上述消息的空閑時間大於指定的空閑時間時,操作才會起作用,下面第二個客戶端的認領會失敗
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
 
# XCLAIM命令也返回了消息數據本身。但這不是強制性的。可以使用JUSTID選項,以便僅返回成功認領的消息的ID
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"
 
 
七、Streams 的可觀察性
# 報告關於Stream本身的信息
> XINFO STREAM mystream
1) length
2) (integer) 13
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"
 
# 有關消費者組的信息
> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0
 
# 檢查特定消費者組的狀態
> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983
 
 
七、設置Streams的上限
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
 
# 如果不需要精確的1000個項目。它可以是1000或者1010或者1030,只要保證至少保存1000個項目就行
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
 
 
八、從Stream中刪除單個項目
> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM