RabbitMQ依赖包
<!-- RabbitMQ dependencies: begin -->
<!-- RabbitMQ Java client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<!-- Spring Boot AMQP starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- RabbitMQ dependencies: end -->
1、application.yml配置文件
# NOTE(review): the nesting indentation of this listing appears to have been lost;
# restore it before using the file as a real application.yml.
spring:
# RabbitMQ connection settings
rabbitmq:
host: amqp.cn-shenzhen-429403-a.aliyuncs.com
port: 5672
username: lynch
password: lynch
virtual-host: order-vhost
# Core producer-side settings
connection-timeout: 5000
publisher-confirms: true
publisher-returns: true
# Core consumer-side settings
listener:
simple:
# Consumers acknowledge messages manually
acknowledge-mode: manual
# Minimum number of consumers
concurrency: 5
# Maximum number of consumers
max-concurrency: 10
template:
# An unroutable message triggers the ReturnCallback: with mandatory enabled the broker
# returns the message to the producer instead of dropping it, so it can be handled there
mandatory: true
# Enable retry: 2s between attempts, at most 3 attempts
# NOTE(review): unclear from this listing whether retry belongs under template or
# listener.simple - confirm against the original file
retry:
enabled: true
initial-interval: 2s
max-attempts: 3
2、RabbitMQConfig.java——RabbitMQ配置类
package io.geekidea.springbootplus.config; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @Configuration @ComponentScan({"com.xukang.*", "io.geekidea.*"}) public class RabbitMQConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 设置序列化策略
rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }
3、生产端单元测试
package com.xukang.common; import java.util.Date; import java.util.UUID; import org.junit.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.beans.factory.annotation.Autowired; import com.baomidou.mybatisplus.core.toolkit.IdWorker; import com.xukang.order.entity.Order; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import io.geekidea.springbootplus.BaseTest; import io.geekidea.springbootplus.config.constant.DatePattern; import lombok.extern.slf4j.Slf4j; @Slf4j public class RabbitmqTest extends BaseTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test() { System.out.println(rabbitTemplate); } /** * 发送direct模式消息 * * @throws Exception */ @Test public void testDirectQueue() throws Exception { Order order = new Order(); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一
String id = UUID.randomUUID().toString(); log.info(String.format("sendOrder id: %s", id)); CorrelationData correlationData = new CorrelationData(id); order.setId(IdWorker.getId()); order.setOrderSn("order-1"); Date date = new Date(); System.out.println(DateUtil.format(date, DatePattern.YYYY_MM_DD_HH_MM_SS)); order.setCreateTime(date); //rabbitTemplate.convertAndSend("amq.direct", "order", order, correlationData); // 执行发送消息到指定队列
rabbitTemplate.convertAndSend("amq.direct", "order", order, message -> { // 设置延迟,单位:毫秒值
message.getMessageProperties().setHeader("delay", 20000); return message; }, correlationData); } /** * 发送topic模式消息 * * @throws Exception */ @Test public void testTopicQueue() throws Exception { Order order = new Order(); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一
String id = UUID.randomUUID().toString(); log.info(String.format("sendOrder id: %s", id)); CorrelationData correlationData = new CorrelationData(id); order.setId(IdWorker.getId()); order.setOrderSn("order001"); rabbitTemplate.convertAndSend("amq.topic", "order.1", order, correlationData); order.setId(IdWorker.getId()); order.setOrderSn("order002"); rabbitTemplate.convertAndSend("amq.topic", "order.2", order, correlationData); } //回调函数: confirm确认
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息唯一标识:{}", correlationData); log.info("确认结果:{}", ack); if(!ack){ log.info("异常处理...."); log.info("失败原因:{}", cause); } } }; //回调函数: return返回 //启动消息失败返回,比如路由不到队列时触发回调
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主体 message : "+message); log.info("消息主体 message : "+replyCode); log.info("描述:"+replyText); log.info("消息使用的交换器 exchange : "+exchange); log.info("消息使用的路由键 routing : "+routingKey); } }; }
4、direct模式消费端代码
package com.xukang.foobar.mq; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import com.xukang.order.entity.Order; /** * direct模式消费端 * * @author PC * */ @Component public class DirectReceiver { @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue", durable = "true"), exchange = @Exchange(value = "amq.direct", durable = "true", type = "direct", ignoreDeclarationExceptions = "true"), key = "order")) @RabbitHandler public void onMessage(Message<Order> message, Channel channel) throws Exception { Order order = message.getPayload(); System.out.println("消费端Payload: " + order); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); // 手工ACK
channel.basicAck(deliveryTag, false); } }
5、topic模式消费端代码
package com.xukang.foobar.mq; import java.io.IOException; import java.util.List; import java.util.Map; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.xukang.order.entity.Order; /** * topic模式消费端 * * @author PC * */ @Component public class TopicReceiver { @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic-queue", durable = "true"), exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"), key = "order.*")) @RabbitHandler public void onMessage(Message<Order> message, Channel channel) throws Exception { Long deliveryTag = 0L; try { deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); Order order = message.getPayload(); System.out.println("消费端Payload: " + order); if(order.getOrderSn().equals("order001")) { //丢弃这条消息
channel.basicNack(deliveryTag, false, true); return; } // 手工ACK
channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息
channel.basicNack(deliveryTag, false, true); } } }