Spring Data Redis Stream的使用


一、背景

Stream類型是 redis5之后新增的類型,在這篇文章中,我們實現使用Spring boot data redis來消費Redis Stream中的數據。實現獨立消費和消費組消費。

二、整合步驟

1、引入jar包

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
  </dependency>
</dependencies>

主要是上方的這個包,其他的不相關的包此處省略導入。

2、配置RedisTemplate依賴

@Configuration
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // 這個地方不可使用 json 序列化,如果使用的是ObjectRecord傳輸對象時,可能會有問題,會出現一個 java.lang.IllegalArgumentException: Value must not be null! 錯誤
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        return redisTemplate;
    }
}

注意:

此處需要注意 setHashValueSerializer 的序列化的方式,具體注意事項后期再說。

3、准備一個實體對象

這個實體對象是需要發送到Stream中的對象。

@Getter
@Setter
@ToString
public class Book {
    private String title;
    private String author;
    
    public static Book create() {
        com.github.javafaker.Book fakerBook = Faker.instance().book();
        Book book = new Book();
        book.setTitle(fakerBook.title());
        book.setAuthor(fakerBook.author());
        return book;
    }
}

每次調用create方法時,會自動產生一個Book的對象,對象模擬數據是使用javafaker來模擬生成的。

4、編寫一個常量類,配置Stream的名稱

/**
 * 常量
 *
 */
public class Cosntants {
    
    public static final String STREAM_KEY_001 = "stream-001";
    
}

5、編寫一個生產者,向Stream中生產數據

1、編寫一個生產者,向Stream中產生ObjectRecord類型的數據

/**
 * 消息生產者
 
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class StreamProducer {
    
    private final RedisTemplate<String, Object> redisTemplate;
    
    public void sendRecord(String streamKey) {
        Book book = Book.create();
        log.info("產生一本書的信息:[{}]", book);
        
        ObjectRecord<String, Book> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(book)
                .withId(RecordId.autoGenerate());
        
        RecordId recordId = redisTemplate.opsForStream()
                .add(record);
        
        log.info("返回的record-id:[{}]", recordId);
    }
}

2、每隔5s就生產一個數據到Stream中

/**
 * 周期性的向流中產生消息
 */
@Component
@AllArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {
    
    private final StreamProducer streamProducer;
    
    @Override
    public void run(ApplicationArguments args) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
                        0, 5, TimeUnit.SECONDS);
    }
}

三、獨立消費

獨立消費指的是脫離消費組的直接消費Stream中的消息,是使用 xread方法讀取流中的數據,流中的數據在讀取后並不會被刪除,還是存在的。如果多個程序同時使用xread讀取,都是可以讀取到消息的。

1、實現從頭開始消費-xread實現

此處實現的是從Stream的第一個消息開始消費

package com.huan.study.redis.stream.consumer.xread;

import com.huan.study.redis.constan.Cosntants;
import com.huan.study.redis.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 脫離消費組-直接消費Stream中的數據,可以獲取到Stream中所有的消息
 */
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
    
    private ThreadPoolExecutor threadPoolExecutor;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    
    private volatile boolean stop = false;
    
    @Override
    public void afterPropertiesSet() {
        
        // 初始化線程池
        threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("xread-nonblock-01");
            return thread;
        });
        
        StreamReadOptions streamReadOptions = StreamReadOptions.empty()
                // 如果沒有數據,則阻塞1s 阻塞時間需要小於`spring.redis.timeout`配置的時間
                .block(Duration.ofMillis(1000))
                // 一直阻塞直到獲取數據,可能會報超時異常
                // .block(Duration.ofMillis(0))
                // 1次獲取10個數據
                .count(10);
        
        StringBuilder readOffset = new StringBuilder("0-0");
        threadPoolExecutor.execute(() -> {
            while (!stop) {
                // 使用xread讀取數據時,需要記錄下最后一次讀取到offset,然后當作下次讀取的offset,否則讀取出來的數據會有問題
                List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
                        .read(Book.class, streamReadOptions, StreamOffset.create(Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
                if (CollectionUtils.isEmpty(objectRecords)) {
                    log.warn("沒有獲取到數據");
                    continue;
                }
                for (ObjectRecord<String, Book> objectRecord : objectRecords) {
                    log.info("獲取到的數據信息 id:[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
                    readOffset.setLength(0);
                    readOffset.append(objectRecord.getId());
                }
            }
        });
    }
    
    @Override
    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    }
}

注意:

下一次讀取數據時,offset 是上一次最后獲取到的id的值,否則可能會出現漏數據。

2、StreamMessageListenerContainer實現獨立消費

見下方的消費組消費的代碼

四、消費組消費

1、實現StreamListener接口

實現這個接口的目的是為了,消費Stream中的數據。需要注意在注冊時使用的是streamMessageListenerContainer.receiveAutoAck()還是streamMessageListenerContainer.receive()方法,如果是第二個,則需要手動ack,手動ack的代碼:redisTemplate.opsForStream().acknowledge("key","group","recordId");

/**
 * 通過監聽器異步消費
 *
 * @author huan.fu 2021/11/10 - 下午5:51
 */
@Slf4j
@Getter
@Setter
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
    /**
     * 消費者類型:獨立消費、消費組消費
     */
    private String consumerType;
    /**
     * 消費組
     */
    private String group;
    /**
     * 消費組中的某個消費者
     */
    private String consumerName;
    
    public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }
    
    private RedisTemplate<String, Object> redisTemplate;
    
    @Override
    public void onMessage(ObjectRecord<String, Book> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        Book value = message.getValue();
        if (StringUtils.isBlank(group)) {
            log.info("[{}]: 接收到一個消息 stream:[{}],id:[{}],value:[{}]", consumerType, stream, id, value);
        } else {
            log.info("[{}] group:[{}] consumerName:[{}] 接收到一個消息 stream:[{}],id:[{}],value:[{}]", consumerType,
                    group, consumerName, stream, id, value);
        }
        
        // 當是消費組消費時,如果不是自動ack,則需要在這個地方手動ack
        // redisTemplate.opsForStream()
        //         .acknowledge("key","group","recordId");
    }
}

2、獲取消費或消費消息過程中錯誤的處理

/**
 * StreamPollTask 獲取消息或對應的listener消費消息過程中發生了異常
 *
 * @author huan.fu 2021/11/11 - 下午3:44
 */
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        log.error("發生了異常", t);
    }
}

3、消費組配置

/**
 * redis stream 消費組配置
 *
 * @author huan.fu 2021/11/11 - 下午12:22
 */
@Configuration
public class RedisStreamConfiguration {
    
    @Resource
    private RedisConnectionFactory redisConnectionFactory;
    
    /**
     * 可以同時支持 獨立消費 和 消費者組 消費
     * <p>
     * 可以支持動態的 增加和刪除 消費者
     * <p>
     * 消費組需要預先創建出來
     *
     * @return StreamMessageListenerContainer
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多獲取多少條消息
                        .batchSize(10)
                        // 運行 Stream 的 poll task
                        .executor(executor)
                        // 可以理解為 Stream Key 的序列化方式
                        .keySerializer(RedisSerializer.string())
                        // 可以理解為 Stream 后方的字段的 key 的序列化方式
                        .hashKeySerializer(RedisSerializer.string())
                        // 可以理解為 Stream 后方的字段的 value 的序列化方式
                        .hashValueSerializer(RedisSerializer.string())
                        // Stream 中沒有消息時,阻塞多長時間,需要比 `spring.redis.timeout` 的時間小
                        .pollTimeout(Duration.ofSeconds(1))
                        // ObjectRecord 時,將 對象的 filed 和 value 轉換成一個 Map 比如:將Book對象轉換成map
                        .objectMapper(new ObjectHashMapper())
                        // 獲取消息的過程或獲取到消息給具體的消息者處理的過程中,發生了異常的處理
                        .errorHandler(new CustomErrorHandler())
                        // 將發送到Stream中的Record轉換成ObjectRecord,轉換成具體的類型是這個地方指定的類型
                        .targetType(Book.class)
                        .build();
        
        StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);
        
        // 獨立消費
        String streamKey = Cosntants.STREAM_KEY_001;
        streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
                new AsyncConsumeStreamListener("獨立消費", null, null));
        
        // 消費組A,不自動ack
        // 從消費組中沒有分配給消費者的消息開始消費
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消費組消費", "group-a", "consumer-a"));
        // 從消費組中沒有分配給消費者的消息開始消費
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消費組消費A", "group-a", "consumer-b"));
        
        // 消費組B,自動ack
        streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-b", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消費組消費B", "group-b", "consumer-bb"));
        
        // 如果需要對某個消費者進行個性化配置在調用register方法的時候傳遞`StreamReadRequest`對象
        
        return streamMessageListenerContainer;
    }
}

注意:

提前建立好消費組

127.0.0.1:6379> xgroup create stream-001 group-a $
OK
127.0.0.1:6379> xgroup create stream-001 group-b $
OK

1、獨有消費配置

 streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), new AsyncConsumeStreamListener("獨立消費", null, null));

不傳遞Consumer即可。

2、配置消費組-不自動ack消息

streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("消費組消費A", "group-a", "consumer-b"));

1、需要注意ReadOffset的取值。

2、需要注意group需要提前創建好。

3、配置消費組-自動ack消息

streamMessageListenerContainer.receiveAutoAck()

五、序列化策略

Stream Property Serializer Description
key keySerializer used for Record#getStream()
field hashKeySerializer used for each map key in the payload
value hashValueSerializer used for each map value in the payload

六、ReadOffset策略

消費消息時的Read Offset 策略

ReadOffset策略

Read offset Standalone Consumer Group
Latest Read latest message(讀取最新的消息) Read latest message(讀取最新的消息)
Specific Message Id Use last seen message as the next MessageId
(讀取大於指定的消息id的消息)
Use last seen message as the next MessageId
(讀取大於指定的消息id的消息)
Last Consumed Use last seen message as the next MessageId
(讀取大於指定的消息id的消息)
Last consumed message as per consumer group
(讀取還未分配給消費組中的消費組的消息)

七、注意事項

1、讀取消息的超時時間

當我們使用 StreamReadOptions.empty().block(Duration.ofMillis(1000)) 配置阻塞時間時,這個配置的阻塞時間必須要比 spring.redis.timeout配置的時間短,否則可能會報超時異常。

2、ObjectRecord反序列化錯誤

如果我們在讀取消息時發生如下異常,那么排查思路如下:

java.lang.IllegalArgumentException: Value must not be null!
	at org.springframework.util.Assert.notNull(Assert.java:201)
	at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
	at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
	at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
	at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
	at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
	at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
	at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

1、檢測 RedisTemplateHashValueSerializer的序列化方式,最好不要使用json可以使用RedisSerializer.string()

2、檢查redisTemplate.opsForStream()中配置的HashMapper,默認是ObjectHashMapper這個是把對象字段和值序列化成byte[]格式。

提供一個可用的配置

# RedisTemplate的hash value 使用string類型的序列化方式
redisTemplate.setHashValueSerializer(RedisSerializer.string());
# 這個方法opsForStream()里面使用默認的ObjectHashMapper
redisTemplate.opsForStream()

關於上面的這個錯誤,我在Spring Data Redis的官方倉庫提了一個 issue,得到官方的回復是,這是一個bug,后期會修復的。

官方回答

3、使用xread順序讀取數據漏數據

如果我們使用xread讀取數據發現有寫數據漏掉了,這個時候我們需要檢查第二次讀取時配置的StreamOffset是否合法,這個值需要是上一次讀取的最后一個值。

舉例說明:

1、SteamOffset傳遞的是 $ 表示讀取最新的一個數據。

2、處理上一步讀取到的數據,此時另外的生產者又向Stream中插入了幾個數據,這個時候讀取到的數據還沒有處理完。

3、再次讀取Stream中的數據,還是傳遞的$,那么表示還是讀取最新的數據。那么在上一步流入到Stream中的數據,這個消費者就讀取不到了,因為它讀取的是最新的數據。

4、StreamMessageListenerContainer的使用

1、可以動態的添加和刪除消費者

2、可以進行消費組消費

3、可以直接獨立消費

4、如果傳輸ObjectRecord的時候,需要注意一下序列化方式。參考上面的代碼。

八、完整代碼

https://gitee.com/huan1993/spring-cloud-parent/tree/master/redis/redis-stream

九、參考文檔

1、https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams
2、https://github.com/spring-projects/spring-data-redis/issues/2198


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM