Spring Boot 整合 Redis消息訂閱與發布


 

一、Redis 中的發布訂閱功能

 

 

 

與 Redis 發布訂閱相關的命令有 6 個,分別如下:

  • PSUBSCRIBE pattern [pattern …]:訂閱一個或者多個符合pattern格式的頻道

  • PUBLISH channel message:發布消息到chanel中

  • PUBSUB subcommand [argument [argument …]]:查看訂閱與發布系統狀態

  • PUNSUBSCRIBE [pattern [pattern …]]:退訂所有符合格式的頻道

  • SUBSCRIBE channel [channel …]:訂閱一個或者多個頻道

  • UNSUBSCRIBE [channel [channel …]]:取消訂閱頻道

 

而在 Jedis 中,也提供了一個類 JedisPubSub,用來對訂閱的 channel 進行監聽。

  • onPMessage:監聽到訂閱模式接受到消息時的回調

  • onMessage:監聽到訂閱頻道接受到消息時的回調

  • onSubscribe:訂閱頻道時的回調

  • onUnsubscribe:取消訂閱頻道時的回調

  • onPSubscribe:訂閱頻道模式時的回調

  • onPUnsubscribe:取消訂閱模式時的回調

 

二、項目具體實現

1. POM依賴包

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.1.0</version>
</dependency>

 

2.消息推送

package com.zyu.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.Jedis;

/**
 * @Auther: zyu
 * @Date: 2020/4/29 10:18
 * @Description:   發布訂閱 SUB端
 */
@RestController
public class SubController {


    @GetMapping("/sub")
    public String messageSub(String message) {

        Jedis jedis = new Jedis();
        try{
            //發送廣播
            jedis.publish("redisChat", message);
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            jedis.close();
        }
        return "SUCCESS";
    }

}

 

3.消息接收

package com.zyu.task;

import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * @Auther: zyu
 * @Date: 2020/4/29 09:03
 * @Description: Redis消息監聽
 */
@Component
public class Receiver {

    public void receiveMessage() {

        Jedis jedis = new Jedis();
        JedisPubSub jedisPubSub = new JedisPubSub() {

            /**
             * 監聽到訂閱頻道接受到消息時的回調
             * @param channel   頻道
             * @param message   消息
             */
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("Channel:" + channel);
                System.out.println("Message:" + message);
            }

            /**
             * 監聽到訂閱模式接受到消息時的回調
             * @param pattern
             * @param channel
             * @param message
             */
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                // TODO Auto-generated method stub
                System.out.println(pattern + "," + channel + "," + message);
            }

            /**
             * 訂閱頻道時的回調
             * @param channel
             * @param subscribedChannels
             */
            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                // TODO Auto-generated method stub
                System.out.println("onSubscribe: channel[" + channel + "]," + "subscribedChannels[" + subscribedChannels + "]");
            }

            /**
             * 取消訂閱頻道時的回調
             * @param channel
             * @param subscribedChannels
             */
            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                // TODO Auto-generated method stub
                System.out.println(
                        "onUnsubscribe: channel[" + channel + "], " + "subscribedChannels[" + subscribedChannels + "]");
            }

            /**
             * 取消訂閱模式時的回調
             * @param pattern
             * @param subscribedChannels
             */
            @Override
            public void onPUnsubscribe(String pattern, int subscribedChannels) {
                // TODO Auto-generated method stub
                System.out.println("onPUnsubscribe: pattern[" + pattern + "]," +
                        "subscribedChannels[" + subscribedChannels + "]");
            }

            /**
             * 訂閱頻道模式時的回調
             * @param pattern
             * @param subscribedChannels
             */
            @Override
            public void onPSubscribe(String pattern, int subscribedChannels) {
                System.out.println("onPSubscribe: pattern[" + pattern + "], " +
                        "subscribedChannels[" + subscribedChannels + "]");
            }

        };

        // 訂閱一個或者多個符合pattern格式的頻道
        String[] channels = {"redisChat"};
        jedis.subscribe(jedisPubSub, channels);
    }

}

 

4.Spring容器初始化時,將監聽類注入

package com.zyu;

import com.zyu.task.Receiver;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class SpringbootRedisApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(SpringbootRedisApplication.class, args);
        run.getBean(Receiver.class).receiveMessage();
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM