解說
- RedisMessageListenerContainer Redis訂閱發布的監聽容器,你的消息發布、訂閱配置都必須在這裡面實現
- addMessageListener(MessageListenerAdapter,PatternTopic) 新增訂閱頻道及訂閱者,訂閱者必須有相關方法處理收到的消息。
- setTopicSerializer(RedisSerializer) 設定頻道(Topic)名稱的序列化器,預設為 StringRedisSerializer;它只影響頻道名稱,不負責消息內容,消息內容的反序列化由 MessageListenerAdapter 或訂閱者自行處理
- MessageListenerAdapter 監聽適配器
- MessageListenerAdapter(Object , defaultListenerMethod) 訂閱者及其方法
- redisTemplate redis模版類
- convertAndSend(String channel, Object message) 消息發布
問題
1、多人同時訂閱一個頻道,有沒有更好的實現方式?
2、一個頻道內容廣播,有沒有更好的實現方式?

代碼
RedisConfig
核心類,實現了Redis連接,訂閱以及發布配置
package com.mengxiangxiang.newtech.redisMQ;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
@Configuration
public class RedisConfig {
@Bean public RedisTemplate<String,Object> redisTemplate(JedisConnectionFactory redisConnectionFactory){ RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); //自定義序列化方式 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); redisTemplate.setKeySerializer(jackson2JsonRedisSerializer); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * redis消息監聽器容器 * 可以添加多個監聽不同話題的redis監聽器,只需要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器 * 通過反射技術調用消息訂閱處理器的相關方法進行一些業務處理 * @param connectionFactory * @param listenerAdapter * @return */ //MessageListenerAdapter 表示監聽頻道的不同訂閱者 @Bean RedisMessageListenerContainer container2(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter2,MessageListenerAdapter listenerAdapter){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //訂閱多個頻道 container.addMessageListener(listenerAdapter2,new PatternTopic("fullDataUpload")); container.addMessageListener(listenerAdapter2,new PatternTopic("analysis")); container.addMessageListener(listenerAdapter,new PatternTopic("fullDataUpload")); //序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化) Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); 
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); container.setTopicSerializer(seria); return container; } //表示監聽一個頻道 @Bean MessageListenerAdapter listenerAdapter(MessageReceiveTwo receiver){ //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“MessageReceiveTwo ” return new MessageListenerAdapter(receiver,"getMessage"); } @Bean MessageListenerAdapter listenerAdapter2(MessageReceiveOne receiver){ //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“MessageReceiveOne ” return new MessageListenerAdapter(receiver,"getMessage"); }
}
被消費的對象(即傳輸的數據)
User
package com.mengxiangxiang.newtech.hello.entity;
/**
 * Data object sent over the Redis channels: a mutable bean with a name and a phone number.
 */
public class User {

    /** User's name; {@code null} until set. */
    private String name;

    /** User's phone number; {@code null} until set. */
    private String phone;

    /**
     * @return the current name, or {@code null} if none was set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the current phone number, or {@code null} if none was set
     */
    public String getPhone() {
        return this.phone;
    }

    /**
     * @param phone the phone number to store
     */
    public void setPhone(String phone) {
        this.phone = phone;
    }
}
消費者
MessageReceiveOne
第一個消費者
獲取到內容後,是圓是扁隨便搓。
package com.mengxiangxiang.newtech.redisMQ;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.feinno.newtech.hello.entity.User;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;
/**
 * First consumer. {@link #getMessage(String)} receives each message body as JSON text and
 * deserializes it into a {@link User}.
 */
@Component
public class MessageReceiveOne {

    /**
     * JSON deserializer for {@link User}, built once and reused instead of being recreated for
     * every message. Its ObjectMapper settings must mirror the publisher's serializer.
     */
    private final Jackson2JsonRedisSerializer<User> userSerializer = newUserSerializer();

    /**
     * Handles one received message.
     *
     * @param object the message body as JSON text
     */
    public void getMessage(String object) {
        // Jackson reads JSON bytes as UTF-8, so encode explicitly instead of relying on the
        // platform default charset, which would corrupt non-ASCII text on some systems.
        User user = userSerializer.deserialize(object.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // `user` is the decoded payload; business logic would use it from here on.
        System.out.println("消息客戶端1號:" + object);
    }

    /**
     * Builds the serializer: all fields visible regardless of access modifier, with default
     * typing enabled for non-final types.
     *
     * <p>NOTE(review): {@code enableDefaultTyping} is deprecated in newer Jackson releases and
     * default typing is risky when payloads are untrusted — confirm before migrating.
     */
    private static Jackson2JsonRedisSerializer<User> newUserSerializer() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        Jackson2JsonRedisSerializer<User> serializer = new Jackson2JsonRedisSerializer<>(User.class);
        serializer.setObjectMapper(objectMapper);
        return serializer;
    }
}
MessageReceiveTwo
第二個消費者
package com.feinno.newtech.redisMQ;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mengxiangxiang.newtech.hello.entity.User;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;
@Component
public class MessageReceiveTwo {

    /**
     * JSON deserializer for {@link User}, built once and reused instead of being recreated for
     * every message. Its ObjectMapper settings must mirror the publisher's serializer.
     */
    private final Jackson2JsonRedisSerializer<User> userSerializer = newUserSerializer();

    /**
     * Handles one received message: deserializes the JSON body into a {@link User} and prints
     * the raw body.
     *
     * @param object the message body as JSON text
     */
    public void getMessage(String object) {
        // Jackson reads JSON bytes as UTF-8, so encode explicitly instead of relying on the
        // platform default charset, which would corrupt non-ASCII text on some systems.
        User user = userSerializer.deserialize(object.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // `user` is the decoded payload; business logic would use it from here on.
        System.out.println("消息客戶端2號:" + object);
    }

    /**
     * Builds the serializer: all fields visible regardless of access modifier, with default
     * typing enabled for non-final types.
     *
     * <p>NOTE(review): {@code enableDefaultTyping} is deprecated in newer Jackson releases and
     * default typing is risky when payloads are untrusted — confirm before migrating.
     */
    private static Jackson2JsonRedisSerializer<User> newUserSerializer() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        Jackson2JsonRedisSerializer<User> serializer = new Jackson2JsonRedisSerializer<>(User.class);
        serializer.setObjectMapper(objectMapper);
        return serializer;
    }
}
發布消息
/**
 * Publishes one {@code User} to each channel: the second user goes to "fullDataUpload" and the
 * first user goes to "analysis".
 */
@Test
public void testMQ() {
    final String fullDataUploadChannel = "fullDataUpload";
    final String analysisChannel = "analysis";

    User firstUser = new User();
    firstUser.setName("劉大");
    firstUser.setPhone("18675830623");

    User secondUser = new User();
    secondUser.setName("李二");
    secondUser.setPhone("17856232365");

    // Publish order is kept: "fullDataUpload" first, then "analysis".
    redisTemplate.convertAndSend(fullDataUploadChannel, secondUser);
    redisTemplate.convertAndSend(analysisChannel, firstUser);
}
