Redis實現簡單的消息隊列


1         準備工作

先確保代碼中已經集成Redis

2         Redis消息監聽器配置

/**
 * Builds the customized JSON serializer bean.
 *
 * <p>The backing {@code ObjectMapper} is configured to skip {@code null} properties
 * ({@code JsonInclude.Include.NON_NULL}) and has default typing enabled for
 * {@code NON_FINAL} types.
 *
 * <p>NOTE(review): {@code ObjectMapper#enableDefaultTyping} is deprecated in newer Jackson
 * releases in favour of {@code activateDefaultTyping} with an explicit type validator -
 * confirm against the Jackson version in use before deserializing untrusted payloads.
 *
 * @return the serializer wired with the customized mapper
 */
@Bean
public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

    Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
    serializer.setObjectMapper(objectMapper);
    return serializer;
}
/**
 * Creates the Redis message listener adapter.
 *
 * <p>The adapter wraps the given subscriber as its delegate, is told to invoke the delegate's
 * {@code onMessage} method, and is configured with the given JSON serializer.
 *
 * @param jackson2JsonRedisSerializer serializer set on the adapter
 * @param subscriber delegate object whose {@code onMessage} method handles messages
 * @return the initialized adapter
 */
@Bean
public MessageListenerAdapter lister(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer,
                                     RedisMessageSubscriber subscriber) {
    MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");
    adapter.setSerializer(jackson2JsonRedisSerializer);
    // Kept from the original: initialize explicitly so the returned adapter is ready to use
    // regardless of how this method is invoked.
    adapter.afterPropertiesSet();
    return adapter;
}

/**
 * Binds the subscriber's listener adapter to a Redis message listener container.
 *
 * <p>The listener is registered for the pattern topic {@code "/redis/*"} and the container
 * is given a fixed pool of 20 threads as its task executor.
 *
 * <p>NOTE(review): the thread pool is created inline and nothing in this method shuts it
 * down - confirm that its lifecycle is handled elsewhere.
 *
 * @param connectionFactory connection factory the container uses
 * @param listener          listener adapter to register
 * @return the configured container
 */
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {
    PatternTopic redisTopics = new PatternTopic("/redis/*");

    RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
    listenerContainer.setConnectionFactory(connectionFactory);
    listenerContainer.addMessageListener(listener, redisTopics);
    // Fixed-size pool to avoid creating too many threads.
    listenerContainer.setTaskExecutor(Executors.newFixedThreadPool(20));
    return listenerContainer;
}

  

3   Redis消息發布推送

/**
 * Publishes messages to a Redis channel.
 *
 * @author xbchen
 * @date 2020-3-2 13:58:33
 */
@Service
public class RedisMessagePublish {
    private static final Logger logger = LoggerFactory.getLogger(RedisMessagePublish.class);

    /** Channel that every message is published to. */
    private static final ChannelTopic TOPIC = new ChannelTopic("/redis/pubsub");

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Logs the publisher and content, then sends the content to the {@code /redis/pubsub}
     * channel through the Redis template.
     *
     * @param publisher name of the sender; only used in the log line
     * @param content   message body to send
     */
    public void publish(String publisher, String content) {
        logger.info("{}發布Redis消息=====>{}", publisher, content);
        redisTemplate.convertAndSend(TOPIC.getTopic(), content);
    }
}

  

4 Redis消息接收

/**
 * Sample message receiver that logs every Redis message handed to it.
 *
 * @author xbchen
 * @date 2020-3-2 13:58:33
 */
@Component
public class RedisMessageSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(RedisMessageSubscriber.class);

    /**
     * Handles one received message by logging it.
     *
     * @param message the message payload
     * @param pattern logged as the topic of the message
     */
    public void onMessage(String message, String pattern) {
        logger.info("接收到Redis消息=====>:topic {} ;message {} ", pattern, message);
    }
}

5  測試

/**
 * Scheduled test task: publishes a fixed test message as {@code "admin"} each time the
 * cron expression fires.
 */
@Scheduled(cron = "0/10 * * * * ? ")
public void testResitMessageTask() {
    final String publisher = "admin";
    final String content = "redis消息訂閱發布測試!";
    publishService.publish(publisher, content);
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM