官方介紹:https://redis.io/topics/streams-intro
Springboot整合文檔:https://docs.spring.io/spring-data/data-redis/docs/current/reference/html/#redis.streams
一、Stream
Stream是Redis 5.0新增的一種數據結構。它是一個新的很強大的支持多播的可持久化消息隊列(極大借鑒了Kafka的設計)。
Redis 本身是有一個 Redis 發布訂閱 (pub/sub) 來實現消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄
Redis Stream 提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數據,並且能記住每一個客戶端的訪問位置,還能保證消息不丟失
Stream結構與特征:
它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟后,內容還在。
每個Stream都有唯一的名稱,它就是Redis的key,在我們首次使用 xadd指令追加消息時自動創建。
名詞
Consumer Group:消費組,使用 XGROUP CREATE 命令創建,一個消費組有多個消費者(Consumer)
last_delivered_id :游標,每個消費組會有個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。
pending_ids :Pending Entries List (PEL),Stream在每個消費者結構內部維護了一個狀態變量pending_ids ,它記錄了當前已經被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認字符)的消息的ID。
每個Stream都可以掛多個消費組(Consumer Group),每個消費組會有個游標 last_delivered_id 在Stream 之上往前移動,表示當前消費組已經消費到哪條消息了。每個消費組都有一個Stream內唯一的名稱,消費組不會自動創建,它需要單獨的指令 xgroup create 進行創建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化 last_delivered_id 變量。
每個消費組的狀態都是獨立的,相互不受影響,也就是說同一份Stream 內部的消息會被每個消費組都消費到
同一個消費組可以掛接多個消費者(Consumer),這些消費組之間是競爭關系(一個消息只會被消費組內的一個消費者消費),任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動,每個消費者有一個組內唯一名稱。
消息ID
消息ID的形式是timestamp-sequence,毫秒數-序號,表示當前消息產生時的毫秒數,sequence表示當前毫秒數產生的第幾個消息
消息內容
消息內容就是鍵值對
Stream消息太多怎么辦?
Stream如果消息太多,導致消息鏈表很長,占用內存很大,怎么辦?
Redis提供了一個定長Stream功能,通過XADD命令的MAXLEN選項或者XTRIM命令,限制Stream的長度,當達到限制的長度時,就會將老的消息干掉,從而使Stream保持恆定的長度。
1)XADD命令的MAXLEN選項
2)XTRIM命令(推薦)
定長策略:MAXLEN,如限制Stream的長度為10:XTRIM mystream MAXLEN 10,或者 XTRIM mystream MAXLEN ~ 10,~參數意味着並不需要精確到長度為10,只保證最少為10即可,實際上允許比10稍多一些。
最小ID策略(>=6.2版本):MINID,該技術可以逐出ID小於指定ID的條目。XTRIM mystream MINID 649085820:所有ID低於649085820-0的條目都將被刪除
Stream相關命令:
XADD - 向Stream追加消息到末尾,如果隊列不存在,則創建一個隊列。語法:
XADD key ID field value [field value ...]
key :隊列名稱,如果不存在就創建
ID :消息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性。
field value : 記錄
XTRIM - 對流進行修剪,限制長度,返回從流中刪除的條目數
XDEL - 從Stream中刪除消息,這里的刪除僅僅是設置標志位,不是真正刪除,不影響消息總長度。語法:XDEL key ID [ID ...]
XLEN - 獲取流包含的元素數量,即消息長度。語法:XLEN key
XRANGE - 獲取消息列表,會自動過濾已經刪除的消息
XRANGE key start end [COUNT count]
key :隊列名
start :開始值, - 表示最小值
end :結束值, + 表示最大值
count :數量
XREVRANGE - 反向獲取消息列表,ID 從大到小
XREAD - 以阻塞或非阻塞方式獲取消息列表
xread count 3 streams yyj_stream 0-0 :從頭讀取yyj_steam的3條記錄
DEL-刪除整個Stream消息列表中的所有消息
消費者組相關命令:
XGROUP CREATE - 創建消費者組。語法:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key :隊列名稱
groupname :組名。
$ : 表示從尾部開始消費,只接受新消息,當前 Stream 消息會全部忽略。
如:XGROUP CREATE mystream consumer-group-name 0-0 / $ MKSTREAM(從頭/尾 開始消費),如果key不存在就創建
XREADGROUP GROUP - 讀取消費者組中的消息。語法:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group :消費組名
consumer :消費者名。
count : 讀取數量。
milliseconds : 阻塞毫秒數。
key : 隊列名。
ID : 消息 ID。
如:XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
XACK - 將消息標記為"已處理"
XGROUP SETID - 為消費者組設置新的最后遞送消息ID
XGROUP DELCONSUMER - 刪除消費者
XGROUP DESTROY - 刪除消費者組
XPENDING - 顯示待處理消息的相關信息
XCLAIM - 轉移消息的歸屬權
XINFO - 查看流和消費者組的相關信息;
XINFO GROUPS - 打印消費者組的信息;
XINFO STREAM - 打印流信息
二、與Springboot整合
org.springframework.data.redis.connection和org.springframework.data.redis.stream軟件包提供了對Redis的數據流的核心功能
目前僅Lettuce客戶端支持 Redis Stream,Jedis尚不支持。
注意消息類型為 Map<String,String>
1、追加消息
要發送一條消息 record,可以使用底層的 RedisConnection ,也可使用高級的 StreamOperations,兩個都提供了add方法(xadd指令),該方法接受記錄record和目標流
作為參數。RedisConnection需要原始數據(字節數組),而StreamOperations可以讓任意對象作為記錄
// append message through connection RedisConnection con = … byte[] stream = … ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream); con.xAdd(record); // append message through RedisTemplate RedisTemplate template = … StringRecord record = StreamRecords.string(…).withStreamKey("my-stream"); template.opsForStream().add(record);
RedisUtil:
/** * 向流中追加記錄,若流不存在,則創建 * * @param record 記錄類型為Map<String,String> * @param streamKey * @return 追加消息的RecordId */ public static RecordId xadd(Map<String, String> record, String streamKey) { try { StringRecord stringRecord = StreamRecords.string(record).withStreamKey(streamKey); // 剛追加記錄的記錄ID RecordId recordId = redisTemplate.opsForStream().add(stringRecord); LOGGER.info(recordId.getValue()); return recordId; } catch (Exception e) { LOGGER.error("xadd error:" + e.getMessage(), e); return null; } } /** * 流消息消費確認 * * @param groupName * @param record * @return 成功確認的消息數 */ public static Long xack(String groupName, Record record) { try { return redisTemplate.opsForStream().acknowledge(groupName, record); } catch (Exception e) { LOGGER.error("xack error:" + e.getMessage(), e); return 0L; } }
讀取操作:
/** * 獲取消息列表,會自動過濾已經刪除的消息 * * @param streamKey * @return */ public static List<MapRecord<String, Object, Object>> xrange(String streamKey) { try { return redisTemplate.opsForStream().range(streamKey, Range.<String>from(Range.Bound.inclusive("-")).to(Range.Bound.inclusive("+"))); } catch (Exception e) { LOGGER.error("xrange error:" + e.getMessage(), e); return null; } } /** * 獲取流包含的元素數量,即消息長度 * * @param streamKey * @return */ public static Long xlen(String streamKey) { try { return redisTemplate.opsForStream().size(streamKey); } catch (Exception e) { LOGGER.error("xlen error:" + e.getMessage(), e); return 0L; } }
2、消費消息
對消費者而言,可以消費一個或多個流。Redis Streams 提供read 命令允許從已知流內容內的任意位置(隨機訪問)消費流,並超出流末尾消費新的流記錄
底層的RedisConnection提供了xRead和xReadGroup方法,它們分別映射Redis命令來讀取消息和在消費組內讀取。請注意,可以將多個流用作參數
注意:在連接上調用xRead會導致當前線程在開始等待消息時阻塞。僅當讀取命令超時或收到消息時才釋放線程
要消費流中的消息,有兩種方式:
1)(同步阻塞)在應用程序代碼中輪詢消息
2)(異步)使用 Message Listener Containers(消息訂閱者容器)中的兩種異步接收中的一種(命令式或響應式)。每當消息到達時,容器都會通知調用應用程序代碼。
同步接收
StreamOperations.read(…)方法提供了此功能。在同步接收期間,調用線程可能會阻塞,直到消息可用為止。 StreamReadOptions.block屬性指定接收者在放棄等待消息之前應該等待多長時間
// Read message through RedisTemplate RedisTemplate template = … List<MapRecord<K, HK, HV>> messages = template.streamOps().read(StreamReadOptions.empty().count(2), StreamOffset.latest("my-stream")); List<MapRecord<K, HK, HV>> messages = template.streamOps().read(Consumer.from("my-group", "my-consumer"), StreamReadOptions.empty().count(2), StreamOffset.create("my-stream", ReadOffset.lastConsumed()))
通過 Message Listener Containers (消息訂閱者容器)異步接收
由於其阻塞性質,低級輪詢沒有吸引力,因為它需要每個消費者使用連接和線程管理。為了減輕這個問題,Spring Data提供了message listeners(消息偵聽器),它可以完成所有繁重的工作
Spring Data附帶了兩種針對所用編程模型的實現:
StreamMessageListenerContainer:充當命令式編程模型的消息偵聽器容器。它用於從Redis流中 consumer record(消費記錄)並驅動注入到對應的StreamListener實例中。
StreamReceiver:提供消息偵聽器的反應式變體。它用於將Redis流中的消息作為潛在的無限流使用,並通過Flux發出流消息
StreamMessageListenerContainer 和StreamReceiver 都是負責所有接收消息的線程,並將消息分發到 listener 中進行處理。
StreamMessageListenerContainer
1)定義流消息訂閱者( Stream-Driven POJO (SDP) )充當流消息的接收者,它必須實現 org.springframework.data.redis.stream.StreamListene 接口。如:
public class ProductUpdateStreamListener implements StreamListener<String, MapRecord<String, String, String>> { @Override public void onMessage(MapRecord<String, String, String> message) { RecordId recordId = message.getId(); String stream = message.getStream(); Map<String, String> record = message.getValue(); // 消息處理完畢后,確認該消息,以便從PEL中刪除該消息ID RedisUtil.streamAcknowledge("yyj_group", message); } }
2)創建 message listener container 和注冊 流消息訂閱者。
RedisConfig 中添加下面的配置:
@Bean public StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() // 一次最多拉取5條消息 .batchSize(5) // 拉取消息的超時時間 .pollTimeout(Duration.ofMillis(100)) .build(); // 流消息訂閱者容器 StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, containerOptions); // 使用消費組,ReadOffset.lastConsumed()在消費組最后消耗的消息ID之后讀取。消費組內的消費者名稱需在配置文件中配置 // 需要注意stream和消費組需提前創建好,XGROUP CREATE yyj_stream yyj_group 0-0 MKSTREAM // 要在接收時自動確認消息,請使用receiveAutoAck代替receive // 經驗證一個消費組內的多個消費者名稱可以相同,不會重復消費,解決了集群部署不好區別消費者名稱的問題 streamMessageListenerContainer.receive( Consumer.from("yyj_group", "consumer_01"), StreamOffset.create("yyj_stream", ReadOffset.lastConsumed()), new ProductUpdateStreamListener()); streamMessageListenerContainer.start(); return streamMessageListenerContainer; }
驗證了,消息只能被消費組內的一個消費者監聽到。一個消費組的不同機器可以共用一個消費者名稱。
需要注意消費組需要提前創建好,不然不能消費消息,創建消費組的時候要指定從什么位置開始消費消息。
補充:
ReadOffset 策略
流讀取操作接受讀取偏移量規范,以消費來自給定偏移量的消息。ReadOffset表示讀取偏移量規范。Redis支持三種偏移量,具體取決於您是standalone 消費流還是在消費組中消費流:
ReadOffset.latest() –閱讀最新消息。
ReadOffset.from(…) –在特定消息ID之后閱讀。
ReadOffset.lastConsumed() –在最后消耗的消息ID之后讀取(僅針對消費者組)
在基於消息容器的使用情況下,我們在消費消息時需要前進(或增加)讀取偏移量,前進取決於請求的ReadOffset和消費模式(有/無消費組)。下表說明了容器如何前進ReadOffset:
Read offset | Standalone | Consumer Group |
---|---|---|
Latest |
讀取最新消息 |
讀取最新消息 |
Specific Message Id(指定messageId) |
使用上次看到的消息作為下一個MessageId |
使用上次看到的消息作為下一個MessageId |
Last Consumed |
使用上次看到的消息作為下一個MessageId |
每個消費者組最后消費的消息 |
總結:Redis Stream可以看作是一個輕量級的Kafka,適用於消息量少且對消息可靠性不高的業務。
END.