Springboot通過redisTemplate實現發布訂閱


要點:

RedisMessageListenerContainer Redis訂閱發布的監聽容器,你的消息發布、訂閱配置都必須在這里面實現
* addMessageListener(MessageListenerAdapter,PatternTopic) 新增訂閱頻道及訂閱者,訂閱者必須有相關方法處理收到的消息。
* setTopicSerializer(RedisSerializer) 對頻道內容進行序列化解析

MessageListenerAdapter 監聽適配器

  • MessageListenerAdapter(Object , defaultListenerMethod) 訂閱者及其方法

redisTemplate redis模版類

  • convertAndSend(String channel, Object message) 消息發布

第一種:

RedisConfig核心類,實現了Redis連接,訂閱以及發布配置

package com.example.demo.config;

import com.example.demo.project.MessageReceive1;
import com.example.demo.project.MessageReceive2;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @author lzg
 * @date 2019/12/5 15:37
 */
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 使用Jackson2JsonRedisSerialize 替換默認的jdkSerializeable序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // 設置value的序列化規則和 key的序列化規則
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * redis消息監聽器容器
     * 可以添加多個監聽不同話題的redis監聽器,只需要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器
     * 通過反射技術調用消息訂閱處理器的相關方法進行一些業務處理
     * @param redisConnectionFactory
     * @param listenerAdapter1
     * @return
     */
    //序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化)
    @Bean
    public RedisMessageListenerContainer container1(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter1, MessageListenerAdapter listenerAdapter2) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        // 訂閱多個頻道
        redisMessageListenerContainer.addMessageListener(listenerAdapter1, new PatternTopic("test1"));
        redisMessageListenerContainer.addMessageListener(listenerAdapter1, new PatternTopic("test2"));
        //不同的訂閱者
        redisMessageListenerContainer.addMessageListener(listenerAdapter2, new PatternTopic("test2"));

        //序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化)
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        redisMessageListenerContainer.setTopicSerializer(seria);
        return redisMessageListenerContainer;
    }


    //表示監聽一個頻道
    @Bean
    public MessageListenerAdapter listenerAdapter1(MessageReceive1 messageReceive1) {
        //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“MessageReceive1 ”
        return new MessageListenerAdapter(messageReceive1, "getMessage");
    }

    //表示監聽一個頻道
    @Bean
    public MessageListenerAdapter listenerAdapter2(MessageReceive2 messageReceive2) {
        //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“MessageReceive2 ”
        return new MessageListenerAdapter(messageReceive2, "getMessage");
    }
}

被消費的對象(即傳輸的數據)

/**
 * @author lzg
 * @date 2019/12/5 15:46
 */
@Data
public class Person implements Serializable {
    private final long serialVersionUID = 1L;

    private String id;

    private String userName;

    private String memberName;

    private String password;

    private String email;

    private String status;

    private String pwdSalt;

}

客戶端:

@Component
public class MessageReceive1 {
    public void getMessage(String object) {
//序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化)
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Person.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        Person user = (Person) seria.deserialize(object.getBytes());
        System.out.println("消息客戶端1號:" + user.toString());
    }
}
@Component
public class MessageReceive2 {
    public void getMessage(String object) {
//序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化)
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Person.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        Person user = (Person) seria.deserialize(object.getBytes());
        System.out.println("消息客戶端2號:" + user);
    }
}

測試類:

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestPack {
    @Resource
    private RedisTemplate redisTemplate;
    @Test
    public void test() {
        Person person1 = new Person();
        person1.setId("001");
        person1.setUserName("一號");
        Person person2 = new Person();
        person2.setId("002");
        person2.setUserName("二號");
        redisTemplate.convertAndSend("test1", person1);
        redisTemplate.convertAndSend("test2", person2);
    }
}

第二種(簡單版):

配置類:

@Configuration
public class MyRedisConf {
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 使用Jackson2JsonRedisSerialize 替換默認的jdkSerializeable序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // 設置value的序列化規則和 key的序列化規則
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("test1"));
        return container;
    }

    /**
     * 綁定消息監聽者和接收監聽的方法,必須要注入這個監聽器,不然會報錯
     */
    @Bean
    public MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(new Receiver(), "receiveMessage");
    }
}

@Slf4j
class Receiver {
    public void receiveMessage(String message) {
        System.out.println(message);
    }
}

測試類:

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestPack {
    @Resource
    private RedisTemplate redisTemplate;
    @Test
    public void test(){
        for (int i = 0; i < 10; i++) {
            System.out.println(i);
            redisTemplate.convertAndSend("test1","這是我發送的第"+i+"個消息");
        }
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM