一:redis中發布訂閱功能(http://www.redis.cn/commands.html#pubsub)
-
PSUBSCRIBE pattern [pattern …]:訂閱一個或者多個符合pattern格式的頻道
-
PUBLISH channel message:發布消息到chanel中
-
PUBSUB subcommand [argument [argument …]]:查看訂閱與發布系統狀態
-
PUNSUBSCRIBE [pattern [pattern …]]:退訂所有符合格式的頻道
-
SUBSCRIBE channel [channel …]:訂閱一個或者多個頻道
-
UNSUBSCRIBE [channel [channel …]]:取消訂閱頻道
二:實戰使用reids中的發布訂閱模式解決部署在阿里服務與本地后台服務的接口調用不通問題(跨多服務器)
當然也可使用更為強大的消息中間件(RabbitMQ、ActiveMQ、RocketMQ、Kafka、ZeroMQ)
1:redis使用到的maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<!-- springboot1.5版本使用jedis,2.0以上版本使用lettuce,本項目使用jedis,所以需要排除lettuce -->
<exclusions>
<exclusion>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</exclusion>
<!-- <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> -->
</exclusions>
</dependency>
2:application.yml配置
redis:
host: localhost
port: 8379
password: 123456
database: 2
timeout: 50s
# 如果使用的jedis 則將lettuce改成jedis即可
jedis:
pool:
max-active: 8
max-idle: 8
min-idle: 0
3:publisher發布者發布消息
/** * 建立發布者,通過頻道發布消息 * @param key 發布者 * @param value 消息 */
public void publish(String key,Object value){ this.redisTemplate.convertAndSend(key,value); }
redisUtils.publish(RedisTopicEnums.TOPIC_DISCOVER.getTopic(),message);
4:第一種實現方法
/** * redis消息監聽器容器 * @param connectionFactory * @param healthyListenerAdapter 健康掃描消息訂閱處理器 * @param settingsListenerAdapter 配置健康掃描消息訂閱處理器 * @return
*/ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter healthyListenerAdapter, MessageListenerAdapter settingsListenerAdapter ) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //設備健康掃描綁定消息訂閱處理器
container.addMessageListener(healthyListenerAdapter, new PatternTopic("healthy_topic")); //設備配置掃描並綁定消息訂閱處理器
container.addMessageListener(settingsListenerAdapter, new PatternTopic("settings_topic")); return container; } /** * 設備健康消息訂閱處理器,並指定處理方法(利用反射的機制調用消息處理器的業務方法) * @param receiver * @return
*/ @Bean MessageListenerAdapter healthyListenerAdapter(ReceiverRedisMessage receiver) { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "healthy"); return messageListenerAdapter; } /** * 設備健康消息訂閱處理器,並指定處理方法 * @param receiver * @return
*/ @Bean MessageListenerAdapter settingsListenerAdapter(ReceiverRedisMessage receiver) { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "settings"); return messageListenerAdapter; }
缺點:1:考慮實際運用中可能會訂閱多個主題,每加一個主題(Topic)都需要使用container.addMessageListener(listenerAdapter,
new
PatternTopic(
"topic"
));不合理
2:考慮到后期盡量不該變原有代碼進行擴展,推薦使用下面第二種方式實現(保證開閉原則)
5:第二種實現方法:定義訂閱者接收消息器接口
/** * 訂閱者接收消息的基類 * @author : ywb * @createdDate : 2020/8/6 * @updatedDate */
public interface Subscriber extends MessageListener { /** * 類型 * @return
*/
default String getType() { return this.getClass().getSimpleName(); } /** * 通道名稱 * @return
*/ String getTopic(); }
6:定義不同主題枚舉類型,后期增加一個管道,增加一個枚舉信息即可
/** * 定義不同主題類型 * @author : ywb * @createdDate : 2020/8/7 * @updatedDate */
public enum RedisTopicEnums { /** * redis主題名稱定義 需要與發布者一致 * */ TOPIC_DISCOVERY("topic:discovery", "設備發現變更Topic"), TOPIC_HEALTHY("topic:healthy", "健康掃描的設備Topic"), TOPIC_SETTINGS("topic:settings", "配置掃描變更的設備Topic"), TOPIC_DISCOVER("topic:discover", "發現設備Topic"), ; /** * 主題名稱 */
private String topic; /** * 描述 */
private String description; RedisTopicEnums(String topic, String description) { this.topic = topic; this.description = description; } public String getTopic() { return topic; } public String getDescription() { return description; } }
7:實現多個訂閱者,后續增加一個訂閱者,只需要多加上一個訂閱者類,從而不用改動redis消息 監聽容器配置
7.1:設備健康掃描訂閱者
/** * 設備健康掃描的訂閱者 * * @author : ywb * @createdDate : 2020/8/7 * @updatedDate */ @Component @Slf4j public class HealthySubscriber implements Subscriber { @Autowired private DeviceService deviceService; @Autowired private RedisTemplate<String, Object> redisTemplate; @Override public String getTopic() { return RedisTopicEnums.TOPIC_HEALTHY.getTopic(); } @Override public void onMessage(Message message, byte[] pattern) { String deviceIds = (String) redisTemplate.getValueSerializer().deserialize(message.getBody()); log.info(">> 訂閱消息,設備健康異常編號:{}", deviceIds); // TODO 這里是收到通道的消息之后執行的方法
String[] split = deviceIds.split(","); Map<String, Set<Integer>> idsMap = TokenSplit.getDeviceIdRegex(split); for (Map.Entry<String, Set<Integer>> stringSetEntry : idsMap.entrySet()) { DeviceHandle healthyHandle = new DeviceHealthyHandle(); healthyHandle.respondHandle(stringSetEntry.getValue()); } } }
7.2:配置掃描訂閱者
/** * 設備配置變更訂閱者 * * @author : ywb * @createdDate : 2020/8/7 * @updatedDate */ @Component @Slf4j public class SettingsSubscriber implements Subscriber { @Autowired private RedisTemplate<String, Object> redisTemplate; @Override public String getTopic() { return RedisTopicEnums.TOPIC_SETTINGS.getTopic(); } @Override public void onMessage(Message message, byte[] pattern) { //使用redis convertAndSend發布消息,訂閱者獲取字符串字節必須要反序列
String deviceIds = (String) redisTemplate.getValueSerializer().deserialize(message.getBody()); log.info(">>訂閱消息,設備配置變更編號:{}", deviceIds); // TODO 這里是收到通道的消息之后執行的方法
String[] split = deviceIds.split(","); Map<String, Set<Integer>> idsMap = TokenSplit.getDeviceIdRegex(split); for (Map.Entry<String, Set<Integer>> stringSetEntry : idsMap.entrySet()) { DeviceScannerHandle scannerHandle = new DeviceScannerHandle(); scannerHandle.respondHandle(stringSetEntry.getValue()); } } }
8:redisConfig配置,消息監聽器容器配置
@Configuration public class RedisConfig { /** * 自定義 redisTemplate<String, Object> * * @param redisConnectionFactory * @return
*/ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); ObjectMapper om = new ObjectMapper(); // 指定要序列化的域,field,get和set,以及修飾符范圍,ANY是都有包括private和public
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // 指定序列化輸入的類型,類必須是非final修飾的,final修飾的類 // om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); // om.activateDefaultTyping(BasicPolymorphicTypeValidator.builder().build(), ObjectMapper.DefaultTyping.EVERYTHING);
om.activateDefaultTyping(new LaissezFaireSubTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL); om.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); om.setTimeZone(TimeZone.getTimeZone("GMT+8")); // 不轉換值為 null 的對象 // om.setSerializationInclusion(JsonInclude.Include.NON_NULL);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); jackson2JsonRedisSerializer.setObjectMapper(om); // key 采用 string 的序列化方式
template.setKeySerializer(new StringRedisSerializer()); // value 采用 jackson 的序列化方式
template.setValueSerializer(jackson2JsonRedisSerializer); // hash 的 key 采用 string 的序列化方式
template.setHashKeySerializer(new StringRedisSerializer()); // hash 的 value 采用 jackson 的序列化方式
template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } /** * DependencyDescriptor * 重點 * 首先判斷注入的類型,如果是數組、Collection、Map,則注入的是元素數據,即查找與元素類型相同的Bean,注入到集合中。 * 強調下Map類型,Map的 key 為Bean的 name,value 為 與定義的元素類型相同的Bean。 *將所有相同類型(實現了同一個接口)的Bean,一次性注入到集合類型中,具體實現查看spring源碼 * * 獲取Subscriptor接口所有的實現類 * 注入所有實現了接口的Bean * 將所有的配置消息接收處理類注入進來,那么消息接收處理類里面的注解對象也會注入進來 */ @Autowired private transient List<Subscriber> subscriptorList; @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { //創建一個消息監聽對象
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); //將監聽對象放入到容器中
container.setConnectionFactory(connectionFactory); if (this.subscriptorList != null && this.subscriptorList.size() > 0) { for (Subscriber subscriber : this.subscriptorList) { if (subscriber == null || StringUtils.isBlank(subscriber.getTopic())) { continue; } //一個訂閱者對應一個主題通道信息
container.addMessageListener(subscriber, new PatternTopic(subscriber.getTopic())); } } return container; }