Publishing and Subscribing to Objects with RedisTemplate in Spring Boot


Explanation

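  1. RedisMessageListenerContainer: the listener container for Redis Pub/Sub. All subscription configuration (which listener listens on which channel) is registered here; publishing goes through RedisTemplate.
    • addMessageListener(MessageListener, Topic) registers a subscriber on a channel. In this article the listener is a MessageListenerAdapter and the topic is a PatternTopic. The subscriber must have a method that handles the received message.
    • setTopicSerializer(RedisSerializer) sets the serializer that converts channel (topic) names to bytes. It does not affect the message body.
  2. MessageListenerAdapter: the listener adapter.
    • MessageListenerAdapter(Object delegate, String defaultListenerMethod) wraps a subscriber object and the name of the method to invoke on it.
  3. RedisTemplate: the Redis template class.
    • convertAndSend(String channel, Object message) publishes a message.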
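The adapter idea in item 2 can be illustrated without Spring. The following is a minimal sketch, not Spring's actual implementation: SimpleAdapter and Receiver are hypothetical names, and the sketch assumes the handler is a public method taking a single String. It only shows how a delegate object plus a method name is enough to dispatch a message by reflection.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ReflectiveDispatchSketch {

    /** Hypothetical stand-in for a listener adapter: a delegate plus its handler method. */
    static final class SimpleAdapter {
        private final Object delegate;
        private final Method handler;

        SimpleAdapter(Object delegate, String methodName) {
            this.delegate = delegate;
            try {
                // Look up a public method with the given name and a single String parameter.
                this.handler = delegate.getClass().getMethod(methodName, String.class);
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        "No public method " + methodName + "(String) on " + delegate.getClass().getName(), e);
            }
        }

        /** Forwards the message body to the delegate's handler method. */
        void onMessage(String body) {
            try {
                handler.invoke(delegate, body);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Handler invocation failed", e);
            }
        }
    }

    /** Hypothetical subscriber: it only needs a public method with the configured name. */
    public static final class Receiver {
        final List<String> received = new ArrayList<>();

        public void getMessage(String message) {
            received.add(message);
        }
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        SimpleAdapter adapter = new SimpleAdapter(receiver, "getMessage");
        adapter.onMessage("hello");
        System.out.println(receiver.received); // [hello]
    }
}
```

Because the method is looked up by name, a typo in the method name only surfaces at runtime, which is worth keeping in mind when passing "getMessage" as a string.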

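Open questions

  1. When several subscribers listen on the same channel, is there a better way to implement it?
  2. When one channel's content is broadcast, is there a better way to implement it?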

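Code

RedisConfig

The core class. It configures the Redis connection (RedisTemplate), the subscriptions, and the serialization used when publishing.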


package com.mengxiangxiang.newtech.redisMQ;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    // Depends on the RedisConnectionFactory interface so it works with either Jedis or Lettuce.
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        // Custom serialization: JSON with embedded type information.
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Note: enableDefaultTyping is deprecated since Jackson 2.10 in favor of activateDefaultTyping.
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        redisTemplate.setKeySerializer(jackson2JsonRedisSerializer);
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * Redis message listener container.
     * Several listeners for different topics can be added; each listener adapter only needs to be
     * bound to its message handler. The adapter calls the handler's method through reflection
     * to run the business logic.
     *
     * @param connectionFactory the Redis connection factory
     * @param listenerAdapter2  adapter that delegates to MessageReceiveOne
     * @param listenerAdapter   adapter that delegates to MessageReceiveTwo
     * @return the configured listener container
     */
    // Each MessageListenerAdapter represents a different subscriber of a channel.
    @Bean
    RedisMessageListenerContainer container2(RedisConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter2,
                                             MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // Subscribe to several channels.
        container.addMessageListener(listenerAdapter2, new PatternTopic("fullDataUpload"));
        container.addMessageListener(listenerAdapter2, new PatternTopic("analysis"));
        container.addMessageListener(listenerAdapter, new PatternTopic("fullDataUpload"));

        // The topic serializer only encodes channel names. convertAndSend encodes the channel name
        // as a plain string, so the container must do the same: a JSON serializer here would wrap
        // the name in quotes and the subscription would no longer match the published channel.
        // The message body itself is deserialized in the receivers.
        container.setTopicSerializer(new StringRedisSerializer());
        return container;
    }

    // Listener for one subscriber.
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiveTwo receiver) {
        // Pass the message handler to the adapter, which calls MessageReceiveTwo.getMessage through reflection.
        return new MessageListenerAdapter(receiver, "getMessage");
    }

    @Bean
    MessageListenerAdapter listenerAdapter2(MessageReceiveOne receiver) {
        // Pass the message handler to the adapter, which calls MessageReceiveOne.getMessage through reflection.
        return new MessageListenerAdapter(receiver, "getMessage");
    }
}
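PatternTopic subscribes with Redis glob-style patterns, so a name without wildcards only matches the identical channel name. The sketch below is a simplified model of that matching, not Redis's own code: it assumes only '*' and '?' are special (Redis patterns also support character classes and escapes), and GlobMatchSketch, toRegex and matches are hypothetical names. The last case shows why the bytes of the subscribed name and the published name have to agree: a channel name wrapped in JSON quotes no longer matches the plain name.

```java
import java.util.regex.Pattern;

public class GlobMatchSketch {

    /** Converts a simplified glob (only '*' and '?' are special) into a regular expression. */
    static Pattern toRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*' || c == '?') {
                // Flush the literal run collected so far, quoted so it is matched verbatim.
                if (literal.length() > 0) {
                    regex.append(Pattern.quote(literal.toString()));
                    literal.setLength(0);
                }
                regex.append(c == '*' ? ".*" : ".");
            } else {
                literal.append(c);
            }
        }
        if (literal.length() > 0) {
            regex.append(Pattern.quote(literal.toString()));
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns true when the whole channel name matches the glob. */
    static boolean matches(String glob, String channel) {
        return toRegex(glob).matcher(channel).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("fullDataUpload", "fullDataUpload"));     // true
        System.out.println(matches("full*", "fullDataUpload"));              // true
        System.out.println(matches("\"fullDataUpload\"", "fullDataUpload")); // false
    }
}
```

Since the channel names used in this article contain no wildcards, an exact-name topic (ChannelTopic in Spring Data Redis) would likely work just as well as PatternTopic here.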

 

The object being consumed (that is, the data being transmitted)

User


package com.mengxiangxiang.newtech.hello.entity;

public class User {

    private String name;
    private String phone;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }
}

Consumers

MessageReceiveOne

The first consumer.
Once it has the content, it can do whatever it likes with it.


package com.mengxiangxiang.newtech.redisMQ;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mengxiangxiang.newtech.hello.entity.User;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class MessageReceiveOne {

    public void getMessage(String object) {
        // Deserialize the payload. Note: the subscriber must use the same
        // serialization settings as the publisher.
        Jackson2JsonRedisSerializer<User> seria = new Jackson2JsonRedisSerializer<>(User.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);

        // Use an explicit charset instead of the platform default.
        User user = seria.deserialize(object.getBytes(StandardCharsets.UTF_8));
        System.out.println("Message client 1: " + object);
    }
}

 

MessageReceiveTwo

The second consumer.

package com.mengxiangxiang.newtech.redisMQ;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mengxiangxiang.newtech.hello.entity.User;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class MessageReceiveTwo {

    public void getMessage(String object) {
        // Deserialize the payload. Note: the subscriber must use the same
        // serialization settings as the publisher.
        Jackson2JsonRedisSerializer<User> seria = new Jackson2JsonRedisSerializer<>(User.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);

        // Use an explicit charset instead of the platform default.
        User user = seria.deserialize(object.getBytes(StandardCharsets.UTF_8));
        System.out.println("Message client 2: " + object);
    }
}

Publishing messages


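// Assumed to be injected into the test class: the RedisTemplate bean defined in RedisConfig.
@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Test
public void testMQ() {
    String channel1 = "fullDataUpload";
    String channel2 = "analysis";

    User user = new User();
    user.setPhone("18675830623");
    user.setName("刘大");

    User user2 = new User();
    user2.setPhone("17856232365");
    user2.setName("李二");

    // Publish one message to each channel.
    redisTemplate.convertAndSend(channel1, user2);
    redisTemplate.convertAndSend(channel2, user);
}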
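Redis Pub/Sub delivers each published message to every subscriber of the channel, so the test above should reach MessageReceiveOne twice (it listens on both channels) and MessageReceiveTwo once. The sketch below models that routing in memory to make the expected fan-out explicit. It is only an illustration under stated assumptions: InMemoryBroker is a hypothetical class, no Redis is involved, and the strings "user" and "user2" stand in for the serialized User objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class FanOutSketch {

    /** Hypothetical in-memory broker: every listener on a channel receives every message. */
    static final class InMemoryBroker {
        private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

        void subscribe(String channel, Consumer<String> listener) {
            listeners.computeIfAbsent(channel, key -> new ArrayList<>()).add(listener);
        }

        /** Delivers the message to all listeners of the channel and returns how many received it. */
        int publish(String channel, String message) {
            List<Consumer<String>> targets = listeners.getOrDefault(channel, new ArrayList<>());
            for (Consumer<String> target : targets) {
                target.accept(message);
            }
            return targets.size();
        }
    }

    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        List<String> receiverOne = new ArrayList<>();
        List<String> receiverTwo = new ArrayList<>();

        // Same registrations as in RedisConfig: receiver one listens on both channels,
        // receiver two only on fullDataUpload.
        broker.subscribe("fullDataUpload", receiverOne::add);
        broker.subscribe("analysis", receiverOne::add);
        broker.subscribe("fullDataUpload", receiverTwo::add);

        // Same publish order as in testMQ.
        broker.publish("fullDataUpload", "user2");
        broker.publish("analysis", "user");

        System.out.println("receiver one: " + receiverOne); // receiver one: [user2, user]
        System.out.println("receiver two: " + receiverTwo); // receiver two: [user2]
    }
}
```

In other words, broadcasting to several subscribers needs no extra code beyond registering each listener on the channel. Messages are not stored, though, so a subscriber that is not connected at publish time does not receive them.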

 



