SpringBoot結合Redis Stream 實現消息隊列Demo


SpringBoot 中使用Redis Stream 實現消息監聽

Demo環境

  • JDK8
  • Maven3.6.3
  • springboot2.4.3

POM依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>vip.huhailong</groupId>
	<artifactId>redismq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>redismq</name>
	<description>base redis stream mq</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

這里是一個簡單的Demo,所以關於redis的一些序列化配置就省略了。

配置監聽消息類

配置監聽消息類,這里類需要實現StreamListener接口,該接口下只有一個要實現的方法——onMessage方法,代碼:

package vip.huhailong.redismq.redistool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

/**
 * @author Huhailong
 * @Description 監聽消息
 * @Date 2021/3/10.
 */
@Slf4j
@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> entries) {
        log.info("接受到來自redis的消息");
        System.out.println("message id "+entries.getId());
        System.out.println("stream "+entries.getStream());
        System.out.println("body "+entries.getValue());
    }
    
}

配置完該類后我們再創建一個類將該監聽器注入進去,代碼:

package vip.huhailong.redismq.config;

import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import vip.huhailong.redismq.redistool.ListenerMessage;

import java.time.Duration;

/**
 * @author Huhailong
 * @Description
 * @Date 2021/3/12.
 */
@Configuration
public class RedisStreamConfig {

    @Autowired
    private ListenerMessage streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory factory){
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
        var listenerContainer = StreamMessageListenerContainer.create(factory,options);
        var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener);
        listenerContainer.start();
        return subscription;
    }
}

代碼分析:

  1. 首先將我們實現了StreamListener的監聽器類注入。

  2. subscription方法返回的是一個Subscription類型,它是與當前正在運行任務的鏈接,可以理解為訂閱的鏈接,它有倆個方法

    1. boolean await(Duration timeout): 當訂閱變為活動或超時時,同步阻塞呼叫將返回
    2. boolean isActive(): 如果當前正在訂閱則為true
  3. 代碼中的var是使用了Lombok的可變局部變量。主要是為了方便

  4. StreamMessageListenerContainer: 消息偵聽容器,不能在外部實現。創建后,StreamMessageListenerContainer可以訂閱Redis流並使用傳入的消息。 StreamMessageListenerContainer允許多個流讀取請求,並為每個讀取請求返回一個Subscription句柄。取消訂閱最終將終止后台輪詢。使用鍵和值序列化器轉換消息以支持各種序列化策略。具體文檔

  5. __StreamMessageListenerContainerOptions: __ 它是上面4的選項,代碼中pollTimeout表示輪詢超時時間

  6. create方法使用給定的RedisConnectionFactory和上面的配置選項創建偵聽器。

  7. 接下來使用receiveAutoAck創建一個新訂閱,注意這里接受到消息后會被自動的確認,如果不想自動確認請使用其他的創建訂閱方式。該方法共有三個參數

    1. 消費組 consumer group ,它不能為null (Consumer類型)
    2. stream offset ,stream的偏移量(StreamOffset 類型)
    3. listener 不能為null (StreamListener<K,V> 類型)

    代碼中表示消費者來自名稱為mygroup的組,消費者名稱為huhailong,這里偏移量設置為了lastConsumed,它表示讀取ID大於消費者組使用的最后一個元素的所有新到達的元素。

現在就可以運行項目來驗證了,將項目運行起來后通過終端給對應key的stream添加一條消息

> XADD mystream * message springboot

這時可以看到spring boot的控制台打印除了一下消息:

message id 1615532778588-0
stream mystream
body {message=springboot}

說明偵聽成功,它會一直處於監聽狀態,只要對應key的stream添加了新的消息都會被偵聽到,到此也就簡單的實現了消息隊列功能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM