1.引入maven依賴 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.在application.yml的配置:
spring:
rabbitmq:
host: 106.52.82.241
port: 5672
username: yang
password: Yangxiaohui227
virtual-host: /
publisher-confirms: true #消息發送后,如果發送成功到隊列,則會回調成功信息
publisher-returns: true #消息發送后,如果發送失敗,則會返回失敗信息信息
listener: #加了2下面2個屬性,消費消息的時候,就必須發送ack確認,不然消息永遠還在隊列中
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
//為了統一管理所有的Mq消息,建一個類存儲常量,消息的設計都基本會涉及(隊列(queue),交換機(exchange),路由鍵(route)三個值) public class RabbitMqConstant { //下單發送消息 隊列名,交換機名,路由鍵的配置 public final static String SHOP_ORDER_CREATE_EXCHANGE="shop.order.create.exchange"; public final static String SHOP_ORDER_CREATE_ROUTE="shop.order.create.route"; public final static String SHOP_ORDER_CREATE_QUEUE="shop.order.create.queue"; }
package com.example.demo.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //該類是mq最重要的一個類,所有隊列的創建,交換機的創建,隊列和交換機的綁定都在這里實現 @Configuration public class RabbitMqConfig { private final static Logger log = LoggerFactory.getLogger(RabbitMqConfig.class); @Autowired private CachingConnectionFactory connectionFactory; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 單一消費者 * * @return */ @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); return factory; } /** * 多個消費者 * * @return */ @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory, connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(20); factory.setMaxConcurrentConsumers(20); factory.setPrefetchCount(20); return factory; } /** * 模板的初始化配置 * * @return */ @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean sucess, String cause) { if (sucess) { log.info("消息發送成功:correlationData({}),ack({}),cause({})", correlationData, sucess, cause); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate; } //消息的創建設計三個步驟:隊列的創建,交換機創建(direct類型,topic類型,fanout類型),隊列和交換機的通過路由鍵的綁定 //--------- 下單消息配置 //隊列 @Bean public Queue shopOrderCreateQueue() { return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE, true); } //Direct交換機(一對一關系,一個direct交換機只能綁定一個隊列,當有2個相同消費者時,如項目部署2台機,只有一個消費者能消費,) @Bean DirectExchange shopOrderCreateExchange() { return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE); } //綁定 @Bean Binding bindShopOrderCreateQueue() { return BindingBuilder.bind(shopOrderCreateQueue()).to(shopOrderCreateExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE); } }
import com.alibaba.fastjson.JSON; import com.example.demo.domain.ShopOrderMast; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; //專門用一個類作為消息的生產者 @Service public class ShopMessagePublisher { @Autowired private RabbitTemplate rabbitTemplate; public void sendCreateOrderMessage(ShopOrderMast orderMast){ CorrelationData correlationData=new CorrelationData(); //該參數可以傳,可以不傳,不傳時,correlationData的id值默認是null,消息發送成功后,在RabbitMqConfig類的rabbitTemplate類的confirm方法會接收到該值 correlationData.setId(orderMast.getCodOrderId()); String msg = JSON.toJSONString(orderMast); //convertAndSend該方法有非常多的重構方法,找到適合自己的業務方法就行了,這里我用的是其中一個,發送時指定exchange和route值,這樣就會發到對應的隊列去了 rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE,msg,correlationData); } }
//所有的消費都寫在一個消費類中 @Service public class ShopMessageComsumer { //監聽下單消息 @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE) public void createOrderMesaageComsumer(String msg, Channel channel, Message message) { try { //消息可以通過msg獲取也可以通過message的body屬性獲取 System.out.println("開始消費了"); ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class); /** * 因為我在application.yml那里配置了消息手工確認也就是傳說中的ack,所以消息消費后必須發送確認給mq * 很多人不理解ack(消息消費確認),以為這個確認是告訴消息發送者的,這個是錯的,這個ack是告訴mq服務器, * 消息已經被我消費了,你可以刪除它了 * 如果沒有發送basicAck的后果是:每次重啟服務,你都會接收到該消息 * 如果你不想用確認機制,就去掉application.yml的acknowledge-mode: manual配置,該配置默認 * 是自動確認auto,去掉后,下面的channel.basicAck就不用寫了 * */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { try { //出現異常,告訴mq拋棄該消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); e.printStackTrace(); } catch (IOException e1) { e1.printStackTrace(); } } } }
//這里我發送了一條消息,orderId我設置為555556666666,在消息發送時,存到了CorrelationData對象中,因此,發送成功后,在confirm方法可以接收到該值了
//消息發送成功后,在控制台會看到有成功的回調信息,也就是回調了rabbitTemplate的:
confirm(CorrelationData correlationData, boolean sucess, String cause)

//上面測試的下單消息是direct類型消息的,現在創建一個topic消息
//RabbitMqConstant新增topic的配置信息 //下單topic消息:路由鍵的名字 星號* 代表多個字符,#號代表一個字符 //topic交換機,發送消息時,發送到指定shop.order.create.topic.exchange和shop.order.create.topic.route中 public final static String SHOP_ORDER_CREATE_TOPIC_EXCHANGE="shop.order.create.topic.exchange"; public final static String SHOP_ORDER_CREATE_TOPIC_TOUTE="shop.order.create.topic.route"; //隊列1,通過shop.order.create.topic.*與交換機綁定 public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE="shop.order.create.topic.*"; public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE="shop.order.create.topic.queue.one"; //隊列2 通過shop.order.create.topic.*與交換機綁定shop.order.create.topic.# public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO="shop.order.create.topic.#"; public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO="shop.order.create.topic.queue.two";
//在RabbitMqConfig新增topic隊列的基本信息 //-------------------------下單TOPIC消息的創建 //創建TOPIC交換機 @Bean TopicExchange shopOrderCreateTopicExchange() { return new TopicExchange(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE); } //---------------------------//隊列1使用自己的route和交換機綁定 //創建隊列1 @Bean public Queue shopOrderCreateQueueOne() { return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE, true); } //綁定 @Bean Binding bindShopOrderCreateQueueOne() { return BindingBuilder.bind(shopOrderCreateQueueOne()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE); } //---------------------------//隊列2用自己的route和交換機綁定 //創建隊列2 @Bean public Queue shopOrderCreateQueueTWO() { return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO, true); } //綁定 @Bean Binding bindShopOrderCreateQueueTWO() { return BindingBuilder.bind(shopOrderCreateQueueTWO()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO); }
//消息的發送方新增 //發送TOPIC消息 public void sendCreateOrderTOPICMessage(ShopOrderMast orderMast){ CorrelationData correlationData=new CorrelationData(); //該參數可以傳,可以不傳,不傳時,correlationData的id值默認是null,消息發送成功后,在RabbitMqConfig類的rabbitTemplate類的confirm方法會接收到該值 correlationData.setId(orderMast.getCodOrderId()); String msg = JSON.toJSONString(orderMast); //消息發送使用公共route而不是某個隊列自己的route rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_TOUTE,msg,correlationData); }
//消息的消費方新增 //消費者1 @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE) public void createOrderMesaageComsumerOne(String msg, Channel channel, Message message) { try { //消息可以通過msg獲取也可以通過message對象的body值獲取 System.out.println("我是消費者1"); ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class); /** * 因為我在application.yml那里配置了消息手工確認也就是傳說中的ack,所以消息消費后必須發送確認給mq * 很多人不理解ack(消息消費確認),以為這個確認是告訴消息發送者的,這個是錯的,這個ack是告訴mq服務器, * 消息已經被我消費了,你可以刪除它了 * 如果沒有發送basicAck的后果是:每次重啟服務,你都會接收到該消息 * 如果你不想用確認機制,就去掉application.yml的acknowledge-mode: manual配置,該配置默認 * 是自動確認auto,去掉后,下面的channel.basicAck就不用寫了 * */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { try { //出現異常,告訴mq拋棄該消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); e.printStackTrace(); } catch (IOException e1) { e1.printStackTrace(); } } } //消費者2 @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO) public void createOrderMesaageComsumerTWO(String msg, Channel channel, Message message) { try { //消息可以通過msg獲取也可以通過message對象的body值獲取 System.out.println("我是消費者2"); ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class); /** * 因為我在application.yml那里配置了消息手工確認也就是傳說中的ack,所以消息消費后必須發送確認給mq * 很多人不理解ack(消息消費確認),以為這個確認是告訴消息發送者的,這個是錯的,這個ack是告訴mq服務器, * 消息已經被我消費了,你可以刪除它了 * 如果沒有發送basicAck的后果是:每次重啟服務,你都會接收到該消息 * 如果你不想用確認機制,就去掉application.yml的acknowledge-mode: manual配置,該配置默認 * 是自動確認auto,去掉后,下面的channel.basicAck就不用寫了 * */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { try { //出現異常,告訴mq拋棄該消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); e.printStackTrace(); } catch (IOException e1) { e1.printStackTrace(); } } }
//測試結果:
//延時隊列:將消息發送到一個隊列,等過了一段時間后,該隊列會將消息轉發到真正的隊列消費,業務場景可以用於訂單定時取消
//在RabbitMqConstant類添加如下內容 //延時隊列,消息先發到延時隊列中,到時間后,再發送到真正的隊列 public final static String SHOP_ORDER_CREATE_DELAY_EXCHANGE="shop.order.create.delay.exchange"; public final static String SHOP_ORDER_CREATE_DELAY_ROUTE="shop.order.create.delay.route"; public final static String SHOP_ORDER_CREATE_DELAY_QUEUE="shop.order.create.delay.queue"; //真正的隊列 public final static String SHOP_ORDER_CREATE_REAL_EXCHANGE="shop.order.create.real.exchange"; public final static String SHOP_ORDER_CREATE_REAL_ROUTE="shop.order.create.real.route"; public final static String SHOP_ORDER_CREATE_REAL_QUEUE="shop.order.create.real.queue";
//在RabbitMqConfig加上 //----------------------- 延時隊列的配置 //延時隊列 @Bean public Queue shopOrderCreateDelayQueue() { Map<String, Object> argsMap= Maps.newHashMap(); argsMap.put("x-dead-letter-exchange",RabbitMqConstant.SHOP_ORDER_CREATE_REAL_EXCHANGE); //真正的交換機 argsMap.put("x-dead-letter-routing-key",RabbitMqConstant.SHOP_ORDER_CREATE_REAL_ROUTE); //真正的路由鍵 return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_QUEUE,true,false,false,argsMap); } //延時交換機 @Bean DirectExchange shopOrderCreateDelayExchange() { return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_EXCHANGE); } //延時隊列綁定延時交換機 @Bean Binding bindShopOrderCreateDelayQueue() { return BindingBuilder.bind(shopOrderCreateDelayQueue()).to(shopOrderCreateDelayExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_ROUTE); } //真正的隊列配置------------------------------------- //真正的隊列 @Bean public Queue shopOrderCreateRealQueue() { return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_REAL_QUEUE,true); } //真正的交換機 @Bean DirectExchange shopOrderCreateRealExchange() { return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_REAL_EXCHANGE); } //綁定真正的交換機 @Bean Binding bindShopOrderCreateRealQueue() { return BindingBuilder.bind(shopOrderCreateRealQueue()).to(shopOrderCreateRealExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_REAL_ROUTE); }
//在消息發送類(ShopMessagePublisher)新增 //發送延時消息 public void sendCreateOrderDelayMessage(ShopOrderMast orderMast){ CorrelationData correlationData=new CorrelationData(); //該參數可以傳,可以不傳,不傳時,correlationData的id值默認是null,消息發送成功后,在RabbitMqConfig類的rabbitTemplate類的confirm方法會接收到該值 correlationData.setId(orderMast.getCodOrderId()); String msg = JSON.toJSONString(orderMast); // convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData) rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_EXCHANGE, RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_ROUTE, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setExpiration("60000");//單位是毫秒 return message; } }, correlationData); }
//在消費類(ShopMessageComsumer) 新增 //延遲隊列中真正隊列監聽 @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_REAL_QUEUE) public void createOrderRealMesaageComsumer(String msg, Channel channel, Message message) { try { System.out.println("這是真正的隊列,在監聽延時隊列發送的消息"); ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { try { //出現異常,告訴mq拋棄該消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); e.printStackTrace(); } catch (IOException e1) { e1.printStackTrace(); } } }
//注意,如果同時使用了延時隊列的queue去接收,那么消息會被延遲隊列的消費者消費,而不是被真正的queue消費
//如果在延遲隊列消費時,加了下面這個隊列,上面那個真正的消費者就接收不到消息了 @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_QUEUE) public void createOrderDelayMesaageComsumer(String msg, Channel channel, Message message) { try { System.out.println("測試延遲隊列自己能否接收"); ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { try { //出現異常,告訴mq拋棄該消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); e.printStackTrace(); } catch (IOException e1) { e1.printStackTrace(); } } }
//補充:對於direct和topic交換機,如果部署多台相同queue的消費者,消息也只會消費一次,通過輪詢的方式進行負債均衡
//如何在rabbitMq管理頁面查看沒有還沒被消費的消息信息:
通過界面發送Mq消息,場景,如日志發現某條消息沒有發送,可以在這里發送回去: