SpringBoot整合Redis實現發布訂閱功能實踐


該項目代碼下載

一、項目結構

我首先用 SpringBoot Initializer 創建一個簡單的 Demo,然后在 Demo 上進行修改,這樣更便捷。項目結構如下圖所示:

項目結構也很簡單

  • PrintMessageListener 負責處理訂閱消息,我僅僅是打印了收到的Redis信息;
  • AdminController 負責從瀏覽器輸入url,實現動態訂閱/取消訂閱以及發布;
  • RedisConfiguration 可能是最重要的,需要負責向 Spring容器注入以下 Bean:
    • RedisTemplate :可以通過調用它的 convertAndSend(channel, Object message) 方法 發布消息;
    • RedisMessageListenerContainer ,可以通過調用它的 addMessageListener(MessageListener listener, Topic topic) 方法 訂閱消息;相反地,也可以調用它的 removeMessageListener(MessageListener listener, Topic topic) 方法 取消訂閱消息;
  • PubsubApplication 是 SpringBoot 的啟動類;
  • logback.xml 配置內容可以參考 這篇文章

PS:作為 Maven 項目,肯定還要有 pom.xml,圖片中沒有反映出來,所以我補充一下。

二、Maven 依賴

項目需要引入的依賴包括:

  • spring-boot-starter-web:幫助我們啟動一個Web服務器;
  • spring-boot-starter-data-redis:幫助我們集成Redis;
  • lombok:方便我們使用 @Slf4j/@Data 等,簡化代碼;
  • slf4j-api:讓我們能夠使用 LoggerLoggerFactory 等類;
  • logback-classic:讓我們能夠真正打印出日志。

完整的 pom.xml 文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.3</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <groupId>com.example.demo</groupId>
  <artifactId>pubsub</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>pubsub</name>
  <description>Demo project for Spring Boot</description>

  <properties>
    <java.version>1.8</java.version>
    <slf4j.version>1.7.32</slf4j.version>
    <logback.version>1.2.6</logback.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>${logback.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
	<configuration>
	  <excludes>
	    <exclude>
	      <groupId>org.projectlombok</groupId>
	      <artifactId>lombok</artifactId>
            </exclude>
	  </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

三、消息監聽

我們收到發布的消息后,需要處理邏輯,這部分邏輯寫在 PrintMessageListener 中:

package com.example.demo.pubsub.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 功能描述:打印收到的Redis信息
 *
 * @author geekziyu
 * @version 1.0.0
 */
@Slf4j
public class PrintMessageListener implements MessageListener {

    private StringRedisSerializer stringRedisSerializer;

    public PrintMessageListener(StringRedisSerializer stringRedisSerializer) {
        this.stringRedisSerializer = stringRedisSerializer;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = stringRedisSerializer.deserialize(message.getChannel());
        String body = stringRedisSerializer.deserialize(message.getBody());
        handleMessage(channel, body);
    }

    private void handleMessage(String channel, String body) {
        log.info("消費Redis消息\n channel:{}\n body:{}", channel, body);
    }
}

四、Redis配置

前面也說過了,我們要使用 spring-boot-starter-data-redis 中提供的API實現Redis發布和訂閱消息,就需要用到 RedisTemplateRedisMessageListenerContainer,現在就來把他們注入Spring容器:

package com.example.demo.pubsub.config;

import com.example.demo.pubsub.listener.PrintMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 功能描述:Redis 配置
 *
 * @author geekziyu
 * @version 1.0.0
 */
@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer result = new RedisMessageListenerContainer();
        result.setConnectionFactory(redisConnectionFactory);

        return result;
    }

    @Bean("redisTemplate")
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, String> result = new RedisTemplate<>();
        result.setConnectionFactory(factory);

        result.setKeySerializer(stringRedisSerializer());
        result.setHashKeySerializer(stringRedisSerializer());

        result.setValueSerializer(stringRedisSerializer());
        result.setHashValueSerializer(stringRedisSerializer());
        return result;
    }

    @Bean
    public PrintMessageListener printMessageListener() {
        return new PrintMessageListener(stringRedisSerializer());
    }

    @Bean
    public StringRedisSerializer stringRedisSerializer() {
        return new StringRedisSerializer();
    }
}

需要注意的有以下幾點:
第一、如果不調用 setConnectionFactory(RedisConnectionFactory),給 RedisMessageListenerContainer 設置連接工廠,在調用 addMessageListener 執行訂閱時,會出現空指針異常,具體發生異常的位置如下圖:

第二、如果不調用 RedisTemplatesetConnectionFactory 方法設置Redis連接工廠,會在啟動時就發生異常,如下圖所示:

// 說明 RedisConnectionFactory 對於 RedisTemplate 而言是必需的!
Caused by: java.lang.IllegalStateException: RedisConnectionFactory is required
	at org.springframework.util.Assert.state(Assert.java:76)
	at org.springframework.data.redis.core.RedisAccessor.afterPropertiesSet(RedisAccessor.java:38)
	at org.springframework.data.redis.core.RedisTemplate.afterPropertiesSet(RedisTemplate.java:128)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)

五、通過HTTP請求訂閱發布

我這里用 AdminController 來接受發布和訂閱/取消訂閱的請求,源代碼如下:

package com.example.demo.pubsub.controller;

import com.example.demo.pubsub.listener.PrintMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * 功能描述:后台控制器
 *
 * @author geekziyu
 * @version 1.0.0
 */
@RestController
@RequestMapping("/admin")
public class AdminController {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private RedisMessageListenerContainer container;

    private Map<String, MessageListener> registeredListener = new HashMap<>();

    @Autowired
    private StringRedisSerializer stringRedisSerializer;


    @GetMapping("/pub")
    public String publish(String channel, String body) {
        redisTemplate.convertAndSend(channel, body);
        return "ok";
    }

    @GetMapping("/sub")
    public String subscribe(String channel) {
        MessageListener listener = registeredListener.computeIfAbsent(channel, ch -> new PrintMessageListener(stringRedisSerializer));
        container.addMessageListener(listener, new ChannelTopic(channel));
        return "ok";
    }

    @GetMapping("/unsub")
    public String unsubscribe(String channel) {
        MessageListener messageListener = registeredListener.get(channel);
        if (messageListener != null) {
            container.removeMessageListener(messageListener, new ChannelTopic(channel));
        }
        return "ok";
    }

}

六、打印日志

為了順利的在控制台輸出日志,你可能需要 logback.xml 的完整代碼:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="info">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>

小節

這樣,我們就已經可以實現發布訂閱了。

首先訂閱一下:

http://localhost:8080/admin/sub?channel=dream

再發布一下:

http://localhost:8080/admin/pub?channel=dream&body=engineer

檢查控制台,Redis消息消費成功:

需要注意,你的 application.properties 中Redis的連接默認為 localhost:6379

spring.redis.host=localhost
spring.redis.port=6379

你需要確保本地已經啟動了Redis,且服務端口是6379。如果你不熟悉如何搭建Redis,那么你需要修改 Redis 連接到一個可用的 Redis 服務上去。

參考文檔

SpringBoot整合Redis實現消息發布訂閱


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM