Spring Boot - AMQP 消息中間件


Message Broker是一種消息驗證、傳輸、路由的架構模式,其設計目標主要應用於下面這些場景:

  • 消息路由到一個或多個目的地
  • 消息轉化為其他的表現方式
  • 執行消息的聚集、消息的分解,並將結果發送到他們的目的地,然后重新組合相應返回給消息用戶
  • 調用Web服務來檢索數據
  • 響應事件或錯誤
  • 使用發布-訂閱模式來提供內容或基於主題的消息路由

AMQP 是 Advanced Message Queuing Protocol 的簡稱,它是一個面向消息中間件的開放式標准應用層協議。AMQP定義了這些特性:

  • 消息方向
  • 消息隊列
  • 消息路由(包括:點到點和發布-訂閱模式)
  • 可靠性
  • 安全性

RabbitMQ 是以 AMQP 協議實現的一種中間件產品,也稱為面向消息的中間件,它可以支持多種操作系統,多種編程語言,幾乎可以覆蓋所有主流的企業級技術平台。下面介紹 RabbitMQ 的基本概念:

  • Broker:可以理解為消息隊列服務器的實體,是一個中間件應用,負責接收消息生產者的消息,然后將消息發送至消息接收者或者其他的 Broker
  • Exchange:消息交換機,是消息第一個到達的地方,消息通過他指定的路由規則分發到不同的消息隊列中去,有如下幾種類型:
    • Direct:完全按照 key 進行投遞,比如,綁定時設置了 Routing Key 為 abc,那么客戶端提交消息,只有設置了 Key 為 abc 的才會被投遞到隊列
    • Topic:對於 Key 進行模式匹配后進行投遞,可以使用符號 # 匹配一個或多個詞,符號 * 匹配正好一個詞。比如, abc.# 可以匹配 abc.def.ghi ,而 abc.* 只能匹配 abc.def
    • Fanout:不需要任何 Key,采取廣播的模式,一個消息進來時,投遞到與該交換機綁定的所以隊列
  • Queue:消息隊列,消息通過發送和路由之后最終到達的地方,到達 Queue 的消息即進入邏輯上等待消費的狀態。每個消息都會被發送到一個或多個隊列。
  • Binding:綁定,將 Exchange 和 Queue 按照路由規則綁定起來
  • Routing Key:路由關鍵字,Exchange 根據該關鍵字進行消息投遞
  • Virtual Host:虛擬主機,他是對 Broker 的虛擬划分,將消費者、生產者和依賴的 AMQP 相關結構進行隔離,一般情況都是為了安全考慮。
  • Connection:連接,代表生產者、消費者、Broker 之間進行通信的物理網絡
  • Channel:消息通道,用於連接生產者和消費者的邏輯結構,在客戶端的每個連接里,可以建立多個 Channel,每個 Channel 代表一個會話任務,通過 Channel 可以隔離同一個連接的不同交互內容。
  • Producer:消息生產者,制造消息並發送消息的程序
  • Consumer:消息消費者,接收消息並處理消息的程序

   

快速入門

我們通過在 Spring Boot 應用中整合 RabbitMQ ,實現一個簡單的發送、接收消息的示例:

  • 創建項目

    創建一個 Spring Boot 項目,命名為 spring-boot-rabbitmq,並增加 spring-boot-starter-amqp 依賴,pom.xml 文件內容如下:

    <?xmlversion="1.0"encoding="UTF-8"?>

    <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

       

    <groupId>org.lixue.bus</groupId>

    <artifactId>spring-boot-rabbitmq</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <packaging>jar</packaging>

       

    <name>spring-boot-rabbitmq</name>

       

    <parent>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-parent</artifactId>

    <version>1.5.12.RELEASE</version>

    <relativePath/><!--lookupparentfromrepository-->

    </parent>

       

    <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

    <java.version>1.8</java.version>

    </properties>

       

    <dependencies>

    <dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-amqp</artifactId>

    </dependency>

       

    <dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-test</artifactId>

    <scope>test</scope>

    </dependency>

    </dependencies>

       

    <build>

    <plugins>

    <plugin>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-maven-plugin</artifactId>

    </plugin>

    </plugins>

    </build>

    </project>

       

  • 創建配置

    在 src/main/resources 目錄中創建 application.yml 配置文件,並增加 RabbitMQ 相關配置,內容如下:

    #配置應用名稱

    spring:

    application:

    name:rabbitmq-bus

    #配置RabbitMQ信息

    rabbitmq:

    #配置連接信息

    addresses:192.168.2.215:5672

    username:lixue

    password:liyong

    virtual-host:/

    #開啟發送確認模式

    publisher-confirms:true

       

  • 消息生產者

    創建消息生產者 Sender 類,通過注入 AmqpTemplate 接口的實例(org.springframework.amqp.rabbit.core.RabbitTemplate)來實現消息的發送,AmqpTemplate 接口定義了一套針對 AMQP 協議的基礎操作,Spring Boot 會根據配置來注入其具體實現。

    package org.lixue.bus;

       

    import org.springframework.amqp.core.AmqpTemplate;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Component;

       

    import java.util.Date;

       

    @Component

    public class Sender{

    @Autowired

    private AmqpTemplate amqpTemplate;

       

    public void send(){

    String context="Hello"+new Date();

    //將消息發送到路由Key=send的隊列

    amqpTemplate.convertAndSend("send",context);

    }

    }

       

  • 消息消費者

    創建消息消費者 Receiver 類,通過注入 AmqpTemplate 接口的實例(org.springframework.amqp.rabbit.core.RabbitTemplate)來實現消息的接收處理,也可以使用 @RabbitListener 和 @RabbitHandler 注解 來指定消息接收的隊列和消息處理方法,使用注解處理消息如下:

    package org.lixue.bus;

       

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;

    import org.springframework.stereotype.Component;

       

    @Component

    @RabbitListener(queues="send")

    public class Receiver{

       

    @RabbitHandler

    public void process(String message){

    System.out.println("receive:"+message);

    }

    }

       

    如果不使用注解,可以通過 AmqpTemplate 接口的實例來獲取隊列的消息,代碼如下:

    package org.lixue.bus;

       

    import org.springframework.amqp.core.AmqpTemplate;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Component;

       

    @Component

    public class Receiver{

    @Autowired

    private AmqpTemplate amqpTemplate;

       

    public void receive(){

    String val=(String)amqpTemplate.receiveAndConvert("send");

    if(val!=null){

    System.out.println("receive:"+message);

    }

    }

    }

       

  • 創建配置類

    創建 RabbitMQ 的配置類 RabbitMQConfig,用來配置隊列、交換器、路由等高級信息,代碼如下:

    package org.lixue.bus;

       

    import org.springframework.amqp.core.*;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

       

    @Configuration

    public class RabbitMQConfig{

       

    /**

    *配置隊列相關

    */

    @Bean

    public Queue newQueue(){

    return new Queue("send");

    }

       

    @Bean

    public Queue newQueue2(){

    return new Queue("send2");

    }

       

    /**

    *配置交換器相關

    */

    @Bean

    public Exchange newDirectExchange(){

    return new DirectExchange("directExchange",true,true);

    }

       

    @Bean

    public Exchange newTopicExchange(){

    return new TopicExchange("topicExchange",true,true);

    }

       

    @Bean

    public Exchange newFanoutExchange(){

    return new FanoutExchange("fanoutExchange",true,true);

    }

       

    /**

    *配置隊列和交換器綁定

    */

    @Bean

    public Binding newDirectBinding(){

    return BindingBuilder.bind(newQueue()).to(newDirectExchange()).with("send").noargs();

    }

       

    @Bean

    public Binding newDirectBinding1(){

    return BindingBuilder.bind(newQueue2()).to(newDirectExchange()).with("send2").noargs();

    }

    }

       

  • 測試驗證

    創建消息提供者的單元測試,在單元測試中執行消息發送方法,並執行單元測試,如果使用注解的方式接收消息,則不需要做額外處理,Spring Boot 啟動后會接收消息,如下:

    package org.lixue.bus;

       

    import org.junit.Test;

    import org.junit.runner.RunWith;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.boot.test.context.SpringBootTest;

    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    import org.springframework.test.context.junit4.SpringRunner;

       

    import static org.junit.Assert.*;

       

    @RunWith(SpringRunner.class)

    @SpringBootTest

    publi cclass SenderTest{

       

    @Autowired

    private Sender sender;

       

    @Test

    public void send() throws Exception{

    for(inti=0;i<1000000;i++){

    sender.send(i);

    }

    }

    }

    日志輸出如下:

    2018-05-04 16:23:39.085 INFO 20176 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.2.215:5672]

    2018-05-04 16:23:39.294 INFO 20176 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#232532cf:0/SimpleConnection@7c7f9a81 [delegate=amqp://lixue@192.168.2.215:5672/, localPort= 63144]

    2018-05-04 16:23:39.300 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (directExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.

    2018-05-04 16:23:39.300 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (topicExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.

    2018-05-04 16:23:39.301 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (fanoutExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.

    2018-05-04 16:23:39.476 INFO 20176 --- [ main] org.lixue.bus.SenderTest : Started SenderTest in 4.536 seconds (JVM running for 6.026)

    receive:Hello Fri May 04 16:23:39 CST 2018

    receive:Hello Fri May 04 16:23:39 CST 2018

    receive:Hello Fri May 04 16:23:39 CST 2018

    receive:Hello Fri May 04 16:23:39 CST 2018

       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM