SpringBoot中RedisTemplate訂閱發布對象


解說

  1. RedisMessageListenerContainer Redis訂閱發布的監聽容器,你的消息發布、訂閱配置都必須在這里面實現
    * addMessageListener(MessageListenerAdapter,PatternTopic) 新增訂閱頻道及訂閱者,訂閱者必須有相關方法處理收到的消息。
    * setTopicSerializer(RedisSerializer) 對頻道內容進行序列化解析
  1. MessageListenerAdapter 監聽適配器
    • MessageListenerAdapter(Object , defaultListenerMethod) 訂閱者及其方法
  2. redisTemplate redis模版類
    • convertAndSend(String channel, Object message) 消息發布

問題

  1. 多人同時訂閱一個頻道 有沒有更好的實現方式?
    2、一個頻道內容廣播,有沒更好的實現方式?

代碼

RedisConfig

核心類,實現了Redis連接,訂閱以及發布配置

package com.mengxiangxiang.newtech.redisMQ; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String,Object> redisTemplate(JedisConnectionFactory redisConnectionFactory){

    RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(redisConnectionFactory);

    //自定義序列化方式
    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

    redisTemplate.setKeySerializer(jackson2JsonRedisSerializer);
    redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
    redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
    redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
    redisTemplate.afterPropertiesSet();

    return redisTemplate;
}

/**
 * redis消息監聽器容器
 * 可以添加多個監聽不同話題的redis監聽器,只需要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器
 * 通過反射技術調用消息訂閱處理器的相關方法進行一些業務處理
 * @param connectionFactory
 * @param listenerAdapter
 * @return
 */
//MessageListenerAdapter 表示監聽頻道的不同訂閱者
@Bean
RedisMessageListenerContainer container2(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter2,MessageListenerAdapter listenerAdapter){
     RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    //訂閱多個頻道
    container.addMessageListener(listenerAdapter2,new PatternTopic("fullDataUpload"));
    container.addMessageListener(listenerAdapter2,new PatternTopic("analysis"));
    container.addMessageListener(listenerAdapter,new PatternTopic("fullDataUpload"));

    //序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化)
    Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    seria.setObjectMapper(objectMapper);

    container.setTopicSerializer(seria);
    return container;
}

 //表示監聽一個頻道
@Bean
MessageListenerAdapter listenerAdapter(MessageReceiveTwo receiver){
    //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“MessageReceiveTwo ”
    return new MessageListenerAdapter(receiver,"getMessage");
}

@Bean
MessageListenerAdapter listenerAdapter2(MessageReceiveOne receiver){
    //這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“MessageReceiveOne ”
    return new MessageListenerAdapter(receiver,"getMessage");
}

}

被消費的對象(即傳輸的數據)

User

package com.mengxiangxiang.newtech.hello.entity;

public class User {
private String name;
private String phone;

public String getName() {
    return name;
}

public void setName(String name) {
    this.name = name;
}

public String getPhone() {
    return phone;
}

public void setPhone(String phone) {
    this.phone = phone;
}

}

消費者

MessageReceiveOne

第一個消費者
獲取到內容后,是圓是扁隨便搓。

package com.mengxiangxiang.newtech.redisMQ;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.feinno.newtech.hello.entity.User;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiveOne {
public void getMessage(String object){
//序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化)
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(User.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);

    User user = (User)seria.deserialize(object.getBytes());
    System.out.println("消息客戶端2號:"+object);
}

}

###MessageReceiveTwo >第二個消費者 package com.feinno.newtech.redisMQ;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mengxiangxiang.newtech.hello.entity.User;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiveTwo {
public void getMessage(String object){
//序列化對象(特別注意:發布的時候需要設置序列化;訂閱方也需要設置序列化)
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(User.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);

    User user = (User)seria.deserialize(object.getBytes());
    System.out.println("消息客戶端2號:"+object);
}

}

發布消息

@Test public void testMQ(){ String channel1 = "fullDataUpload"; String channel2 = "analysis";
    User user = new User();
    user.setPhone("18675830623");
    user.setName("劉大");

    User user2 = new User();
    user2.setPhone("17856232365");
    user2.setName("李二");

    redisTemplate.convertAndSend(channel1,user2);
    redisTemplate.convertAndSend(channel2,user);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM