RabbitMQ學習總結


一、RabbitMQ介紹

      MQ全稱為Message Queue,即消息隊列, RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message
Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統開
發中應用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com/
開發中消息隊列通常有如下應用場景:
1、任務異步處理。
            將不需要同步處理的並且耗時長的操作由消息隊列通知消息接收方進行異步處理。提高了應用程序的響應時間。
2、應用程序解耦合
          MQ相當於一個中介,生產方通過MQ與消費方交互,它將應用程序進行解耦合。
市場上還有哪些消息隊列?
          ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
為什么使用RabbitMQ呢?
    1、使得簡單,功能強大。
    2、基於AMQP協議。
    3、社區活躍,文檔完善。
    4、高並發性能好,這主要得益於Erlang語言。
    5、Spring Boot默認已集成RabbitMQ

二、使用MQ的好處

2.1實現異步處理

同步的通信:發出一個調用請求之后,在沒有得到結果之前,就不返回。由調用者主動等待這個調用的結果。

異步通信:調用在發出之后,這個調用就直接返回了,所以沒有返回結果。也就是說,當一個異步過程調用發出后,調用者不會馬上得到結果。而是在調用發出后,

被調用者通過狀態、通知來通知調用者,或通過回調函數處理這個調用。 

2.2實現解耦

耦合是系統內部或者系統之間存在相互作用,相互影響和相互依賴。在我們的分布式系統中,一個業務流程涉及多個系統的時候,他們之間就會形成一個依賴關系。
 
在傳統的通信方式中,訂單系統發生了退貨的動作,那么要依次調用所有下游系統的 API,比如調用庫存系統的 API 恢復庫存,因為這張火車票還要釋放出去給其他乘客購買;調用支付系統的 API,不論是支付寶微信還是銀行 卡,要把手續費扣掉以后,原路退回給消費者;調用通知系統 API 通知用戶退貨成功。
// 偽代碼 public void returnGoods(){ 
stockService.updateInventory ();
payService.refund(); noticeService.notice();

 

這個過程是串行執行的,如果在恢復庫存的時候發生了異常,那么后面的代碼都不會執行。由於這一系列的動作,恢復庫存,資金退還,發送通知,本質上沒有一個嚴格的先后順序,也沒有直接的依賴關系,也就是說, 只要用戶提交了退貨的請求,后面的這些動作都是要完成的。庫存有沒有恢復成功,不影響資金的退還和發送通知。
使用多線程
多線程或者線程池是可以實現的,但是每一個需要並行執行的地方都引入線程,又會帶來線程或者線程池的管理問題。所以使用MQ
訂單系統只需要把退貨的消息發送到消息隊列上,由各個下游的業務系統自己創建隊列,然后監聽隊列消費消息。
在這種情況下訂單系統里面就不需要配置其他系統的 IP、端口、接口地址了,因為它不需要關心消費者在網絡上的什么位置,所以下游系統改 IP 沒有任何影響。
甚至不需要關心消費者有沒有消費成功,它只需要把消費發到消息隊列的服務器上就可以了。 這樣,我們就實現了系統之間依賴關系的解耦。

2.3實現流量削鋒

在很多的電商系統里面,有一個瞬間流量達到峰值的情況,比如京東的 618,淘寶的雙 11,還有小米搶購。普通的硬件服務器肯定支撐不了這種百萬或者千萬級別的並發量,就像 2012 年的小米一樣,動不動服務器就崩潰。 如果通過堆硬件的方式去解決,那么在流量峰值過去以后就會出現巨大的資源浪費。那要怎么辦呢?如果說要保護我們的應用服務器和數據庫,
限流也是可以的,但是這樣 又會導致訂單的丟失,沒有達到我們的目的。
引入MQ, MQ是隊列,一定有隊列的特性,(先進先出) 就可以先把所有的流量承接下來,轉換成 MQ 消息發送到消息隊列服務器上,業務層就可以根據自己的消費速率去處理這些消息,
處理之后再返回結果。 就像我們在火車站排隊一樣,大家只能一個一個買票,不會因為人多就導致售票員忙不過來。如果要處理快一點,大不了多開幾個窗口(增加幾個消費者)。
總結起來:
1) 對於數據量大或者處理耗時長的操作,我們可以引入 MQ 實現異步通信,減少客戶端的等待,提升響應速度。
2) 對於改動影響大的系統之間,可以引入 MQ 實現解耦,減少系統之間的直接依賴。
3) 對於會出現瞬間的流量峰值的系統,我們可以引入 MQ 實現流量削峰,達到保護應用和數據庫的目的。

三、 RabbitMQ 中的概念模型

    MQ的本質:消息隊列,又叫做消息中間件。是指用高效可靠的消息傳遞機制進行與平台無關的數據交流,並基於數據通信來進行分布式系統的集成。

              通過提供消息傳遞和消息隊列模型, 可以在分布式環境下擴展進程的通信

            MQ的特點:

              1、 是一個獨立運行的服務。生產者發送消息,消費者接收消費,需要先跟服務器建立連接。                       

                         2、 采用隊列作為數據結構,有先進先出的特點。
                         3、 具有發布訂閱的模型,消費者可以獲取自己需要的消息。
                        

             消息模型:

                               所有 MQ 產品從模型抽象上來說都是一樣的過程:消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,

                               最后將消息發送到監聽的消費者。

  

            RabbitMQ的基本概念

                        下圖是RabbitMQ的基本結構:

        

組成部分說明如下:

  •  Broker :消息隊列服務,此進程包括兩個部分:Exchange和Queue。
  • Exchange :消息隊列交換機,按一定的規則將消息路由轉發到某個隊列,對消息進行過慮。隊列使用綁定鍵(Binding Key)跟交換機建立綁定關系。
  • Queue :消息隊列,存儲消息的隊列,消息到達隊列並轉發給指定的消費方,它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
  • Producer :消息生產者,即生產方客戶端,生產方客戶端將消息發送到MQ。
  • Consumer :消息消費者,即消費方客戶端,接收MQ轉發的消息。
  • Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
  • Binding:綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
  • Connection:無論是生產者發送消息,還是消費者接收消息,都必須跟Broker之間建立一個連接,這個是TCP長連接
  • Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
  • Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。 

Virtual Host理解如下圖:

 相關名詞:

包括:ConnectionFactory(連接管理器)、Channel(信道)、Exchange(交換器)、Queue(隊列)、RoutingKey(路由鍵)、BindingKey(綁定鍵)。

ConnectionFactory(連接管理器):應用程序與Rabbit之間建立連接的管理器,程序代碼中使用;

Channel(信道):消息推送使用的通道;

Exchange(交換器):用於接受、分配消息;

Queue(隊列):用於存儲生產者的消息;

RoutingKey(路由鍵):用於把生成者的數據分配到交換器上;

BindingKey(綁定鍵):用於把交換器的消息綁定到隊列上;

看到上面的解釋,最難理解的路由鍵和綁定鍵了,那么他們具體怎么發揮作用的,請看下圖:

 

 

消息發布接收流程:
-----發送消息-----
1、生產者和Broker建立TCP連接。
2、生產者和Broker建立通道。
3、生產者通過通道消息發送給Broker,由Exchange將消息進行轉發。
4、Exchange將消息轉發到指定的Queue(隊列)
----接收消息-----
1、消費者和Broker建立TCP連接
2、消費者和Broker建立通道
3、消費者監聽指定的Queue(隊列)
4、當有消息到達Queue時Broker默認將消息推送給消費者。
5、消費者接收到消息。

四、 下載安裝

RabbitMQ由Erlang語言開發,Erlang語言用於並發及分布式系統的開發,在電信領域應用廣泛,OTP(Open
Telecom Platform)作為Erlang語言的一部分,包含了很多基於Erlang開發的中間件及工具庫,安裝RabbitMQ需
要安裝Erlang/OTP,並保持版本匹配,如下圖:
RabbitMQ的下載地址:http://www.rabbitmq.com/download.html

 

1)下載erlang
地址如下:
http://erlang.org/download/otp_win64_20.3.exe,以管理員方式運行此文件,安裝。

erlang安裝完成需要配置erlang環境變量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添
加%ERLANG_HOME%\bin;

2)安裝RabbitMQhttps://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3,以管理員方式運行此文件,安裝

 3)啟動

  1.  安裝成功后會自動創建RabbitMQ服務並且啟動。

              從開始菜單啟動RabbitMQ,完成在開始菜單找到RabbitMQ的菜單:

RabbitMQ Service-install :安裝服務
RabbitMQ Service-remove 刪除服務
RabbitMQ Service-start 啟動
RabbitMQ Service-stop 啟動

2.如果沒有開始菜單則進入安裝目錄下sbin目錄手動啟動:

1)安裝並運行服務
rabbitmq-service.bat install 安裝服務 rabbitmq-service.bat stop 停止服務 rabbitmq-service.bat start 啟動服務
2)安裝管理插件
安裝rabbitMQ的管理插件,方便在瀏覽器端管理RabbitMQ
管理員身份運行 rabbitmq-plugins.bat enable rabbitmq_management

啟動成功 登錄RabbitMQ
進入瀏覽器,輸入:http://localhost:15672

 

初始賬號和密碼:guest/guest

3) 注意事項:

1、安裝erlang和rabbitMQ以管理員身份運行。
2、當卸載重新安裝時會出現RabbitMQ服務注冊失敗,此時需要進入注冊表清理erlang
搜索RabbitMQ、ErlSrv,將對應的項全部刪除。

五、java操作隊列

     1、消息隊列RabbitMQ的五種形式隊列

                   1).點對點(簡單)的隊列

                   2).工作(公平性)隊列模式

                   3.發布訂閱模式

                   4.路由模式Routing

                   5.通配符模式Topics

   2、簡單隊列

         1)功能:一個生產者P發送消息到隊列Q,一個消費者C接收

    

          P表示為生產者 、C表示為消費者 紅色表示隊列。

點對點模式分析:

 Maven依賴:

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
    </dependencies>

     封裝Connection:

/**
 * 封裝Connection
 */
public class MQConnectionUtils {

    public static Connection getConnection(){
        //創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務器地址
        factory.setHost("localhost");
        //設置端口號
        factory.setPort(5672);
        //設置用戶名
        factory.setUsername("guest");
        //設置密碼
        factory.setPassword("guest");
        //設置vhost
        factory.setVirtualHost("/admin_yehui");
        try {
            //創建連接
           return factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }
} 

參數詳解:

1)聲明交換機的參數
       String type:交換機的類型,direct, topic, fanout 中的一種。
       boolean durable:是否持久化,代表交換機在服務器重啟后是否還存在。
2)聲明隊列的參數
boolean durable:是否持久化,代表隊列在服務器重啟后是否還存在。
boolean exclusive:是否排他性隊列。排他性隊列只能在聲明它的 Connection中使用(可以在同一個 Connection 的不同的 channel 中使用),連接斷開時自動刪除。
boolean autoDelete:是否自動刪除。如果為 true,至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,隊列會自動刪除。
Map<String, Object> arguments:隊列的其他屬性
3)消息屬性 BasicProperties
以下列舉了一些主要的參數:
  

生產者:

public class Producer {

    private  static final  String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //得到連接
        Connection connection = MQConnectionUtils.getConnection();
        //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
        Channel channel = connection.createChannel();
        //聲明隊列 如果Rabbit中沒有此隊列將自動創建
        /**
         * 參數1:隊列的名稱
         * 參數2:是否持久化
         * 參數3:是否獨占此鏈接,是否排他性隊列。排他性隊列只能在聲明它的 Connection中使用(可以在同一個 Connection 的不同的 channel 中使用),
連接斷開時自動刪除。
         * 參數4:隊列不在使用時是否自動刪除
         * 參數5:隊列參數
         *
         */
        channel.queueDeclare(QUEUE_NAME, false,false, false, null);
        String msg = "test_yehui_rabbitmq";
        /**
         * 發送消息
         * 參數1: Exchange的名稱,如果沒有指定,則使用Default Exchange
         * 參數2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列
         * 參數3:消息包含的屬性
         * 參數4:消息體
         * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯
         * 示綁定或解除綁定認的交換機,routingKey等於隊列名稱
         */
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.println("消息發送體:"+msg);
        channel.close();
        connection.close();
    }
}

 消費者:

public class Consumer01 {
    private  static final  String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException {
        //得到連接
        Connection connection = MQConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
        //定義消費方法
        DefaultConsumer  consumer = new DefaultConsumer (channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { //得到交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息內容 String message = new String(body, "utf-8"); System.out.println("消費者消費:"+message); } }; //監聽隊列 /** * 參數1:隊列名稱 * 參數2: 設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置 * 為false則需要手動回復 * 參數3:消費消息的方法,消費者接收到消息后調用此方法 */ channel.basicConsume(QUEUE_NAME,true,consumer); } }

3、消息隊列RabbitMQ應答模式

為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。 如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何消息了。 沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。 消息應答是默認打開的。

我們通過顯示的設置autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,可以從內存刪除。

如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同於ActiveMQ,在RabbitMQ里,消息沒有過期的概念),則RabbitMQ會將消息重新發送給其他監聽在隊列的下一個消費者。

4、工作隊列

work queues與簡單隊列相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。
應用場景:對於 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。

 

  P表示為生產者 、C表示為消費者 紅色表示隊列。

 工作隊列分析

均攤消費

發布訂閱模式:
1、每個消費者監聽自己的隊列。
2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收
到消息

測試:

1、使用簡單隊列,啟動多個消費者。
2、生產者發送多個消息。

結果:
1、一條消息只會被一個消費者接收;
2、rabbit采用輪詢的方式將消息是平均發送給消費者的;
3、消費者在處理完某條消息后,才會收到下一條消息。

RabbitMQ的公平轉發

  目前消息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當消息到達隊列進行轉發消息。
並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。 為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount
= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。 換句話說,只有在消費者空閑的時候會發送下一條信息。調度分發消息的方式,也就是告訴RabbitMQ每次只給消費者處理一條消息,
也就是等待消費者處理完畢並自己對剛剛處理的消息進行確認之后,才發送下一條消息,防止消費者太過於忙碌,也防止它太過去清閑。 通過 設置channel.basicQos(
1);

  生產者

public class Producer { private  static final  String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到連接
        Connection connection = MQConnectionUtils.getConnection(); //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
        Channel channel = connection.createChannel(); //聲明隊列 如果Rabbit中沒有此隊列將自動創建
        /** * 參數1:隊列的名稱 * 參數2:是否持久化 * 參數3:是否獨占此鏈接 * 參數4:隊列不在使用時是否自動刪除 * 參數5:隊列參數 * */ channel.queueDeclare(QUEUE_NAME, false,false, false, null); /** * 發送消息 * 參數1: Exchange的名稱,如果沒有指定,則使用Default Exchange * 參數2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列 * 參數3:消息包含的屬性 * 參數4:消息體 * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯 * 示綁定或解除綁定認的交換機,routingKey等於隊列名稱 */ channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息
        for(int i=0;i<10;i++){ String msg = "test_yehui_rabbitmq"+i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } System.out.println("消息發送完畢"); channel.close(); connection.close(); } }

 消費者1:

public class Consumer01 { private  static final  String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到連接
        Connection connection = MQConnectionUtils.getConnection(); //創建一個通道
        Channel channel = connection.createChannel(); //聲明隊列 channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息 //定義消費方法
        DefaultConsumer  consumer = new DefaultConsumer (channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { //得到交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息內容 String message = new String(body, "utf-8"); System.out.println("消費者消費:"+message); try { //睡眠1s Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手動回執消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //監聽隊列 /** * 參數1:隊列名稱 * 參數2: 設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置 * 為false則需要手動回復 * 參數3:消費消息的方法,消費者接收到消息后調用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }

 消費者2

public class Consumer02 { //隊列名稱
    private  static final  String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到連接
        Connection connection = MQConnectionUtils.getConnection(); //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
        Channel channel = connection.createChannel(); //聲明隊列 如果Rabbit中沒有此隊列將自動創建
    channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 
throws IOException { //得到交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息內容 String message = new String(body, "utf-8"); System.out.println("消費者消費:"+message); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手動回執消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //監聽隊列 /** * 參數1:隊列名稱 * 參數2: 設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置 * 為false則需要手動回復 * 參數3:消費消息的方法,消費者接收到消息后調用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }

 結果;

    消費者1比消費者2消費得少

5、RabbitMQ交換機的作用

生產者發送消息不會向傳統方式直接將消息投遞到隊列中,而是先將消息投遞到交換機中,在由交換機轉發到具體的隊列,隊列在將消息以推送或者拉取方式給消費者進行消費,

這和我們之前學習Nginx有點類似。 交換機的作用根據具體的路由策略分發到不同的隊列中,交換機有四種類型。

Direct exchange(直連交換機)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的,隊列與直連類型的交換機綁定,需指定一個精確的綁定鍵,

生產者發送消息時會攜帶一個路由鍵。只有當路由鍵與其中的某個綁定鍵完全匹配時,這條消息才會從交換機路由到滿足路由關系的此隊列上。
例如:channel.basicPublish(“MY_DIRECT_EXCHANGE”,”spring”,”msg1”); 只有第一個隊列能收到消息。

Fanout exchange(廣播交換機)主題類型的交換機與隊列綁定時,不需要指定綁定鍵。因此生產者發送消息到廣播類型的交換機上,也不需要攜帶路由鍵。消息達到交換機時,所有與之綁定了的隊列,都會收到相同的消息的副本。

 

例如:
channel.basicPublish("MY_FANOUT_EXCHANGE", "", "msg 4"); 三個隊列都會收到 msg 4。 

 

Topic exchange(主題交換機)隊列通過路由鍵綁定到交換機上,然后,交換機根據消息里的路由值,將消息路由給一個或多個綁定隊列,

隊列與主題類型的交換機綁定時,可以在綁定鍵中使用通配符。兩個通配符:
# 0 個或者多個單詞
* 不多不少一個單詞
單詞(word)指的是用英文的點“.”隔開的字符。例如 abc.def 是兩個單詞。
解讀:第一個隊列支持路由鍵以 junior 開頭的消息路由,后面可以有單詞,也可以沒有。
第二個隊列支持路由鍵以 netty 開頭,並且后面是一個單詞的消息路由。
第三個隊列支持路由鍵以 jvm 結尾,並且前面是一個單詞的消息路由。
例如:
channel.basicPublish("MY_TOPIC_EXCHANGE","junior.fjd.klj","msg 2"); 只有第一個隊列能收到消息。
channel.basicPublish("MY_TOPIC_EXCHANGE","junior.jvm", "msg 3"); 第 一個隊列和第三個隊列能收到消息。

 

p是生產者   X是交換機  C1 、C2 是消費者

 6、發布/訂閱模式Publish/Subscribe

   基本概念:

          這個可能是消息隊列中最重要的隊列了,其他的都是在它的基礎上進行了擴展。

         功能實現:一個生產者發送消息,多個消費者獲取消息(同樣的消息),包括一個生產者,一個交換機,多個隊列,多個消費者。  

思路解讀(重點理解):                                             

(1)一個生產者,多個消費者

(2)每一個消費者都有自己的一個隊列

(3)生產者沒有直接發消息到隊列中,而是發送到交換機

(4)每個消費者的隊列都綁定到交換機上

(5)消息通過交換機到達每個消費者的隊列 該模式就是Fanout Exchange(廣播交換機)將消息路由給綁定到它身上的所有隊列 以用戶發郵件案例講解

注意:交換機沒有存儲消息功能,如果消息發送到沒有綁定消費隊列的交換機,消息則丟失。  

 工作原理圖:

 

生產者:

public class ProducerFanout { //交換機
    private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //得到連接
        Connection connection = MQConnectionUtils.getConnection(); //創建一個通道
        Channel channel = connection.createChannel();
//聲明交換機 
/** *
參數1:交換機名稱
* 參數2:交換機類型
*/

channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//發送消息 /** * 參數1:交換機名稱 * 參數2:路由key * 參數3:消息屬性參數 * 參數4:消息實體 */
channel.basicPublish(EXCHANGE_NAME,
"",null,"fanout_exchange_msg".getBytes());
channel.close();
connection.close();
}
}

 郵件消費者

public class ConsumerEmailFanout { //郵件隊列
    private static final String EMAIL_QUEUE = "email_queue"; //交換機
    private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { System.out.println("郵件消費者"); //得到連接
        Connection connection = MQConnectionUtils.getConnection(); //創建一個通道
        Channel channel = connection.createChannel();
       //聲明一個隊列
        channel.queueDeclare(EMAIL_QUEUE,false,false,false,null);
        //綁定交換機  // 4.消費者綁定交換機 參數1 隊列 名稱 參數2交換機名稱 參數3 routingKey
        channel.queueBind(EMAIL_QUEUE,EXCHANGE_NAME,"");
 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, 
Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息:" + msg); } }; //消費者監聽隊列消息 channel.basicConsume(EMAIL_QUEUE,true,consumer); } }

 短信消費者

public class ConsumerSMSFanout { //短信隊列
    private static final String SMS_QUEUE = "sms_queue"; //交換機
    private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { System.out.println("短信消費者"); //得到連接
        Connection connection = MQConnectionUtils.getConnection(); //創建一個通道
        Channel channel = connection.createChannel(); //聲明一個隊列
        channel.queueDeclare(SMS_QUEUE,false,false,false,null); //綁定交換機 // 4.消費者綁定交換機 參數1 隊列 名稱 參數2交換機名稱 參數3 routingKey
        channel.queueBind(SMS_QUEUE,EXCHANGE_NAME,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息:" + msg); } }; //消費者監聽隊列消息 channel.basicConsume(SMS_QUEUE,true,consumer); } }

 7、路由模式RoutingKey

 

 

 

路由模式:
1、每個消費者監聽自己的隊列,並且設置routingkey。
2、生產者發送消息到交換機並指定一個路由key,消費者隊列綁定到交換機時要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

生產者:

/** * 生產者 */
public class DirctProduct { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //聲明交換機 //3.綁定的交換機 參數1交互機名稱 參數2 exchange類型
        channel.exchangeDeclare("dirctExchange","direct"); //聲明隊列
        channel.queueDeclare("emailDirectQueue",true,false,false,null); channel.queueDeclare("smsDirectQueue",true,false,false,null); //綁定交換機 //交換機和隊列進行綁定
        /** * 參數1:隊列的名稱 * 參數2:交換機的名稱 * 參數3:路由key */ channel.queueBind("emailDirectQueue","dirctExchange","emailRoutKey"); channel.queueBind("smsDirectQueue","dirctExchange","smsRoutKey"); //發送消息
        channel.basicPublish("dirctExchange","emailRoutKey",null,"Email郵件發送".getBytes()); channel.basicPublish("dirctExchange","smsRoutKey",null,"Sms發送發送".getBytes()); channel.close(); connection.close(); } }

 郵件消費者

/** * 郵件消費者 */
public class EamilConsomer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 
throws IOException { String msg = new String(body); System.out.println("消息:" + msg); } }; channel.basicConsume("emailDirectQueue",true,consumer); } }

 短信消費者

/** * 短信消費者 */
public class SmsConsumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 
throws IOException { String msg = new String(body); System.out.println("消息:" + msg); } }; channel.basicConsume("smsDirectQueue",true,consumer); } }

 8、通配符模式Topics

 

 

路由模式:
1、每個消費者監聽自己的隊列,並且設置帶統配符的routingkey。
2、生產者P發送消息到交換機X,type=topic,交換機根據綁定隊列的routing key的值進行通配符匹配,由交換機根據routingkey來轉發消息到指定的隊列。

符號#:匹配一個或者多個詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor

符號*:只能匹配一個詞lazy.* 可以匹配lazy.irs或者lazy.cor

 生產者:

 

public class ProducerTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //聲明交換機
        channel.exchangeDeclare("topicExchange","topic"); //聲明隊列
        channel.queueDeclare("emailQueueTopic",true,false,false,null); channel.queueDeclare("smsQueueTopic",true,false,false,null); //綁定
        channel.queueBind("emailQueueTopic","topicExchange","email.*"); channel.queueBind("smsQueueTopic","topicExchange","sms.*"); //發送消息
        channel.basicPublish("topicExchange","email.log",null,"email郵件".getBytes()); channel.basicPublish("topicExchange","sms.log",null,"sms郵件".getBytes()); channel.close(); connection.close(); } }

 

 

 

 郵件消費者:

 

public class EmailTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 
throws IOException { System.out.println("郵件隊列:"+new String(body)); } }; channel.basicConsume("emailQueueTopic",true,consumer); } }

 

 

 

 短信消費者:

 

public class SmsTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 
throws IOException { System.out.println("短信隊列:"+new String(body)); } }; channel.basicConsume("smsQueueTopic",true,consumer); } }

 9、SpringBoot整合RabbitMQ

       生產者:

          maven依賴     

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-parent</artifactId>
        <version>2.0.6.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- fastjson 依賴 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.29</version>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>

 yml文件

spring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_yehui

 定義RabbitConfig類,配置Exchange、Queue、及綁定交換機。

案例是用的是fanout交換機類型

@Configuration public class RabbitMQConfig { // 郵件隊列
    public static String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue"; // 短信隊列
    public static String FANOUT_SMS_QUEUE = "fanout_sms_queue"; //交換機
    public static String EXCHANGE_NAME = "fanoutExchange"; //定義郵件隊列
    @Bean("fanoutEamilQueue") public Queue fanoutEamilQueue(){ return new Queue(FANOUT_EMAIL_QUEUE); } //定義短信隊列
    @Bean("fanoutSmsQueue") public Queue fanoutSmsQueue(){ return new Queue(FANOUT_SMS_QUEUE); } //定義交換機
    @Bean("fanoutExchange") public FanoutExchange fanoutExchange(){ return new FanoutExchange(EXCHANGE_NAME); } //將郵件隊列綁定交換機
    @Bean("bindingEmailExchange") public Binding bindingEmailExchange(@Qualifier("fanoutEamilQueue")Queue queue, @Qualifier("fanoutExchange")FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } //將郵件隊列綁定交換機
    @Bean("bindingSmsExchange") public Binding bindingSmsExchange(@Qualifier("fanoutSmsQueue")Queue queue, @Qualifier("fanoutExchange")FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }

 生產者投遞消息

/** * 發送消息類 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String msg){ rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EMAIL_QUEUE,msg); } }

控制層調用代碼

@RestController public class RabbitController { @Autowired private FanoutProducer fanoutProducer; @RequestMapping("/index") public String index(){ fanoutProducer.send("郵件消息"); fanoutProducer.send("短信消息"); return "index"; } }

 消費者

    maven依賴

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-parent</artifactId>
        <version>2.0.6.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- fastjson 依賴 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>

 application.yml文件

spring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_yehui server: port: 8081

 監聽消息

/** * 監聽隊列 */ @Component public class ReceiveHandler { /** * 郵箱 * @param msg */ @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(String msg){ System.out.println("郵箱消費者獲取生產者消息msg:" + msg); } /** * 短信 * @param msg */ @RabbitListener(queues={"fanout_sms_queue"}) public void receive_sms(String msg){ System.out.println("短信消費者獲取生產者消息msg:" + msg); } }

 10、消息確認機制

問題產生背景: 生產者發送消息出去之后,不知道到底有沒有發送到RabbitMQ服務器, 默認是不知道的。而且有的時候我們在發送消息之后,

后面的邏輯出問題了,我們不想要發送之前的消息了,需要撤回該怎么做。

解決方案: 1.AMQP 事務機制

                2.Confirm 模式

1.事務模式:

RabbitMQ中與事務機制有關的方法有三個:txSelect(), txCommit()以及txRollback()

txSelect用於將當前channel設置成transaction模式

txCommit用於提交事務

txRollback用於回滾事務

在事務模式里面,只有收到了服務端的 Commit-OK 的指令,才能提交成功。所以
可以解決生產者和服務端確認的問題。但是事務模式有一個缺點,它是阻塞的,一條消
息沒有發送完畢,不能發送下一條消息,它會榨干 RabbitMQ 服務器的性能。所以不建
議大家在生產環境使用。

rabbitMq api實現

springboot 整合RabbitMq 實現

/** * 發送消息類 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public String send(String msg){ for (int i = 0; i < 10; i++) { //在springboot設置事務
            rabbitTemplate.setChannelTransacted(true); rabbitTemplate.convertAndSend("faoutEmailQueue1",i); } return "success"; } } @Component public class ReceiveHandler { /** * 郵箱 * @param
     */ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1")) public void receive_email(String i) throws IOException { System.out.println(i); } }

 使用抓包工具查看沒有使用事務時:

  使用抓包工具查看使用事務時:

 2.生產者確認Confirm 模式

RabbitTemplate模板類

   @Bean("rabbitTemplate") public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //spring.rabbitmq.publisher-confirms=true 開啟確認模式, // 可以在application.properties,也可以根據下面的代碼配置
        connectionFactory.setPublisherConfirms(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) {// 如果發送交換機成功,但是沒有匹配路由到指定的隊列, 這個時候ack返回是true(這是一個坑)
                    System.out.println("生產者ACK成功:" + correlationData.getId()); } else { // 失敗 啟動重試 // 強制轉換
                    CorrelationDataDo errorCorrelationData = (CorrelationDataDo) correlationData; System.out.println(errorCorrelationData);// 注意: 不能調用rabbitTemplate發送,會導致線程死鎖 //rabbitTemplate.convertAndSend(); // 解決辦法 errorCorrelationData放入緩存. 讓定時任務輪詢發送.
                    Map errorMap = new HashMap(); errorMap.put("status", "-2");// ack失敗
                    errorMap.put("errorMsg", cause); errorMap.put("errorCorrelationData", errorCorrelationData); redisTemplate.boundHashOps("orderMessageStatus").put(errorCorrelationData.getId(), errorMap); } } }); return rabbitTemplate; }

注意事項: 但是在主線程發送消息的過程中,rabbitMQ服務器關閉,這時候主程序和 ConfirmCallback 線程都
會等待Connection恢復,然后重新啟動 rabbitmq ,當應用程序重新建立 connection 之后,兩個線程都會死鎖

擴展CorrelationDataDo

@Data public class CorrelationDataDo extends CorrelationData { // 消息
    private Object message; // 交換機
    private String exchange; // 路由鍵
    private String routingKey; // 重試次數
    private int retryCount = 0; }
/** * 發送消息類 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public String send(String msg){ for (int i = 0; i < 10; i++) { Map<String,Object> map = new HashMap<String,Object>(); map.put("id",i); String uuid = UUID.randomUUID().toString(); Message message = MessageBuilder.withBody((JSONObject.toJSONString(map)).getBytes())//設置發送消息
                    .setCorrelationId(uuid)//設置全局ID
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息持久化
                    .setContentType("application/json")//設置格式application/json
 .build(); //CorrelationData 用於消息確認 CorrelationDataDo繼承了CorrelationData用於擴展屬性
            CorrelationDataDo correlationData = new CorrelationDataDo(); correlationData.setId(uuid); correlationData.setMessage(message); correlationData.setExchange("faoutEmailQueue1"); rabbitTemplate.convertAndSend("faoutEmailQueue1",message,correlationData); } return "success"; } } /** * 接收消息 */ @Component public class ReceiveHandler { /** * 郵箱 * @param
     */ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1")) public void receive_email(Message message) { // 創建json消息轉換器
        Jackson2JsonMessageConverter  jsonMessageConverter = new Jackson2JsonMessageConverter(); //獲取消息
        Map<String,Object> map = (Map<String, Object>) jsonMessageConverter.fromMessage(message); System.out.println(map); } }

 3.消費者消息確認

3.1消費json格式數據

// 創建json消息轉換器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 獲取消息 Map map = (Map) jackson2JsonMessageConverter.fromMessage(message); //1.獲取消息
Long seckillId = ((Integer) map.get("seckillId")).longValue(); //2.扣減庫存
int result = 0; result = seckillGoodsMapper.updateStockCountByPrimaryKey(seckillId); if (result > 0) { System.out.println("修改庫存成功"); // 清空redis緩存數據
redisTemplate.boundHashOps("orderMessage").delete(message.getMessageProperties().getC orrelationId()); } else { System.out.println("修改庫存失敗,人工處理"); }

 

 3.2消息確認方式

基於配置:

在消費者application.properties文件中設置消息確認方式

 

# 表示消息確認方式,none manual 手動ack 和auto 自動ack ;默認auto spring.rabbitmq.listener.simple.acknowledge-mode=auto

 

 

 

none: autoAck=true 自動ack,不管監聽是否發生錯誤都返回ack

manual: 手動ack, 用戶手動設置ack或者nack

auto: 根據監聽容器反會ack或者nack,如果容器拋出異常則自動啟動重試機制.

auto模式:

# 表示消息確認方式,none manual 手動ack 和auto 自動ack ;默認auto spring.rabbitmq.listener.simple.acknowledge-mode=auto

 

manual 模式 

# 表示消息確認方式,none manual 手動ack 和auto 自動ack ;默認auto spring.rabbitmq.listener.simple.acknowledge-mode=manual

 

配置在config類里面

    @Bean(name = "mqConsumerlistenerContainer") public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //簽收模式 AcknowledgeMode.MANUAL手動 AcknowledgeMode.AUTO自動,AcknowledgeMode.NONE // none: autoAck=true 自動ack,不管監聽是否發生錯誤都返回ack
 factory.setAcknowledgeMode(AcknowledgeMode.NONE); return factory; }

消費者代碼

 @Component public class SeckillOrder_Consumer { @Autowired TbSeckillGoodsMapper seckillGoodsMapper; @Autowired RedisTemplate redisTemplate; //containerFactory 這個就是不配置在配置文件里面,而是寫在config配置類里面的引入的config類
        @RabbitListener(queues = {"seckillOrder_queue"},containerFactory = "mqConsumerlistenerContainer") public void receive(Message message, Channel channel) throws IOException { // 創建json消息轉換器
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); Map map = (Map) jackson2JsonMessageConverter.fromMessage(message); // 解決冪等性問題
            Object orderMessage = redisTemplate.boundHashOps("orderMessage").get(message.getMessageProperties().getCorrelationId()); if (orderMessage == null) { System.out.println("已經消費了, 不在重復消費"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } //1.獲取消息
            Long seckillId = ((Integer) map.get("seckillId")).longValue(); //2.扣減庫存
            int result = 0; try { result = seckillGoodsMapper.updateStockCountByPrimaryKey(seckillId); if (result > 0) { System.out.println("修改庫存成功"); //清空redis緩存數據
                    redisTemplate.boundHashOps("orderMessage").delete(message.getMessageProperties().getCorrelationId()); // 手動確認ack
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("修改庫存失敗,人工處理"); // 將錯誤放入緩存
                    Map errorMap = new HashMap(); errorMap.put("status", "-4");//SQL執行異常
                    errorMap.put("errorMsg", "SQL執行的結果:" + result); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } catch (Exception e) { System.out.println("修改庫存失敗,人工處理"); // 手動確認nack 區分問題原因: 數據庫網絡連接(重回隊列) 其他原因(不重回隊列) // 將錯誤放入緩存
                Map errorMap = new HashMap(); if (e instanceof MyBatisSystemException) { MyBatisSystemException myBatisSystemException = (MyBatisSystemException) e; // 獲取根本原因
                    Throwable rootCause = myBatisSystemException.getRootCause(); if (rootCause instanceof ConnectException) { // 重試-重回隊列 設置重試3次
                        errorMap = (Map) redisTemplate.boundHashOps("orderMessageStatus").get(message.getMessageProperties().getCorrelationId()); if (errorMap == null) { errorMap = new HashMap(); errorMap.put("retryCount", 0);//初始重試次數
 } int retryCount = (int) errorMap.get("retryCount"); if (++retryCount <= 3) { // 重試
                            errorMap.put("status", "-5");//數據庫網絡原因
                            errorMap.put("errorMsg", e.getMessage()); errorMap.put("retryCount", retryCount); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.out.println("網絡原因.開啟重試:" + retryCount); } else { errorMap.put("status", "-5");//數據庫網絡原因
                            errorMap.put("errorMsg", e.getMessage()); errorMap.put("retryCount", retryCount); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("重試次數已用完. 不在重試..."); } } else { errorMap.put("status", "-6");//其他原因
                        errorMap.put("errorMsg", e.getMessage()); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("其他原因,不需要重回隊列"); } } else { errorMap.put("status", "-6");//其他原因
                    errorMap.put("errorMsg", e.getMessage()); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("其他原因,不需要重回隊列"); } } } }

config類相關的解釋請參考:https://www.jianshu.com/p/090ed51006d5,https://blog.csdn.net/qq_42606051/article/details/82869148

11、限流

基於配置文件

#限流2條 spring.rabbitmq.listener.simple.prefetch=2
/** * 發送消息類 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public String send(String msg){ for (int i = 0; i < 10; i++) { Map<String,Object> map = new HashMap<String,Object>(); map.put("id",i); String uuid = UUID.randomUUID().toString(); Message message = MessageBuilder.withBody((JSONObject.toJSONString(map)).getBytes())//設置發送消息
                    .setCorrelationId(uuid)//設置全局ID
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息持久化
                    .setContentType("application/json")//設置格式application/json
 .build(); //CorrelationData 用於消息確認 CorrelationDataDo繼承了CorrelationData用於擴展屬性
            CorrelationDataDo correlationData = new CorrelationDataDo(); correlationData.setId(uuid); correlationData.setMessage(message); correlationData.setExchange("faoutEmailQueue1"); rabbitTemplate.convertAndSend("faoutEmailQueue1",message,correlationData); } return "success"; } } /** * 接收消息 */ @Component public class ReceiveHandler { /** * 郵箱 * @param
     */ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1")) public void receive_email(Message message) { // 創建json消息轉換器
        Jackson2JsonMessageConverter  jsonMessageConverter = new Jackson2JsonMessageConverter(); //獲取消息
        Map<String,Object> map = (Map<String, Object>) jsonMessageConverter.fromMessage(message); System.out.println(map); } }

 

在消費者端debug運行結果:

運行生產者可以看到Ready=3 Unacked=2, Total=5, Total代表隊列中的消息總條數,Ready代表消費者還可以讀到的條數,Unacked:代表還有多少條沒有被應答

基於config配置類

  @Bean(name = "mqConsumerlistenerContainer") public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //限流配置
        factory.setPrefetchCount(2); return factory; }

消費者端代碼

/** * 接收消息 */ @Component public class ReceiveHandler { /** * 郵箱 * @param
     */ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1"), containerFactory = "mqConsumerlistenerContainer") public void receive_email(Message message) { // 創建json消息轉換器
        Jackson2JsonMessageConverter  jsonMessageConverter = new Jackson2JsonMessageConverter(); //獲取消息
        Map<String,Object> map = (Map<String, Object>) jsonMessageConverter.fromMessage(message); System.out.println(map); } }

 

參考博客代碼:https://blog.csdn.net/linsongbin1/article/details/100658415,https://blog.csdn.net/vbirdbest/article/details/78699913

11、RabbitMQ消息重試機制

消費者在消費消息的時候,如果消費者業務邏輯出現程序異常,這時候應該如何處理?

答案:使用消息重試機制。(演示重試機制)

如何合適選擇重試機制:

情況1: 消費者獲取到消息后,調用第三方接口,但接口暫時無法訪問,是否需要重試? (需要重試機制)

情況2: 消費者獲取到消息后,拋出數據轉換異常,是否需要重試?(不需要重試機制)需要發布進行解決。

如何實現重試機制 總結:

對於情況2,如果消費者代碼拋出異常是需要發布新版本才能解決的問題,那么不需要重試,重試也無濟於事。應該采用日志記錄+定時任務job健康檢查+人工進行補償

重試機制案例:

 生產者代碼就按照上面的案例就可以了,

消費者:

yml文件

spring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_yehui listener: simple: retry: ####開啟消費者重試 enabled: true ####最大重試次數 max-attempts: 5 ####重試間隔次數 initial-interval: 3000 server: port: 8081
/** * 監聽隊列 */ @Component public class ReceiveHandler { /** * 郵箱 * @param msg */ //rabbitmq 默認情況下 如果消費者程序出現異常的情況下,會自動實現補償機制
  //重試機制都是間隔性的
// 補償(重試機制) 隊列服務器 發送補償請求 // 如果消費端 程序業務邏輯出現異常消息會消費成功嗎? 是不能消費者成功的 //@RabbitListener 底層 使用Aop進行攔截,如果程序沒有拋出異常,自動提交事務 // 如果Aop使用異常通知攔截 獲取異常信息的話,自動實現補償機制 ,該消息會緩存到rabbitmq服務器端進行存放,一直重試到不拋異常為准。 // 修改重試機制策略 一般默認情況下 間隔5秒重試一次 @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(String msg){ System.out.println("出現異常"); int i = 1/0; System.out.println("郵箱消費者獲取生產者消息msg:" + msg); } /** * 短信 * @param msg */ @RabbitListener(queues={"fanout_sms_queue"}) public void receive_sms(String msg){ System.out.println("短信消費者獲取生產者消息msg:" +
msg); } }

 調用第三方接口重試機制分析圖:

重試機制調用第三方接口 @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("郵件消費者獲取生產者消息msg:" + msg + ",消息id:" + messageId); // 重試機制都是間隔性
 JSONObject jsonObject = JSONObject.parseObject(msg); String email = jsonObject.getString("email"); String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; System.out.println("郵件消費者開始調用第三方郵件服務器,emailUrl:" + emailUrl); JSONObject result = HttpClientUtils.httpGet(emailUrl); // 如果調用第三方郵件接口無法訪問,如何實現自動重試.
if (result == null) { throw new Exception("調用第三方郵件服務器接口失敗!"); } System.out.println("郵件消費者結束調用第三方郵件服務器成功,result:" + result + "程序執行結束"); // 手動ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手動簽收
channel.basicAck(deliveryTag, false); } // 默認是自動應答模式
 } 

 12、消費者如果保證消息冪等性,不被重復消費

產生原因:網絡延遲傳輸中,消費出現異常或者是消費延遲消費,會造成MQ進行重試補償,

在重試過程中,可能會造成重復消費。 消費者如何保證消息冪等性,

不被重復消費 解決辦法:

                             ①使用全局MessageID判斷消費方使用同一個,解決冪等性。

                             ②或者使用業務邏輯保證唯一(比如訂單號碼)

基於全局消息id區分消息,解決冪等性(重復消費)

生產者:

  @RequestMapping("/send") public String send(){ String msg = "my_fanout_msg:" + System.currentTimeMillis(); //設置全局ID
        Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build(); System.out.println(msg + ":" + msg); fanoutProducer.send(message); return null; }

 消費者

    /** * 郵箱 使用全局ID * @param msg */ @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(Message message){ System.out.println("出現異常"); String messageId = message.getMessageProperties().getMessageId(); int i = 1/0; System.out.println("郵箱消費者獲取生產者消息msg:" + messageId); }

 yml文件

spring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_yehui listener: simple: retry: ####開啟消費者重試 enabled: true ####最大重試次數 max-attempts: 5 ####重試間隔次數 initial-interval: 3000 server: port: 8081

 啟動測試,重試的時候沒有發生變化

13、SpringBoot整合RabbitMQ簽收模式

//郵件隊列
@Component public class FanoutEamilConsumer { @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { System.out .println(Thread.currentThread().getName() + ",郵件消費者獲取生產者消息msg:" + new String(message.getBody(), "UTF-8") + ",messageId:" + message.getMessageProperties().getMessageId()); // 手動ack
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手動簽收
        channel.basicAck(deliveryTag, false); } }

 開啟手動應答

pring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_host listener: simple: retry: ####開啟消費者異常重試 enabled: true ####最大重試次數 max-attempts: 5 ####重試間隔次數 initial-interval: 2000 ####開啟手動ack acknowledge-mode: manual 

 14屬性設置

1.TTL(Time To Live)

   1.1消息的過期時間

         有兩種設置方式:
             1) 通過隊列屬性設置消息過期時間所有隊列中的消息超過時間未被消費時,都會過期。    
/** * 設置過期時間 * @return
     */ @Bean("ttlQueue") public Queue ttlQueue(){ Map<String,Object> map = new HashMap<>(); map.put("x-message-ttl", 11000);//隊列中的消息未被消費11s后過期
        return new Queue("ttlQueue",true,false,false,map); }

          2)設置單條消息的過期時間

               在發送消息的時候指定消息屬性。        

        MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setExpiration("4000"); // 消息的過期屬性,單位ms
 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = new Message("這條消息4秒后過期".getBytes(), messageProperties); rabbitTemplate.send(RabbitMQConfig.FANOUT_EMAIL_QUEUE,message); // 隨隊列的過期屬性過期,單位ms
        rabbitTemplate.convertSendAndReceive(RabbitMQConfig.FANOUT_EMAIL_QUEUE, "消息發送");
如果同時指定了 Message TTL 和 Queue TTL,則小的那個時間生效。

 2.死信隊列

2.1概述

死信隊列 聽上去像 消息“死”了     其實也有點這個意思,死信隊列  是 當消息在一個隊列 因為下列原因:

消息被拒絕(basic.reject/ basic.nack)並且不再重新投遞 requeue=false消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration())

隊列超載變成了 “死信” 后    被重新投遞(publish)到另一個Exchange   該Exchange 就是DLX     然后該Exchange 根據綁定規則 轉發到對應的 隊列上  

監聽該隊列  就可以重新消費,說白了就是 沒有被消費的消息  換個地方重新被消費

生產者   -->  消息 --> 交換機  --> 隊列  --> 變成死信  --> DLX交換機 -->隊列 --> 消費者

 
什么情況下消息會變成死信?
1)消息被消費者拒絕並且未設置重回隊列:(NACK || Reject ) && requeue == false
2)消息過期
3)隊列達到最大長度,超過了 Max length(消息數)或者 Max length bytes
(字節數),最先入隊的消息會被發送到 DLX。

2.2.應用場景

在定義業務隊列的時候,可以考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被發送到該死信隊列上

2.3使用死信交換機

定義業務(普通)隊列的時候指定參數

x-dead-letter-exchange: 用來設置死信后發送的交換機

x-dead-letter-routing-key:用來設置死信的routingKey

死信隊列配置

  生產者

@Component public class FanoutConfig { /** * 定義死信隊列相關信息 */
    public final static String deadQueueName = "dead_queue"; public final static String deadRoutingKey = "dead_routing_key"; public final static String deadExchangeName = "dead_exchange"; /** * 死信隊列 交換機標識符 */
    public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信隊列交換機綁定鍵標識符 */
    public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; // 郵件隊列
    private String FANOUT_EMAIL_QUEUE = "fanout_email_queue"; // 短信隊列
    private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; // fanout 交換機
    private String EXCHANGE_NAME = "fanoutExchange"; // 1.定義郵件隊列
 @Bean public Queue fanOutEamilQueue() { // 將普通隊列綁定到死信隊列交換機上
        Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args); return queue; } // 2.定義短信隊列
 @Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定義交換機
 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.隊列與交換機綁定郵件隊列
 @Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.隊列與交換機綁定短信隊列
 @Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } /** * 配置死信隊列 * * @return
     */ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } @Bean public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); } }

 

 消費者
@RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("郵件消費者獲取生產者消息msg:" + msg + ",消息id:" + messageId); JSONObject jsonObject = JSONObject.parseObject(msg); Integer timestamp = jsonObject.getInteger("timestamp"); try { int result = 1 / timestamp; System.out.println("result:" + result); // 通知mq服務器刪除該消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); // // 丟棄該消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } @Component public class DeadConsumer { @RabbitListener(queues = "dead_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("死信郵件消費者獲取生產者消息msg:" + msg + ",消息id:" + messageId); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }

 

 消息留轉圖

 

 

六、消息可靠性

RabbitMQ 的可靠性投遞,也就是在使用 RabbitMQ 實現異步通信的時候,消息丟了怎么辦,消息重復消費怎么辦

 在 RabbitMQ 里面提供了很多保證消息可靠投遞的機制,這個也是 RabbitMQ 的一個特性。

在理解消息可靠性投遞的時候,必須明確一個問題,因為效率與可靠性是無法兼得的,如果要保證每一個環節都成功,勢必會對消息的收發效率造成影響。所以如果是一

些業務實時一致性要求不是特別高的場合,可以犧牲一些可靠性來換取效率。比如發送通知或者記錄日志的這種場景,如果用戶沒有收到通知,不會造成業務影響,只要再次發送就可以了。

6.1RabbitMq的工作模型

 

 

 使用 RabbitMQ 收發消息的時候,有幾個主要環節:

1 代表消息從生產者發送到 Broker
       生產者把消息發到 Broker 之后,怎么知道自己的消息有沒有被 Broker 成功接 收?
2 代表消息從 Exchange 路由到 Queue
        Exchange 是一個綁定列表,如果消息沒有辦法路由到正確的隊列,會發生什么 事情?應該怎么處理?
3 代表消息在 Queue 中存儲
          隊列是一個獨立運行的服務,有自己的數據庫(Mnesia),它是真正用來存儲消 息的。如果還沒有消費者來消費,那么消息要一直存儲在隊列里面。如果隊列出了問 題,
         消息肯定會丟失。怎么保證消息在隊列穩定地存儲呢?
4 代表消費者訂閱 Queue 並消費消息
       隊列的特性是什么?FIFO。隊列里面的消息是一條一條的投遞的,也就是說,只 有上一條消息被消費者接收以后,才能把這一條消息從數據庫刪掉,繼續投遞下一條
       消息。那么問題來了,Broker 怎么知道消費者已經接收了消息呢?

6.2消息發送到 RabbitMQ 服務器

第一個環節是生產者發送消息到 Broker。可能因為網絡或者 Broker 的問題導致消息
發送失敗,生產者不能確定 Broker 有沒有正確的接收。在 RabbitMQ 里面提供了兩種機制服務端確認機制,也就是在生產者發送消息給
RabbitMQ 的服務端的時候,服務端會通過某種方式返回一個應答,只要生產者收到了
這個應答,就知道消息發送成功了。
第一種是 Transaction(事務)模式,第二種 Confirm(確認)模式。 

 Transaction(事務)模式

事務模式怎么使用呢?
   通過一個 channel.txSelect()的方法把信道設置成事務模式,然后就可以發布消息給 RabbitMQ 了,如果 channel.txCommit();的方法調用成功,就說明事務提交成功,
則消息一定到達了 RabbitMQ 中。如果在事務提交執行之前由於 RabbitMQ 異常崩潰或者其他原因拋出異常,這個時候我們便可以將其捕獲,進而通過執行 channel.txRollback()方法來實現事務回滾。
在事務模式里面,只有收到了服務端的 Commit-OK 的指令,才能提交成功。所以可以解決生產者和服務端確認的問題。但是事務模式有一個缺點,它是阻塞的,一條消
息沒有發送完畢,不能發送下一條消息,它會榨干 RabbitMQ 服務器的性能。所以不建議在生產環境使用。
代碼:
public class TransactionProducer { private final static String QUEUE_NAME = "ORIGIN_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立連接
        Connection conn = factory.newConnection(); // 創建消息通道
        Channel channel = conn.createChannel(); String msg = "Hello world, Rabbit MQ"; // 聲明隊列(默認交換機AMQP default,Direct) // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); try { channel.txSelect(); // 發送消息 // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); // int i =1/0;
 channel.txCommit(); System.out.println("消息發送成功"); } catch (Exception e) { channel.txRollback(); System.out.println("消息已經回滾"); } channel.close(); conn.close(); } }
springboot中的設置:
rabbitTemplate.setChannelTransacted(true);

 Confirm(確認)模式

確認模式有三種,一種是普通確認模式。
在生產者這邊通過調用 channel.confirmSelect()方法將信道設置為 Confirm 模式,然后發送消息。一旦消息被投遞到所有匹配的隊列之后,RabbitMQ 就會發送一個確認(Basic.Ack)給生產者, 也就是調用 channel.waitForConfirms()返回 true,這樣生產者就知道消息被服務端接收了。這種發送 1 條確認 1 條的方式消息還不是太高,所以我們還有一種批量確認的方式。
批 量 確 認 , 就 是 在 開 啟 Confirm 模 式 后 , 先 發 送 一 批 消 息 。 只 要channel.waitForConfirmsOrDie();方法沒有拋出異常,就代表消息都被服務端接收了。
批量確認的方式比單條確認的方式效率要高,但是也有兩個問題,第一個就是批量的數量的確定。對於不同的業務,到底發送多少條消息確認一次?數量太少,效率提升
不上去。數量多的話,又會帶來另一個問題,比如我們發 1000 條消息才確認一次,如果前面 999 條消息都被服務端接收了,如果第 1000 條消息被拒絕了,那么前面所有的消息都要重發。
異步確認模式需要添加一個 ConfirmListener,並且用一個 SortedSet 來維護沒有
被確認的消息。Confirm 模式是在 Channel 上開啟的,因為 RabbitTemplate 對 Channel 進行了封裝,叫做 ConfimrCallback。
 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.out.println("發送消息失敗:" + cause); throw new RuntimeException("發送異常:" + cause); } } });

 6.3消息在隊列中的存儲

是消息在隊列存儲,如果沒有消費者的話,隊列一直存在在數據庫中。如果 RabbitMQ 的服務或者硬件發生故障,比如系統宕機、重啟、關閉等等,可能會導致內存中的消息丟失,
所以要把消息本身和元數據(隊列、交換機、綁定)都 保存到磁盤。 
解決方案:
1.持久化隊列
 @Bean("durableQueue") public Queue queue(){ return new Queue("durableQueue",true,false,false,new HashMap<>()); }

 2.持久化交換機

 @Bean("fanoutDurableExchange") public FanoutExchange fanoutDurableExchange(){ // exchangeName, durable, exclusive, autoDelete,
        return new FanoutExchange("fanoutDurableExchange",true,false,new HashMap<>()); }

 3.消息持久化

 MessageProperties messageProperties = new MessageProperties(); //消息持久化
 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setExpiration("4000"); // 消息的過期屬性,單位ms
 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 隨隊列的過期屬性過期,單位ms
        rabbitTemplate.convertAndSend("faoutEmailQueue","324243232",messageProperties);

 6.4消息投遞到消費者

如果消費者收到消息后沒來得及處理即發生異常,或者處理過程中發生異常,會導致失敗。服務端應該以某種方式得知消費者對消息的接收情況,並決定是否重新投遞
這條消息給其他消費者。RabbitMQ 提供了消費者的消息確認機制(message acknowledgement),消費者可以自動或者手動地發送 ACK 給服務端。
沒有收到 ACK 的消息,消費者斷開連接后,RabbitMQ 會把這條消息發送給其他消費者。如果沒有其他消費者,消費者重啟后會重新消費這條消息,重復執行業務邏輯。
消費者在訂閱隊列時,可以指定 autoAck參數,當 autoAck 等於 false 時,RabbitMQ會等待消費者顯式地回復確認信號后才從隊列中移去消息。
如何設置手動 ACK?
application.properties
spring.rabbitmq.listener.direct.acknowledge-mode=manual 
spring.rabbitmq.listener.simple.acknowledge-mode=manual
注意這三個值的區別:
NONE:自動 ACK
MANUAL: 手動 ACK
AUTO:如果方法未拋出異常,則發送 ack。
當拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕,
且不重新入隊。當拋出 ImmediateAcknowledgeAmqpException 異常,則消費者會
發送 ACK。其他的異常,則消息會被拒絕,且 requeue = true 會重新入隊。
    public class SecondConsumer { @RabbitHandler public void process(String msgContent, Channel channel, Message message) throws IOException { System.out.println("Second Queue received msg : " + msgContent); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }

 

 如果消息無法處理或者消費失敗,也有兩種拒絕的方式,Basic.Reject()拒絕單條,Basic.Nack()批量拒絕。如果 requeue 參數設置為 true,可以把這條消息重新存入隊列,

以便發給下一個消費者(當然,只有一個消費者的時候,這種方式可能會出現無限循環 重復消費的情況。可以投遞到新的隊列中,或者只打印異常日志)。
思考:服務端收到了 ACK 或者 NACK,生產者會知道嗎?即使消費者沒有接收到消息,或者消費時出現異常,生產者也是完全不知情的。
例如,我們寄出去一個快遞,是怎么知道收件人有沒有收到的?因為有物流跟蹤和簽收反饋,所以寄件人可以知道。
在沒有用上電話的年代,我們寄出去一封信,是怎么知道收信人有沒有收到信件?只有收到回信,才知道寄出的信被收到了。
所以,這個是生產者最終確定消費者有沒有消費成功的兩種方式:
1) 消費者收到消息,處理完畢后,調用生產者的 API(思考:是否破壞解耦?)
2) 消費者收到消息,處理完畢后,發送一條響應消息給生產者 

6.5消費者回調

1) 調用生產者 API
例如:提單系統給其他系統發送了碎屏保消息后,其他系統必須在處理完消息后調用提單系統提供的 API,來修改提單系統中數據的狀態。只要 API 沒有被調用,數據狀態沒有被修改,
提單系統就認為下游系統沒有收到這條消息。
2) 發送響應消息給生產者
例如:商業銀行與人民銀行二代支付通信,無論是人行收到了商業銀行的消息, 還是商業銀行收到了人行的消息,都必須發送一條響應消息(叫做回執報文)。

 

 6.6補償機制

如果生產者的 API 就是沒有被調用,也沒有收到消費者的響應消息,怎么辦?不要着急,可能是消費者處理時間太長或者網絡超時。
生產者與消費者之間應該約定一個超時時間,比如 5 分鍾,對於超出這個時間沒有到響應的消息,可以設置一個定時重發的機制,但要發送間隔和控制次數,比如每隔 2分鍾發送一次,最多重發 3 次,否則會造成消息堆積。 重發可以通過消息落庫+定時任務來實現。重發,是否發送一模一樣的消息?
參考:
ATM 機上運行的系統叫 C 端(ATMC),前置系統叫 P 端(ATMC),它接收 ATMC
的消息,再轉發給卡系統或者核心系統。
1)如果客戶存款,沒有收到核心系統的應答,不知道有沒有記賬成功,最多發送 5次存款確認報文,因為已經吞鈔了,所以要保證成功;
2)如果客戶取款,ATMC 未得到應答時,最多發送 5 次存款沖正報文。因為沒有吐鈔,所以要保證失敗。

6.7消息冪等性

如果消費者每一次接收生產者的消息都成功了,只是在響應或者調用 API 的時候出了問題,會不會出現消息的重復處理?例如:存款 100 元,ATM 重發了 5 次,核心系統
一共處理了 6 次,余額增加了 600 元。所以,為了避免相同消息的重復處理,必須要采取一定的措施。RabbitMQ 服務端是沒有這種控制的(同一批的消息有個遞增的 DeliveryTag),
它不知道你是不是就要把一條消息發送兩次,只能在消費端控制。如何避免消息的重復消費?消息出現重復可能會有兩個原因:
1、生產者的問題,生產者重復發送消息,比如在開啟了 Confirm 模式但未收到確認,消費者重復投遞。
2、消費者出了問題,由於消費者未發送 ACK 或者其他原因,消息重復投遞。
3、生產者代碼或者網絡問題。對於重復發送的消息,可以對每一條消息生成一個唯一的業務 ID,通過日志或者消息落庫來做重復控制。 
 

 

 

 

                         

# 表示消息確認方式,none manual 手動ack auto 自動ack ;默認auto
spring
.rabbitmq.listener.simple.acknowledge-mode=auto


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM