今天來學習如何利用Spring Data對Redis的支持來實現消息的發布訂閱機制。發布訂閱是一種典型的異步通信模型,可以讓消息的發布者和訂閱者充分解耦。在我們的例子中,我們將使用StringRedisTemplate
來發布一個字符串消息,同時基於MessageListenerAdapter
使用一個POJO來訂閱和響應該消息。
提示
事實上,RedisRedis 不僅提供一個NoSQL數據庫,同時提供了一套消息系統。
環境准備
開發環境:
- IDE+Java環境(JDK 1.7或以上版本)
- Maven 3.0+(Eclipse和Idea IntelliJ內置,如果使用IDE並且不使用命令行工具可以不安裝)
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tianmaing</groupId>
<artifactId>redis-message</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>redis-message</name>
<description>Demo of message processing by redis</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.2.5.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
通過配置spring-boot-starter-redis依賴,把Spring Boot對Redis的相關支持引入進來。
創建Redis消息的接收者
在任何一個基於消息的應用中,都有消息發布者和消息接收者(或者稱為消息訂閱者)。創建消息的接收者,我們只需一個普通POJO,在POJO中定義一個接收消息的方法即可:
package com.tianmaying.springboot.redisdemo;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch;
@Autowired
public Receiver(CountDownLatch latch) {
this.latch = latch;
}
public void receiveMessage(String message) {
LOGGER.info("Received <" + message + ">");
latch.countDown();
}
}
這個Receiver
類將會被注冊為一個消息監聽者時。處理消息的方法我們可以任意命名,我們有相當大的靈活性。
我們給Receiver
的構造函數通過@AutoWired
標注注入了一個CountDownLatch
實例,當接收到消息時,調用cutDown()
方法。
注冊監聽者和發送消息
Spring Data Redis提供基於Redis發送和接收消息的所有需要的組件,我們只需要配置好三個東西:
- 一個連接工廠(connection factory)
- 一個消息監聽者容器(message listener container)
- 一個Redis的模板(redis template)
我們將通過Redis模板來發送消息,同時將Receiver
注冊給消息監聽者容器。連接工廠將兩者連接起來,使得它們可以通過Redis服務器通信。如何連接呢? 我們將連接工廠實例分別注入到監聽者容器和Redis模板中即可。
package com.tianmaying.springboot.redisdemo;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@SpringBootApplication
public class App {
private static final Logger LOGGER = LoggerFactory.getLogger(App.class);
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
Receiver receiver(CountDownLatch latch) {
return new Receiver(latch);
}
@Bean
CountDownLatch latch() {
return new CountDownLatch(1);
}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
public static void main(String[] args) throws InterruptedException {
ApplicationContext ctx = SpringApplication.run(App.class, args);
StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
CountDownLatch latch = ctx.getBean(CountDownLatch.class);
LOGGER.info("Sending message...");
template.convertAndSend("chat", "Hello from Redis!");
latch.await();
System.exit(0);
}
}
連接工程我們使用Spring Boot默認的RedisConnectionFactory
,是Jedis Redis庫提供的JedisConnectionFactory
實現。
我們將在listenerAdapter
方法中定義的Bean注冊為一個消息監聽者,它將監聽chat
主題的消息。
因為Receiver
類是一個POJO,要將它包裝在一個消息監聽者適配器(實現了MessageListener
接口),這樣才能被監聽者容器RedisMessageListenerContainer
的addMessageListener方法添加到連接工廠中。有了這個適配器,當一個消息到達時,就會調用
receiveMesage()`方法進行響應。
就這么簡單,配置好連接工廠和消息監聽者容器,你就可以監聽消息啦!
發送消息就更簡單了,我們使用StringRedisTemplate
來發送鍵和值均為字符串的消息。在main()
方法中我們創建一個Spring應用的Context,初始化消息監聽者容器,開始監聽消息。然后獲取StringRedisTemplate
的實例,往chat主題發送一個消息。我們看到,消息可以被成功的接收到並打印出來,搞定!