SpringBoot整合RabbitMQ-整合演示


本系列是學習SpringBoot整合RabbitMQ的練手,包含服務安裝,RabbitMQ整合SpringBoot2.x,消息可靠性投遞實現等三篇博客。

  學習路徑:https://www.imooc.com/learn/1042 RabbitMQ消息中間件極速入門與實戰 

  項目源碼:https://github.com/ZbLeaning/Boot-RabbitMQ 


 整合實際上主要兩步:

  1、引入相關依賴

  2、對application.yml進行配置

注意:后續需要使用數據庫,因此需要安裝mysql。https://blog.csdn.net/qq_37719778/article/details/81298292  mysql安裝教程

Spring.RabbitMQ配置的含義可參考:https://blog.csdn.net/en_joker/article/details/80103519

數據庫建表語句:

DROP TABLE IF EXISTS `broker_message_log`;
CREATE TABLE `broker_message_log` (
  `message_id` varchar(255) NOT NULL COMMENT '消息唯一ID',
  `message` varchar(4000) NOT NULL COMMENT '消息內容',
  `try_count` int(4) DEFAULT '0' COMMENT '重試次數',
  `status` varchar(10) DEFAULT '' COMMENT '消息投遞狀態 0投遞中,1投遞成功,2投遞失敗',
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP COMMENT '下一次重試時間',
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for t_order
-- ----------------------------
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `message_id` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2018091102 DEFAULT CHARSET=utf8;

 完成前期准備后開始進行整合。


 Producer:服務端

  1、新建一個SpringBoot項目,項目結構如下

 

  2、添加Pom.xml文件依賴

 <!--rabbitmq依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper-spring-boot-starter</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.29</version>
        </dependency>
        <dependency>
            <groupId>com.github.miemiedev</groupId>
            <artifactId>mybatis-paginator</artifactId>
            <version>1.2.17</version>
            <exclusions>
                <exclusion>
                    <groupId>org.mybatis</groupId>
                    <artifactId>mybatis</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--工具類依賴包-->
        <dependency>
            <groupId>org.apache-commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

  3、配置application.yml文件

spring:
    rabbitmq:
        addresses: 134.175.33.221:5672
        username: guest
        password: guest
        virtual-host: /
        ##開啟Publisher Confirm機制
        publisher-confirms: true
        ##開啟Publisher Return機制
        publisher-returns: true
        template:
            mandatory: true
    datasource:
        url: jdbc:mysql://localhost:3306/rabbitmq?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
        username: root
        password: binzhang
        driverClassName: com.mysql.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
server:
    port: 8001
    servlet:
        context-path: /
mybatis:
    mapper-locations: classpath:mapping/*.xml
    logging:
        level:
            tk:
                mybatis: trace

  4、編寫消息發送類,直接使用SpringBoot配置的RabbitTemplate模板

import com.imooc.mq.entity.Order;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Title: OrderSender
 * @Description: 訂單發送
 * @date 2019/1/2210:20
 */
@Component
public class OrderSender {
    //使用rabbitmq模板
    @Autowired
    private RabbitTemplate rabbitTemplate;
   //發送消息
    public void sendOrder(Order order) throws Exception{
     
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(order.getMessageId());

        rabbitTemplate.convertAndSend("order-exchange",//exchange
                "order.abcd",//routingKey
                order,//消息體內容
                correlationData); //消息唯一id
    }
}

  5、通過控制面板手動建立交換機exchange、消息隊列queue

  6、點擊進入創建好的order-exchange,設置綁定路由鍵

  7、寫測試demo,運行消息發現,看控制台是否收到消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class MqApplicationTests {
    @Autowired
    private OrderSender orderSender;

    @Test
    public void contextLoads() {
        Order order = new Order();
        order.setId("aaa");
        order.setName("測試消息a");
        order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
        try {
            orderSender.sendOrder(order);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

   接收消息成功:

   

  注意:啟動時如果報異常

Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.

  則需要配置

@MapperScan("com.imooc.mq.mapper") @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) 

 Consumer:消費端 

  1、項目結構

 

  2、配置Pom.xml文件,引入依賴,可直接復制服務端pom.xml注入的依賴

  3、配置Application.xml文件

## springboot整合rabbitmq的基本配置
spring:
  rabbitmq:
    addresses: 134.175.33.221:5672
    username: guest
    password: guest
    ##連接到RabbitMQ的虛擬主機
    virtual-host: /
    ## 消費端配置
    listener:
      simple:
        ##消費者的最小數量
        concurrency: 5
        ## manual:手動 ack(確認)
        acknowledge-mode: manual
        ##消費者的最大數量
        max-concurrency: 10
        ##在單個請求中處理的消息個數,應該大於等於事務數量
        prefetch: 1
  datasource:
    url: jdbc:mysql://localhost:3306/rabbitmq?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
    username: root
    password: binzhang
    driverClassName: com.mysql.jdbc.Driver
server:
  port: 8002
  servlet:
    context-path: /

  4、需要將服務端的Order類也復制到消費端

  5、編寫消息接收類

import com.imooc.mq.entity.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * @Title: OrderReceiver
 * @Description: 消費
 * @date 2019/1/2211:03
 */
@Component
public class OrderReceiver {
    /**
     * @RabbitListener 消息監聽,可配置交換機、隊列、路由key
     * 該注解會創建隊列和交互機 並建立綁定關系
     * @RabbitHandler 標識此方法如果有消息過來,消費者要調用這個方法
     * @Payload 消息體
     * @Headers 消息頭
     * @param order
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue",declare = "true"),
            exchange = @Exchange(name = "order-exchange",declare = "true",type = "topic"),
            key = "order.abcd"
    ))
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers,
                               Channel channel) throws Exception{
        //消費者操作
        System.out.println("------收到消息,開始消費------");
        System.out.println("訂單ID:"+order.getId());

        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //現在是手動確認消息 ACK
        channel.basicAck(deliveryTag,false);
    }
}

   6、運行成功后

 

基本的服務和消費端整合及演示demo已完成,一般開發過程中我們大都采用手動確認消息機制,如果注釋掉該行則會出現消息被消費但是一直處於未被確認的狀態。當重啟服務端再次發現消息時,消息也會被消費。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM