要點:
RedisMessageListenerContainer Redis訂閱發布的監聽容器,你的消息發布、訂閱配置都必須在這里面實現
* addMessageListener(MessageListenerAdapter,PatternTopic) 新增訂閱頻道及訂閱者,訂閱者必須有相關方法處理收到的消息。
* setTopicSerializer(RedisSerializer) 對頻道內容進行序列化解析
MessageListenerAdapter 監聽適配器
- MessageListenerAdapter(Object , defaultListenerMethod) 訂閱者及其方法
redisTemplate redis模版類
- convertAndSend(String channel, Object message) 消息發布
第一種:
RedisConfig核心類,實現了Redis連接,訂閱以及發布配置
package com.example.demo.config; import com.example.demo.project.MessageReceive1; import com.example.demo.project.MessageReceive2; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @author lzg * @date 2019/12/5 15:37 */ @Configuration public class RedisConfig { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); // 使用Jackson2JsonRedisSerialize 替換默認的jdkSerializeable序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 設置value的序列化規則和 key的序列化規則 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * redis消息監聽器容器 * 可以添加多個監聽不同話題的redis監聽器,只需要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器 * 通過反射技術調用消息訂閱處理器的相關方法進行一些業務處理 * @param redisConnectionFactory * @param listenerAdapter1 * @return */ //序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化) @Bean public RedisMessageListenerContainer container1(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter1, MessageListenerAdapter listenerAdapter2) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); // 訂閱多個頻道 redisMessageListenerContainer.addMessageListener(listenerAdapter1, new PatternTopic("test1")); redisMessageListenerContainer.addMessageListener(listenerAdapter1, new PatternTopic("test2")); //不同的訂閱者 redisMessageListenerContainer.addMessageListener(listenerAdapter2, new PatternTopic("test2")); //序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化) Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); redisMessageListenerContainer.setTopicSerializer(seria); return redisMessageListenerContainer; } //表示監聽一個頻道 @Bean public MessageListenerAdapter listenerAdapter1(MessageReceive1 messageReceive1) { //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“MessageReceive1 ” return new MessageListenerAdapter(messageReceive1, "getMessage"); } //表示監聽一個頻道 @Bean public MessageListenerAdapter listenerAdapter2(MessageReceive2 messageReceive2) { //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“MessageReceive2 ” return new MessageListenerAdapter(messageReceive2, "getMessage"); } }
被消費的對象(即傳輸的數據)
/** * @author lzg * @date 2019/12/5 15:46 */ @Data public class Person implements Serializable { private final long serialVersionUID = 1L; private String id; private String userName; private String memberName; private String password; private String email; private String status; private String pwdSalt; }
客戶端:
@Component public class MessageReceive1 { public void getMessage(String object) { //序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化) Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Person.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); Person user = (Person) seria.deserialize(object.getBytes()); System.out.println("消息客戶端1號:" + user.toString()); } }
@Component public class MessageReceive2 { public void getMessage(String object) { //序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化) Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Person.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); Person user = (Person) seria.deserialize(object.getBytes()); System.out.println("消息客戶端2號:" + user); } }
測試類:
@RunWith(SpringRunner.class) @SpringBootTest public class TestPack { @Resource private RedisTemplate redisTemplate; @Test public void test() { Person person1 = new Person(); person1.setId("001"); person1.setUserName("一號"); Person person2 = new Person(); person2.setId("002"); person2.setUserName("二號"); redisTemplate.convertAndSend("test1", person1); redisTemplate.convertAndSend("test2", person2); } }
第二種(簡單版):
配置類:
@Configuration public class MyRedisConf { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); // 使用Jackson2JsonRedisSerialize 替換默認的jdkSerializeable序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 設置value的序列化規則和 key的序列化規則 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("test1")); return container; } /** * 綁定消息監聽者和接收監聽的方法,必須要注入這個監聽器,不然會報錯 */ @Bean public MessageListenerAdapter listenerAdapter() { return new MessageListenerAdapter(new Receiver(), "receiveMessage"); } } @Slf4j class Receiver { public void receiveMessage(String message) { System.out.println(message); } }
測試類:
@RunWith(SpringRunner.class) @SpringBootTest public class TestPack { @Resource private RedisTemplate redisTemplate; @Test public void test(){ for (int i = 0; i < 10; i++) { System.out.println(i); redisTemplate.convertAndSend("test1","這是我發送的第"+i+"個消息"); } } }