分布式消息通信之RabbitMQ_01


官網

RabbitMQ
RabbitMQ Tutorials & 1 Hello World! | 2 Work queues | 3 Publish/Subscribe | 4 Routing | 5 Topics | 6 RPC & RabbitMQ 教程
Installation Guide

1. RabbitMQ安裝

1.1 Window版安裝

 a.需要首先安裝安裝Erlang以及對應版本的rabbitmq-server,可在此處Installing on Windows下載;
 本次安裝版本為rabbitmq-server-3.7.14.exeOTP 21.3 Windows 64-bit;下載完成后一直next即可;可將erlrabbitMQ加入環境變量;
 b.安裝完成后啟用rabbitmq_management

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>enable rabbitmq_management
'enable' 不是內部或外部命令,也不是可運行的程序
或批處理文件。

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@DESKTOP-2NHH5NJ:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-2NHH5NJ...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

set 3 plugins.
Offline change; changes will take effect at broker restart.

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>

 c.之后啟動RabbitMQ,net start RabbitMQ

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>rabbitmq-server.bat net start RabbitMQ
"WARNING: Using RABBITMQ_ADVANCED_CONFIG_FILE: C:\Users\EDDY~1.SHE\AppData\Roaming\RabbitMQ\advanced.config"

  ##  ##
  ##  ##      RabbitMQ 3.7.14. Copyright (C) 2007-2019 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
  ######  ##
  ##########  Logs: C:/Users/EDDY~1.SHE/AppData/Roaming/RabbitMQ/log/RABBIT~1.LOG
                    C:/Users/EDDY~1.SHE/AppData/Roaming/RabbitMQ/log/rabbit@DESKTOP-2NHH5NJ_upgrade.log

              Starting broker...
 completed with 3 plugins.

 啟動成功后可打開http://localhost:15672/#/測試地址查看,用戶名密碼默認均為guest

1.2 Linux版安裝

2. 典型應用場景

 異步,解耦,削峰
  跨系統的異步通信;
 系統內的同步變為異步;
 基於pub/sub模型的廣播訂閱;
 分布式事物的最終一致性解決方案;

3. 基本介紹

3.1 AMQP協議

 AMQP協議,跨語言,跨系統,跨平台的協議,Advanced Message Queuing Protocol 一種高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。基於此協議的消息中間件可跨產品,跨語言的消息通信。Erlang中的實現有 RabbitMQ等。

3.2 RabbitMQ的特性

 RabbmitMQ使用Erlang語言編寫,使用Mnesia數據庫存儲消息。
 1)可靠性(Reliability) RabbitMQ使用一些機制來保證消息傳輸的可靠性,如持久化,傳輸確認,發布確認。
 2)靈活的路由(Flexible Routing) 在消息進入隊列之前,通過Exchange來路由消息。對應典型的路由功能,RabbmitMQ已經內置了一些Exchange來支持。針對更復雜的路由功能,可以將多個Exchange綁定在一起,也可以通過插件機制實現自己的Exchange。
 3) 消息集群(Clustering) 多個RabbitMQ服務可以組成一個集群,形成一個邏輯Broker。
 4) 高可用隊列(Highly Available Queues) 隊列可以在集群機器上進行鏡像,使得在部分節點出現問題的情況下隊列讓然可用。
 5) 多種協議(Multi Protocol) RabbitMQ支持多種消息隊列協議,如 AMQP, STOMP, MQTT等。
 6) 多語言客戶端(Clients) RabbitMQ幾乎支持所有常用語言,如 Python, Java, Ruby, PHP, C#, JavaScript, Go, Elixir, Objective-C等。
 7) 管理界面(Management UI) RabbitMQ提供了一個易用的用戶界面,使得用戶可以監控和管理消息,集群中的節點。
 8) 插件機制(Plugin) RabbitMQ提供了許多插件,以實現多方面的擴展,當然也可以編寫自己的插件。

3.3 工作模型

RMQ工作模型
工作流程:
 1. 首先生產者將消息發布至交換機Exchange,綁定路由關鍵字Routing Key(生產者不會講消息直接放入隊列,就算是不聲明交換機,也是先發布至默認的交換機)
 2. 消費者創建通道channel,隊列queue,並指定隊列queue綁定的交換機Exchange,使用綁定關鍵字Binding Key來綁定(一個交換機可綁定多個隊列queue,一個交換機綁定一個隊列時,也可以使用多個綁定關鍵字)
 3. 當交換機收到消息,根據生產者發送消息時的路由關鍵字routing key和消費者定義的綁定關鍵字Binding Key做匹配,匹配合適的話就將消息存入匹配到的隊列中去
 4. 消費者從隊列中取走消息

 關鍵字:
Broker RabbitMQ的實體服務器,提供一種傳輸及服務,維護一條從生產者到消費者的傳輸路線,保證消息按指定的方式傳輸。
Exchange 消息交換機,指定消息按照哪種規則路由到哪個隊列Queue。
Queue 消息隊列。消息的載體,每個消息都會被投送到一個或多個隊列中。
Binding 綁定。將Exchange和Queue按照某種路由規則綁定。
Routing Key 路由關鍵字。和 Binding Key對應,按照關鍵字匹配通過Exchange找到對應的Queue。
VHost 虛擬主機。相當於一個小的Broker,小的rabbitMQ服務器,一個Broker可以有一到多個虛擬主機,用於不同用戶的權限分離。一個虛擬主機擁有一組 Exchange, Queue和 Binding。
Producer 消息生產者。將消息投遞到Exchange上,一般是獨立的應用程序。
Consumer 消息消費者。消息的接受者,一般是獨立的應用程序。
Connection Producer,Broker和Consumer之間的TCP長連接。
Channel 消息通道,也成信道。在客戶端的每個連接里可以建立多個Channel,每個Channel代表一個會話認為。在RabbitMQ Java Client中,channel上定義了大量的編程接口。

3.4 三種主要的交換機

Direct Exchange 直連交換機
Direct Exchange
 使用直連類型的交換機時,發送消息時的路由關鍵字Routing Key和接收消息時的綁定關鍵字Binding Key必須完全匹配時,生產者發送的消息才能被對應的消費者所接收到;
 如上圖,當生產者發送消息指定路由關鍵字是error的時候,交換機發現第一個和第二個隊列都是以error綁定的,所以隊列1,2都會受到消息;當生產者發送消息指定路由關鍵字是info或者warning時,只有第二個隊列能匹配,消費者C2才能受到消息。

Topic Exchange 主題交換機
Topic Exchange
 路由關鍵字Routing Key和綁定關鍵字Binding Key不需要完全匹配,類似於正則表達式符合一定規則匹配到即可接收消息。通配符有兩個:

  * 代表匹配一個單詞;
  # 代表匹配零到多個單詞;
  多個單詞之間用.分隔。

 如上圖,當使用路由關鍵字fast.orange.tiger發送消息時,表達式匹配隊列Q1,消費者C1會受到消息;
 當使用路由關鍵字fast.orange.rabbit發送消息時,表達式匹配隊列Q1,Q2,消費者C1,C2都會受到消息;
 當使用路由關鍵字lazy.blue.rabbit發送消息時,隊列Q2綁定交換機時使用的兩個關鍵字都可以匹配到,但只會受到1條消息,不會多次發送;

Fanout Exchange 廣播交換機
Fanout Exchange
 使用廣播類型的交換機時,不需要指定Routing KeyBinding Key,生產者將消息發送至該交換機后,所有與之綁定的隊列都能收到消息。
 如上圖,當生產者發送一條消息時,不需要指定路由關鍵字,消息都會通過Fanout Exchange發送至隊列1和隊列2,消費者C1,C2都會受到消息。

4. Java API編程

Java API基礎操作手冊

            // 聲明交換機
            com.rabbitmq.client.Channel#exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable)
             
            // 聲明隊列
            com.rabbitmq.client.Channel#queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        
           // 發布消息
           com.rabbitmq.client.Channel#basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

 參數說明

聲明交換機

exchange : 交換機名稱
type : 交換機類型,DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers"); 4種,也可以直接使用字符串表示
durable : 是否持久化,代表交換機在服務器重啟后是否存在

聲明隊列

String queue : 隊列名稱
boolean durable : 是否持久化,代表隊列在服務器重啟后是否存在
boolean exclusive : 是否排他隊列,如果要聲明一個只有自己可見,其余用戶都不可見的隊列,可以使用排他隊列;該隊列有兩個特性:1)只有首次聲明它的連接(connection)可見(對connection中的多個channel也可見),2)會在連接斷開時自動刪除
boolean autoDelete :是否自動刪除,如果為true,在沒有消費者連接到這個隊列時,會自動刪除
Map<String, Object> arguments :隊列的其他屬性,例如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority;
RMQ_QUEUE_params

發布消息

BasicProperties props, :14個屬性

  public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
        private String contentType;
        private String contentEncoding;
        private Map<String,Object> headers;   // 消息的其他自定義參數
        private Integer deliveryMode;    // 2持久化,其他 瞬態 
        private Integer priority;           // 消息的優先級 
        private String correlationId;
        private String replyTo;            // 回調隊列 
        private String expiration;        // TTL 消息過期時間,單位毫秒 
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;
      ...

5. 進階知識

5.1 自動刪除沒人消費的消息

 TTL (Time To Live) 可以通過設置隊列的過期時間或者每條消息的過期時間來實現:

            // 隊列設置過期時間
            Map<String, Object> queueArgs = new HashMap<>();
            queueArgs.put("x-message-ttl", 60000);  //queue msg ttl 6s
            channel.queueDeclare("MY_TTL_QUEUE", true, false, false, queueArgs);
           
            ...
            // 消息設置過期時間
            AMQP.BasicProperties msgArgs = new AMQP.BasicProperties().builder()
                    .expiration("5000")  // TTL  5s
                    .deliveryMode(2)   // 消息持久化
                    .contentEncoding("UTF-8")
                    .build();
            channel.basicPublish("", "MY_TTL_QUEUE", msgArgs, "Hello World".getBytes());
               

5.2 無法路由的消息,去了哪里

死信隊列
 創建隊列的時候可以指定隊列中的消息無法路由或者過期后的去處,轉發到指定的交換機,也就是死信交換機,和死信交換機綁定的隊列為死信隊列。
 三種情況消息會進入死信交換機DLX(Dead Letter Exchange):
消息過期;
消費端設置autoAck為false,不使用自動應答而是使用手工應答,並且手工應答的處理是reject或者Nack,且requeue屬性為false,被拒絕的消息不會重新入隊的時候

  -- 消費端 autoAck為false
  com.rabbitmq.client.Channel#basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback);
   ...
  -- requeue 重新入隊為false
  com.rabbitmq.client.Channel#basicReject(long deliveryTag, boolean requeue);
  com.rabbitmq.client.Channel#basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  

隊列達到最大長度,先入隊的消息會被放至DLX
 可以為普通的隊列設置死信交換機,當隊列中的消息變為死信后會進入指定的死信交換機,然后定義我們的死信隊列來接收死信交換機里的內容:

            String myDeadLetterExchange = "MY_DLX_EXCHANGE";
            String myDeadLetterQueue = "MY_DLX_QUEUE";

            Map<String, Object> commonQueueArgs = new HashMap<>();
            commonQueueArgs.put("x-message-ttl", 60000);
            commonQueueArgs.put("x-max-length", 4); // 隊列最大長度
            commonQueueArgs.put("x-expires", "9000");  // TTL過期時間
            commonQueueArgs.put("x-dead-letter-exchange", myDeadLetterExchange);
            // 聲明普通隊列
            channel.queueDeclare("MY_TTL_QUEUE", true, false, false, commonQueueArgs);

            // 聲明死信交換機
            channel.exchangeDeclare(myDeadLetterExchange, "topic");
            // 聲明死信隊列
            channel.queueDeclare(myDeadLetterQueue, false, false, false, null);
            // 綁定
            channel.queueBind(myDeadLetterQueue, myDeadLetterExchange, "#");

5.3 可以讓消息優先得到消費嗎

 當隊列中消息堆積時,可以設置隊列和消息的優先級,級別高的消息會優先消費。
  設置隊列支持的最大優先級屬性x-max-priority及消息優先級屬性com.rabbitmq.client.AMQP.BasicProperties#priority,當隊列中消息堆積時,隊列會根據屬性優先級大小進行優先消費,如果消息不會堆積,設置優先級並沒什么用。

   // 生產者 發送消息時設置消息優先級
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("name", "test");
        headers.put("level", "top");

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)   // 2代表持久化
                .contentEncoding("UTF-8")  // 編碼
                .expiration("10000")  // TTL,過期時間
                .headers(headers) // 自定義屬性
                .priority(5) // 消息的優先級,默認為5,配合隊列的 x-max-priority 屬性使用
                .messageId(String.valueOf(UUID.randomUUID()))
                .build();

  // 消費者創建隊列時設置隊列支持的最大優先級
        Map<String, Object> map = new HashMap<>();
        map.put("x-max-priority", 10);  //指定隊列的最大優先級
        // 聲明隊列(默認交換機AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        

5.4 延遲隊列

 當在使用智能產品時,比如要設置1個小時以后發送一條短信,可以使用延遲隊列;消息會立刻進入隊列,但1個小時后才會被消費掉;
 Rabbit MQ本身並不支持延遲隊列,可以使用之前提到的消息過期時間TTL加死信隊列DLX來實現。
 或者使用插件 rabbitmq-delayed-message-exchange

5.5 RPC

RabbitMQ RPC

5.6 服務端流控

 Flow Control, Rabbit MQ中數據是存儲在磁盤或者內存中的,可以設置內存使用率,來達到限流效果。
 RabbitMQ 會在啟動時檢測機器的物理內存數值。默認當 MQ 占用 40% 以上內存時,MQ 會主動拋出一個內存警告並阻塞所有連接(Connections)。可以通過修改 rabbitmq.config 文件來調整內存閾值,默認值是 0.4,如下所示: [{rabbit, [{vm_memory_high_watermark, 0.4}]}].,默認情況,如果剩余磁盤空間在 1GB 以下,RabbitMQ 主動阻塞所有的生產者。這個閾值也是可調的。
 注意隊列長度只在消息堆積的情況下有意義,而且會刪除先入隊的消息,不能實現服務端限流。

5.6 消費端限流

 當多個消費者C1,C2,C3綁定同一個隊列,隊列中的消息由可以快速處理和處理很慢的類型構成,比如現在隊列中有100條消息,默認會均勻分給C1,C2,C3三個消費者33-34條,假設C1,C2接收到的都是可以很快處理掉的消息,快速的把數據處理完了,而此時C3在處理業務比較復雜的數據類型,處理的很慢,可能處理起來會話很長時間;但此時C1,C2其實已經空閑下來了,造成了資源的不合理利用。
 消費端限流可以達到每個消費者只可以同時處理1或N條消息的目的,當前消息沒有處理完,RabbitMQ不會在給當前消費者推送消息,這和 java JUC包下的 java.util.concurrent.Semaphore很類似,具體代碼實現如下:

   // 消費端 autoAck=false 非自動確認消息的前提下,如果一定數目的消息(通過基於consume或者channel設置Qos的值)未被確認前,不進行消費新的消息。
        channel.basicQos(1);
        channel.basicConsume(QUEUE_NAME, false, consumer);

6. UI管理界面

 UI頁面可以用來監控消息,管理交換機隊列等。不論是是由代碼創建的交換機隊列,還是UI界面或者http api方式創建,如果名稱和參數都一樣,都是可以創建的。
 當RabbitMQ需要升級時,原有RabbitMQ內的交換機,隊列等重新建立會很麻煩,UI界面提供了export功能,可以將老的導出然后導入新的RabbitMQ內。
RMQ export

7. Spring配置方式集成RabbitMQ

8. Spring Boot集成RabbitMQ


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM