Redis Stream


官方介紹:https://redis.io/topics/streams-intro

Springboot整合文檔:https://docs.spring.io/spring-data/data-redis/docs/current/reference/html/#redis.streams

 

 

一、Stream

  Stream是Redis 5.0新增的一種數據結構。它是一個新的很強大的支持多播的可持久化消息隊列(極大借鑒了Kafka的設計)。

  Redis 本身是有一個 Redis 發布訂閱 (pub/sub) 來實現消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄 

  Redis Stream 提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數據,並且能記住每一個客戶端的訪問位置,還能保證消息不丟失

 

 Stream結構與特征

  它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟后,內容還在。

 

 

 

  每個Stream都有唯一的名稱,它就是Redis的key,在我們首次使用 xadd指令追加消息時自動創建。

  名詞

  Consumer Group:消費組,使用 XGROUP CREATE 命令創建,一個消費組有多個消費者(Consumer)

  last_delivered_id :游標,每個消費組會有個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。

  pending_ids :Pending Entries List (PEL),Stream在每個消費者結構內部維護了一個狀態變量pending_ids ,它記錄了當前已經被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認字符)的消息的ID。

 

  每個Stream都可以掛多個消費組(Consumer Group),每個消費組會有個游標 last_delivered_id 在Stream 之上往前移動,表示當前消費組已經消費到哪條消息了。每個消費組都有一個Stream內唯一的名稱,消費組不會自動創建,它需要單獨的指令 xgroup create 進行創建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化 last_delivered_id 變量。

  每個消費組的狀態都是獨立的,相互不受影響,也就是說同一份Stream 內部的消息會被每個消費組都消費到

  同一個消費組可以掛接多個消費者(Consumer),這些消費組之間是競爭關系(一個消息只會被消費組內的一個消費者消費),任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動,每個消費者有一個組內唯一名稱。

 

  消息ID

    消息ID的形式是timestamp-sequence,毫秒數-序號,表示當前消息產生時的毫秒數,sequence表示當前毫秒數產生的第幾個消息

  消息內容

    消息內容就是鍵值對

 

 

 Stream消息太多怎么辦?

  Stream如果消息太多,導致消息鏈表很長,占用內存很大,怎么辦?

  Redis提供了一個定長Stream功能,通過XADD命令的MAXLEN選項或者XTRIM命令,限制Stream的長度,當達到限制的長度時,就會將老的消息干掉,從而使Stream保持恆定的長度。

    1)XADD命令的MAXLEN選項

    2)XTRIM命令(推薦)

      定長策略:MAXLEN,如限制Stream的長度為10:XTRIM mystream MAXLEN 10,或者 XTRIM mystream MAXLEN ~ 10,~參數意味着並不需要精確到長度為10,只保證最少為10即可,實際上允許比10稍多一些。

      最小ID策略(>=6.2版本):MINID,該技術可以逐出ID小於指定ID的條目。XTRIM mystream MINID 649085820:所有ID低於649085820-0的條目都將被刪除

 

  

 Stream相關命令

    XADD - 向Stream追加消息到末尾,如果隊列不存在,則創建一個隊列。語法:

      XADD key ID field value [field value ...]

        key :隊列名稱,如果不存在就創建
        ID :消息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性。
        field value : 記錄


    XTRIM - 對流進行修剪,限制長度,返回從流中刪除的條目數
    XDEL - 從Stream中刪除消息,這里的刪除僅僅是設置標志位,不是真正刪除,不影響消息總長度。語法:XDEL key ID [ID ...]
    XLEN - 獲取流包含的元素數量,即消息長度。語法:XLEN key
    XRANGE - 獲取消息列表,會自動過濾已經刪除的消息

      XRANGE key start end [COUNT count]

        key :隊列名
        start :開始值, - 表示最小值
        end :結束值, + 表示最大值
        count :數量


    XREVRANGE - 反向獲取消息列表,ID 從大到小
    XREAD - 以阻塞或非阻塞方式獲取消息列表

      xread count 3 streams yyj_stream 0-0 :從頭讀取yyj_steam的3條記錄

    DEL-刪除整個Stream消息列表中的所有消息

  
  消費者組相關命令:

    XGROUP CREATE - 創建消費者組。語法:

      XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

        key :隊列名稱
        groupname :組名。
        $ : 表示從尾部開始消費,只接受新消息,當前 Stream 消息會全部忽略。

      如:XGROUP CREATE mystream consumer-group-name 0-0 / $ MKSTREAM(從頭/尾 開始消費),如果key不存在就創建


    XREADGROUP GROUP - 讀取消費者組中的消息。語法:

      XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

        group :消費組名
        consumer :消費者名。
        count : 讀取數量。
        milliseconds : 阻塞毫秒數。
        key : 隊列名。
        ID : 消息 ID。

      如:XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

      
    XACK - 將消息標記為"已處理"
    XGROUP SETID - 為消費者組設置新的最后遞送消息ID
    XGROUP DELCONSUMER - 刪除消費者
    XGROUP DESTROY - 刪除消費者組
    XPENDING - 顯示待處理消息的相關信息
    XCLAIM - 轉移消息的歸屬權
    XINFO - 查看流和消費者組的相關信息;
    XINFO GROUPS - 打印消費者組的信息;
    XINFO STREAM - 打印流信息

 

 

二、與Springboot整合

  org.springframework.data.redis.connectionorg.springframework.data.redis.stream軟件包提供了對Redis的數據流的核心功能

  目前僅Lettuce客戶端支持 Redis Stream,Jedis尚不支持。

  注意消息類型為 Map<String,String> 

  

  1、追加消息

    要發送一條消息 record,可以使用底層的 RedisConnection ,也可使用高級的 StreamOperations,兩個都提供了add方法(xadd指令),該方法接受記錄record和目標流

  作為參數。RedisConnection需要原始數據(字節數組),而StreamOperations可以讓任意對象作為記錄

// append message through connection
RedisConnection con =byte[] stream = …
ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);

// append message through RedisTemplate
RedisTemplate template = …
StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(record);

  RedisUtil:

    /**
     * 向流中追加記錄,若流不存在,則創建
     *
     * @param record 記錄類型為Map<String,String>
     * @param streamKey
     * @return 追加消息的RecordId
     */
    public static RecordId xadd(Map<String, String> record, String streamKey) {
        try {
            StringRecord stringRecord = StreamRecords.string(record).withStreamKey(streamKey);
            // 剛追加記錄的記錄ID
            RecordId recordId = redisTemplate.opsForStream().add(stringRecord);
            LOGGER.info(recordId.getValue());
            return recordId;
        } catch (Exception e) {
            LOGGER.error("xadd error:" + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 流消息消費確認
     *
     * @param groupName
     * @param record
     * @return 成功確認的消息數
     */
    public static Long xack(String groupName, Record record) {
        try {
            return redisTemplate.opsForStream().acknowledge(groupName, record);
        } catch (Exception e) {
            LOGGER.error("xack error:" + e.getMessage(), e);
            return 0L;
        }
    }

 讀取操作:

    /**
     * 獲取消息列表,會自動過濾已經刪除的消息
     *
     * @param streamKey
     * @return
     */
    public static List<MapRecord<String, Object, Object>> xrange(String streamKey) {
        try {
            return redisTemplate.opsForStream().range(streamKey, Range.<String>from(Range.Bound.inclusive("-")).to(Range.Bound.inclusive("+")));
        } catch (Exception e) {
            LOGGER.error("xrange error:" + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 獲取流包含的元素數量,即消息長度
     *
     * @param streamKey
     * @return
     */
    public static Long xlen(String streamKey) {
        try {
            return redisTemplate.opsForStream().size(streamKey);
        } catch (Exception e) {
            LOGGER.error("xlen error:" + e.getMessage(), e);
            return 0L;
        }
    }

 

 

  2、消費消息

    對消費者而言,可以消費一個或多個流。Redis Streams 提供read 命令允許從已知流內容內的任意位置(隨機訪問)消費流,並超出流末尾消費新的流記錄

    底層的RedisConnection提供了xReadxReadGroup方法,它們分別映射Redis命令來讀取消息和在消費組內讀取。請注意,可以將多個流用作參數

    注意:在連接上調用xRead會導致當前線程在開始等待消息時阻塞。僅當讀取命令超時或收到消息時才釋放線程

    要消費流中的消息,有兩種方式:

      1)(同步阻塞)在應用程序代碼中輪詢消息

      2)(異步)使用 Message Listener Containers(消息訂閱者容器)中的兩種異步接收中的一種(命令式或響應式)。每當消息到達時,容器都會通知調用應用程序代碼。

     

    同步接收

      StreamOperations.read(…)方法提供了此功能。在同步接收期間,調用線程可能會阻塞,直到消息可用為止。 StreamReadOptions.block屬性指定接收者在放棄等待消息之前應該等待多長時間 

// Read message through RedisTemplate
RedisTemplate template = …

List<MapRecord<K, HK, HV>> messages = template.streamOps().read(StreamReadOptions.empty().count(2),
                StreamOffset.latest("my-stream"));

List<MapRecord<K, HK, HV>> messages = template.streamOps().read(Consumer.from("my-group", "my-consumer"),
                StreamReadOptions.empty().count(2),
                StreamOffset.create("my-stream", ReadOffset.lastConsumed()))

 

    通過 Message Listener Containers (消息訂閱者容器)異步接收

      由於其阻塞性質,低級輪詢沒有吸引力,因為它需要每個消費者使用連接和線程管理。為了減輕這個問題,Spring Data提供了message listeners(消息偵聽器),它可以完成所有繁重的工作

    Spring Data附帶了兩種針對所用編程模型的實現:

      StreamMessageListenerContainer:充當命令式編程模型的消息偵聽器容器。它用於從Redis流中 consumer record(消費記錄)並驅動注入到對應的StreamListener實例中。

      StreamReceiver:提供消息偵聽器的反應式變體。它用於將Redis流中的消息作為潛在的無限流使用,並通過Flux發出流消息

    StreamMessageListenerContainer 和StreamReceiver 都是負責所有接收消息的線程,並將消息分發到 listener 中進行處理。

  

    StreamMessageListenerContainer 

    1)定義流消息訂閱者( Stream-Driven POJO (SDP) )充當流消息的接收者,它必須實現 org.springframework.data.redis.stream.StreamListene 接口。如:

public class ProductUpdateStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        RecordId recordId = message.getId();
        String stream = message.getStream();
        Map<String, String> record = message.getValue();

        // 消息處理完畢后,確認該消息,以便從PEL中刪除該消息ID
        RedisUtil.streamAcknowledge("yyj_group", message);
    }
}

 

    2)創建 message listener container 和注冊 流消息訂閱者。

    RedisConfig 中添加下面的配置:

@Bean
    public StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                // 一次最多拉取5條消息
                .batchSize(5)
                // 拉取消息的超時時間
                .pollTimeout(Duration.ofMillis(100))
                .build();

        // 流消息訂閱者容器
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory,
                containerOptions);
        // 使用消費組,ReadOffset.lastConsumed()在消費組最后消耗的消息ID之后讀取。消費組內的消費者名稱需在配置文件中配置
        // 需要注意stream和消費組需提前創建好,XGROUP CREATE yyj_stream yyj_group 0-0 MKSTREAM
        // 要在接收時自動確認消息,請使用receiveAutoAck代替receive
        // 經驗證一個消費組內的多個消費者名稱可以相同,不會重復消費,解決了集群部署不好區別消費者名稱的問題
        streamMessageListenerContainer.receive(
                Consumer.from("yyj_group", "consumer_01"),
                StreamOffset.create("yyj_stream", ReadOffset.lastConsumed()),
                new ProductUpdateStreamListener());
        streamMessageListenerContainer.start();
        return streamMessageListenerContainer;
    }

 

    驗證了,消息只能被消費組內的一個消費者監聽到。一個消費組的不同機器可以共用一個消費者名稱。

    需要注意消費組需要提前創建好,不然不能消費消息,創建消費組的時候要指定從什么位置開始消費消息。

 

 

  補充:

  ReadOffset 策略

    流讀取操作接受讀取偏移量規范,以消費來自給定偏移量的消息。ReadOffset表示讀取偏移量規范。Redis支持三種偏移量,具體取決於您是standalone 消費流還是在消費組中消費流:

      ReadOffset.latest() –閱讀最新消息。

      ReadOffset.from(…) –在特定消息ID之后閱讀。

      ReadOffset.lastConsumed() –在最后消耗的消息ID之后讀取(僅針對消費者組)

 

    在基於消息容器的使用情況下,我們在消費消息時需要前進(或增加)讀取偏移量,前進取決於請求的ReadOffset和消費模式(有/無消費組)。下表說明了容器如何前進ReadOffset:

Read offset Standalone Consumer Group

Latest

讀取最新消息

讀取最新消息

Specific Message Id(指定messageId)

使用上次看到的消息作為下一個MessageId

使用上次看到的消息作為下一個MessageId

Last Consumed

使用上次看到的消息作為下一個MessageId

每個消費者組最后消費的消息

 

 

總結:Redis Stream可以看作是一個輕量級的Kafka,適用於消息量少且對消息可靠性不高的業務。

 

END.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM