Springboot+RabbitMQ实战demo


 

rabbitmq依赖包

<!-- rabbitmq依赖 begin -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- rabbitmq依赖 end -->

 

 

1、application.yml配置文件

spring:
  # rabbitmq配置
  rabbitmq:
    host: amqp.cn-shenzhen-429403-a.aliyuncs.com
    port: 5672
    username: lynch
    password: lynch
    virtual-host: order-vhost
    
    ##RabbitMQ生产端核心配置
    connection-timeout: 5000
    publisher-confirms: true
    publisher-returns: true
    
    ##RabbitMQ消费端核心配置
    listener:
       simple:
          #设置消费端手动 ack
          acknowledge-mode: manual
          #消费者最小数量
          concurrency: 5
          #消费之最大数量
          max-concurrency: 10
          
    template:
      #ReturnCallback的时候代表消息不可达,设置 broker不自动删除该消息,而是返回到生产端,让我们进行一些后续的处理
      mandatory: true
      # 启用重试机制,重试间隔时间为2s,最多重试3次
      retry:
        enabled: true
        initial-interval: 2s
        max-attempts: 3

 

 

2、RabbitMQConfig.java——RabbitMQ配置类

package io.geekidea.springbootplus.config; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @Configuration @ComponentScan({"com.xukang.*", "io.geekidea.*"}) public class RabbitMQConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 设置序列化策略
 rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }

 

3、生产端单元测试

package com.xukang.common; import java.util.Date; import java.util.UUID; import org.junit.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.beans.factory.annotation.Autowired; import com.baomidou.mybatisplus.core.toolkit.IdWorker; import com.xukang.order.entity.Order; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import io.geekidea.springbootplus.BaseTest; import io.geekidea.springbootplus.config.constant.DatePattern; import lombok.extern.slf4j.Slf4j; @Slf4j public class RabbitmqTest extends BaseTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test() { System.out.println(rabbitTemplate); } /** * 发送direct模式消息 * * @throws Exception */ @Test public void testDirectQueue() throws Exception { Order order = new Order(); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一
        String id = UUID.randomUUID().toString(); log.info(String.format("sendOrder id: %s", id)); CorrelationData correlationData = new CorrelationData(id); order.setId(IdWorker.getId()); order.setOrderSn("order-1"); Date date = new Date(); System.out.println(DateUtil.format(date, DatePattern.YYYY_MM_DD_HH_MM_SS)); order.setCreateTime(date); //rabbitTemplate.convertAndSend("amq.direct", "order", order, correlationData); // 执行发送消息到指定队列
        rabbitTemplate.convertAndSend("amq.direct", "order", order, message -> { // 设置延迟,单位:毫秒值
            message.getMessageProperties().setHeader("delay", 20000); return message; }, correlationData); } /** * 发送topic模式消息 * * @throws Exception */ @Test public void testTopicQueue() throws Exception { Order order = new Order(); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一
        String id = UUID.randomUUID().toString(); log.info(String.format("sendOrder id: %s", id)); CorrelationData correlationData = new CorrelationData(id); order.setId(IdWorker.getId()); order.setOrderSn("order001"); rabbitTemplate.convertAndSend("amq.topic", "order.1", order, correlationData); order.setId(IdWorker.getId()); order.setOrderSn("order002"); rabbitTemplate.convertAndSend("amq.topic", "order.2", order, correlationData); } //回调函数: confirm确认
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息唯一标识:{}", correlationData); log.info("确认结果:{}", ack); if(!ack){ log.info("异常处理...."); log.info("失败原因:{}", cause); } } }; //回调函数: return返回 //启动消息失败返回,比如路由不到队列时触发回调
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主体 message : "+message); log.info("消息主体 message : "+replyCode); log.info("描述:"+replyText); log.info("消息使用的交换器 exchange : "+exchange); log.info("消息使用的路由键 routing : "+routingKey); } }; }

 

 

 

4、direct模式消费端代码

package com.xukang.foobar.mq; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import com.xukang.order.entity.Order; /** * direct模式消费端 * * @author PC * */ @Component public class DirectReceiver { @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue", durable = "true"), exchange = @Exchange(value = "amq.direct", durable = "true", type = "direct", ignoreDeclarationExceptions = "true"), key = "order")) @RabbitHandler public void onMessage(Message<Order> message, Channel channel) throws Exception { Order order = message.getPayload(); System.out.println("消费端Payload: " + order); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); // 手工ACK
        channel.basicAck(deliveryTag, false); } }

 

5、topic模式消费端代码

package com.xukang.foobar.mq; import java.io.IOException; import java.util.List; import java.util.Map; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.xukang.order.entity.Order; /** * topic模式消费端 * * @author PC * */ @Component public class TopicReceiver { @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic-queue", durable = "true"), exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"), key = "order.*")) @RabbitHandler public void onMessage(Message<Order> message, Channel channel) throws Exception { Long deliveryTag = 0L; try { deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); Order order = message.getPayload(); System.out.println("消费端Payload: " + order); if(order.getOrderSn().equals("order001")) { //丢弃这条消息
                channel.basicNack(deliveryTag, false, true); return; } // 手工ACK
            channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息
            channel.basicNack(deliveryTag, false, true); } } }

 

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM