Spring Boot + RabbitMQ 實戰 demo


 

rabbitmq依賴包

<!-- RabbitMQ dependencies: begin -->
<!-- No <version> elements are given here; presumably versions are managed by a parent POM/BOM (TODO confirm). -->
<!-- NOTE(review): spring-boot-starter-amqp normally pulls in amqp-client transitively, so the explicit entry may be redundant; verify. -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- RabbitMQ dependencies: end -->

 

 

1、application.yml配置文件

spring:
  # RabbitMQ connection settings (broker address, credentials and virtual host)
  rabbitmq:
    host: amqp.cn-shenzhen-429403-a.aliyuncs.com
    port: 5672
    username: lynch
    password: lynch
    virtual-host: order-vhost
    
    ## Core producer-side settings
    # publisher-confirms / publisher-returns switch on broker confirms and returned (unroutable) messages
    connection-timeout: 5000
    publisher-confirms: true
    publisher-returns: true
    
    ## Core consumer-side settings
    listener:
       simple:
          # Consumers acknowledge messages manually (listener code must ack/nack itself)
          acknowledge-mode: manual
          # Minimum number of consumers
          concurrency: 5
          # Maximum number of consumers
          max-concurrency: 10
          
    template:
      # For the ReturnCallback case (message is unroutable): tell the broker not to drop the message
      # automatically but to return it to the producer so that follow-up handling is possible.
      # NOTE(review): template.* presumably only reaches the RabbitTemplate that Spring Boot
      # auto-configures; a hand-built RabbitTemplate bean must apply these settings itself - confirm.
      mandatory: true
      # Enable the retry mechanism: 2s initial interval, at most 3 attempts
      retry:
        enabled: true
        initial-interval: 2s
        max-attempts: 3

 

 

2、RabbitMQConfig.java——RabbitMQ配置類

package io.geekidea.springbootplus.config; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @Configuration @ComponentScan({"com.xukang.*", "io.geekidea.*"}) public class RabbitMQConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 設置序列化策略
 rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }

 

3、生產端單元測試

package com.xukang.common; import java.util.Date; import java.util.UUID; import org.junit.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.beans.factory.annotation.Autowired; import com.baomidou.mybatisplus.core.toolkit.IdWorker; import com.xukang.order.entity.Order; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import io.geekidea.springbootplus.BaseTest; import io.geekidea.springbootplus.config.constant.DatePattern; import lombok.extern.slf4j.Slf4j; @Slf4j public class RabbitmqTest extends BaseTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test() { System.out.println(rabbitTemplate); } /** * 發送direct模式消息 * * @throws Exception */ @Test public void testDirectQueue() throws Exception { Order order = new Order(); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 時間戳 全局唯一
        String id = UUID.randomUUID().toString(); log.info(String.format("sendOrder id: %s", id)); CorrelationData correlationData = new CorrelationData(id); order.setId(IdWorker.getId()); order.setOrderSn("order-1"); Date date = new Date(); System.out.println(DateUtil.format(date, DatePattern.YYYY_MM_DD_HH_MM_SS)); order.setCreateTime(date); //rabbitTemplate.convertAndSend("amq.direct", "order", order, correlationData); // 執行發送消息到指定隊列
        rabbitTemplate.convertAndSend("amq.direct", "order", order, message -> { // 設置延遲,單位:毫秒值
            message.getMessageProperties().setHeader("delay", 20000); return message; }, correlationData); } /** * 發送topic模式消息 * * @throws Exception */ @Test public void testTopicQueue() throws Exception { Order order = new Order(); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 時間戳 全局唯一
        String id = UUID.randomUUID().toString(); log.info(String.format("sendOrder id: %s", id)); CorrelationData correlationData = new CorrelationData(id); order.setId(IdWorker.getId()); order.setOrderSn("order001"); rabbitTemplate.convertAndSend("amq.topic", "order.1", order, correlationData); order.setId(IdWorker.getId()); order.setOrderSn("order002"); rabbitTemplate.convertAndSend("amq.topic", "order.2", order, correlationData); } //回調函數: confirm確認
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息唯一標識:{}", correlationData); log.info("確認結果:{}", ack); if(!ack){ log.info("異常處理...."); log.info("失敗原因:{}", cause); } } }; //回調函數: return返回 //啟動消息失敗返回,比如路由不到隊列時觸發回調
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主體 message : "+message); log.info("消息主體 message : "+replyCode); log.info("描述:"+replyText); log.info("消息使用的交換器 exchange : "+exchange); log.info("消息使用的路由鍵 routing : "+routingKey); } }; }

 

 

 

4、direct模式消費端代碼

package com.xukang.foobar.mq; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import com.xukang.order.entity.Order; /** * direct模式消費端 * * @author PC * */ @Component public class DirectReceiver { @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue", durable = "true"), exchange = @Exchange(value = "amq.direct", durable = "true", type = "direct", ignoreDeclarationExceptions = "true"), key = "order")) @RabbitHandler public void onMessage(Message<Order> message, Channel channel) throws Exception { Order order = message.getPayload(); System.out.println("消費端Payload: " + order); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); // 手工ACK
        channel.basicAck(deliveryTag, false); } }

 

5、topic模式消費端代碼

package com.xukang.foobar.mq; import java.io.IOException; import java.util.List; import java.util.Map; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.xukang.order.entity.Order; /** * topic模式消費端 * * @author PC * */ @Component public class TopicReceiver { @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic-queue", durable = "true"), exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"), key = "order.*")) @RabbitHandler public void onMessage(Message<Order> message, Channel channel) throws Exception { Long deliveryTag = 0L; try { deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); Order order = message.getPayload(); System.out.println("消費端Payload: " + order); if(order.getOrderSn().equals("order001")) { //丟棄這條消息
                channel.basicNack(deliveryTag, false, true); return; } // 手工ACK
            channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); //丟棄這條消息
            channel.basicNack(deliveryTag, false, true); } } }

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM