補習系列(13)-springboot redis 與發布訂閱


一、訂閱發布

訂閱發布是一種常見的設計模式,常見於消息系統的場景。
如下面的圖:

[圖來自百科]
消息發布者是消息載體的生產者,其通過某些主題來向調度中心發送消息;
而消息訂閱者會事先向調度中心訂閱其"感興趣"的主題,隨后會獲得新消息。
在這里,調度中心是一個負責消息控制中轉的邏輯實體,可以是消息隊列如ActiveMQ,也可以是Web服務等等。

常見應用

  • 微博,每個用戶的粉絲都是該用戶的訂閱者,當用戶發完微博,所有粉絲都將收到他的動態;
  • 新聞,資訊站點通常有多個頻道,每個頻道就是一個主題,用戶可以通過主題來做訂閱(如RSS),這樣當新聞發布時,訂閱者可以獲得更新。

二、Redis 與訂閱發布

Redis 支持 (pub/sub) 的訂閱發布能力,客戶端可以通過channel(頻道)來實現消息的發布及接收。

  1. 客戶端通過 SUBSCRIBE 命令訂閱 channel;

  1. 客戶端通過PUBLISH 命令向channel 發送消息;

而后,訂閱 channel的客戶端可實時收到消息。

除了簡單的SUBSCRIBE/PUBLISH命令之外,Redis還支持訂閱某一個模式的主題(正則表達式),
如下:

PSUBSCRIBE  /topic/cars/*

於是,我們可以利用這點實現相對復雜的訂閱能力,比如:

  • 在電商平台中訂閱多個品類的商品促銷信息;
  • 智能家居場景,APP可以訂閱所有房間的設備消息。
    ...

盡管如此,Redis pub/sub 機制存在一些缺點:

  • 消息無法持久化,存在丟失風險;
  • 沒有類似 RabbitMQ的ACK機制;
  • 由於是廣播機制,無法通過添加worker 提升消費能力;

因此,Redis 的訂閱發布建議用於實時且可靠性要求不高的場景。

三、SpringBoot 與訂閱發布

接下來,看一下SpringBoot 怎么實現訂閱發布的功能。

spring-boot-starter-data-redis 幫我們實現了Jedis的引入,pom 依賴如下:

 <!-- redis -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
   <version>${spring-boot.version}</version>
  </dependency>

application.properties 中指定配置

# redis 連接配置
spring.redis.database=0 
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.ssl=false

# 連接池最大數
spring.redis.pool.max-active=10 
# 空閑連接最大數
spring.redis.pool.max-idle=10
# 獲取連接最大等待時間(s)
spring.redis.pool.max-wait=600000

A. 消息模型

消息模型描述了訂閱發布的數據對象,這要求生產者與消費者都能理解
以下面的POJO為例:

    public static class SimpleMessage {

        private String publisher;
        private String content;
        private Date createTime;

在SimpleMessage類中,我們聲明了幾個字段:

字段名 說明
publisher 發布者
content 文本內容
createTime 創建時間

B. 序列化

如下的代碼采用了JSON 作為序列化方式:

@Configuration
public class RedisConfig {

    private static final Logger logger = LoggerFactory.getLogger(RedisConfig.class);

    /**
     * 序列化定制
     * 
     * @return
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() {
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
                Object.class);

        // 初始化objectmapper
        ObjectMapper mapper = new ObjectMapper();
        mapper.setSerializationInclusion(Include.NON_NULL);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(mapper);
        return jackson2JsonRedisSerializer;
    }

    /**
     * 操作模板
     * 
     * @param connectionFactory
     * @param jackson2JsonRedisSerializer
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory connectionFactory,
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {

        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(connectionFactory);

        // 設置key/hashkey序列化
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);

        // 設置值序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;
    }

C. 發布消息

消息發布,需要先指定一個ChannelTopic對象,隨后通過RedisTemplate方法操作。

@Service
public class RedisPubSub {  
    private static final Logger logger = LoggerFactory.getLogger(RedisPubSub.class);

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private ChannelTopic topic = new ChannelTopic("/redis/pubsub");

  
    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    private void schedule() {
        logger.info("publish message");
        publish("admin", "hey you must go now!");
    }

    /**
     * 推送消息
     * 
     * @param publisher
     * @param message
     */
    public void publish(String publisher, String content) {
        logger.info("message send {} by {}", content, publisher);

        SimpleMessage pushMsg = new SimpleMessage();
        pushMsg.setContent(content);
        pushMsg.setCreateTime(new Date());
        pushMsg.setPublisher(publisher);

        redisTemplate.convertAndSend(topic.getTopic(), pushMsg);
    }

上述代碼使用一個定時器(@Schedule)來做發布,為了保證運行需要在主類中啟用定時器注解:

@EnableScheduling
@SpringBootApplication
public class BootSampleRedis{
...
}

D. 接收消息

定義一個消息接收處理的Bean:

    @Component
    public static class MessageSubscriber {

        public void onMessage(SimpleMessage message, String pattern) {
            logger.info("topic {} received {} ", pattern, JsonUtil.toJson(message));
        }
    }

接下來,利用 MessageListenerAdapter 可將消息通知到Bean方法:

       /**
         * 消息監聽器,使用MessageAdapter可實現自動化解碼及方法代理
         * 
         * @return
         */
        @Bean
        public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer,
                MessageSubscriber subscriber) {
            MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");
            adapter.setSerializer(jackson2JsonRedisSerializer);
            adapter.afterPropertiesSet();
            return adapter;
        }

最后,關聯到消息發布的Topic:

        /**
         * 將訂閱器綁定到容器
         * 
         * @param connectionFactory
         * @param listenerAdapter
         * @return
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                MessageListenerAdapter listener) {

            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.addMessageListener(listener, new PatternTopic("/redis/*"));
            return container;
        }

運行結果
啟動程序,從控制台可輸出:

.RedisPubSub : publish message
.RedisPubSub : message send hey you must go now! by admin
.RedisPubSub : topic /redis/* received {"publisher":"admin","content":"hey you must go now!","createTime":1543418694007} 

這樣,我們便完成了訂閱發布功能。

示例程序下載

小結

消息訂閱發布是分布式系統中的常用手段,也經常用來實現系統解耦、性能優化等目的;
當前小節結合SpringBoot 演示了 Redis訂閱發布(pub/sub)的實現,在部分場景下可以參考使用。
歡迎繼續關注"美碼師的補習系列-springboot篇" ,期待更多精彩內容-


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM