官網
RabbitMQ
RabbitMQ Tutorials & 1 Hello World! | 2 Work queues | 3 Publish/Subscribe | 4 Routing | 5 Topics | 6 RPC & RabbitMQ 教程
Installation Guide
1. RabbitMQ安裝
1.1 Window版安裝
a.需要首先安裝安裝Erlang
以及對應版本的rabbitmq-server
,可在此處Installing on Windows下載;
本次安裝版本為rabbitmq-server-3.7.14.exe
及OTP 21.3 Windows 64-bit
;下載完成后一直next即可;可將erl
和rabbitMQ
加入環境變量;
b.安裝完成后啟用rabbitmq_management
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>enable rabbitmq_management
'enable' 不是內部或外部命令,也不是可運行的程序
或批處理文件。
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@DESKTOP-2NHH5NJ:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-2NHH5NJ...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
set 3 plugins.
Offline change; changes will take effect at broker restart.
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>
c.之后啟動RabbitMQ,net start RabbitMQ
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin>rabbitmq-server.bat net start RabbitMQ
"WARNING: Using RABBITMQ_ADVANCED_CONFIG_FILE: C:\Users\EDDY~1.SHE\AppData\Roaming\RabbitMQ\advanced.config"
## ##
## ## RabbitMQ 3.7.14. Copyright (C) 2007-2019 Pivotal Software, Inc.
########## Licensed under the MPL. See https://www.rabbitmq.com/
###### ##
########## Logs: C:/Users/EDDY~1.SHE/AppData/Roaming/RabbitMQ/log/RABBIT~1.LOG
C:/Users/EDDY~1.SHE/AppData/Roaming/RabbitMQ/log/rabbit@DESKTOP-2NHH5NJ_upgrade.log
Starting broker...
completed with 3 plugins.
啟動成功后可打開http://localhost:15672/#/測試地址查看,用戶名密碼默認均為guest
1.2 Linux版安裝
2. 典型應用場景
異步,解耦,削峰
跨系統的異步通信;
系統內的同步變為異步;
基於pub/sub模型的廣播訂閱;
分布式事物的最終一致性解決方案;
3. 基本介紹
3.1 AMQP協議
AMQP協議,跨語言,跨系統,跨平台的協議,Advanced Message Queuing Protocol 一種高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。基於此協議的消息中間件可跨產品,跨語言的消息通信。Erlang中的實現有 RabbitMQ等。
3.2 RabbitMQ的特性
RabbmitMQ使用Erlang語言編寫,使用Mnesia數據庫存儲消息。
1)可靠性(Reliability) RabbitMQ使用一些機制來保證消息傳輸的可靠性,如持久化,傳輸確認,發布確認。
2)靈活的路由(Flexible Routing) 在消息進入隊列之前,通過Exchange來路由消息。對應典型的路由功能,RabbmitMQ已經內置了一些Exchange來支持。針對更復雜的路由功能,可以將多個Exchange綁定在一起,也可以通過插件機制實現自己的Exchange。
3) 消息集群(Clustering) 多個RabbitMQ服務可以組成一個集群,形成一個邏輯Broker。
4) 高可用隊列(Highly Available Queues) 隊列可以在集群機器上進行鏡像,使得在部分節點出現問題的情況下隊列讓然可用。
5) 多種協議(Multi Protocol) RabbitMQ支持多種消息隊列協議,如 AMQP, STOMP, MQTT等。
6) 多語言客戶端(Clients) RabbitMQ幾乎支持所有常用語言,如 Python, Java, Ruby, PHP, C#, JavaScript, Go, Elixir, Objective-C等。
7) 管理界面(Management UI) RabbitMQ提供了一個易用的用戶界面,使得用戶可以監控和管理消息,集群中的節點。
8) 插件機制(Plugin) RabbitMQ提供了許多插件,以實現多方面的擴展,當然也可以編寫自己的插件。
3.3 工作模型
工作流程:
1. 首先生產者將消息發布至交換機Exchange
,綁定路由關鍵字Routing Key
(生產者不會講消息直接放入隊列,就算是不聲明交換機,也是先發布至默認的交換機)
2. 消費者創建通道channel
,隊列queue
,並指定隊列queue
綁定的交換機Exchange
,使用綁定關鍵字Binding Key
來綁定(一個交換機可綁定多個隊列queue,一個交換機綁定一個隊列時,也可以使用多個綁定關鍵字)
3. 當交換機收到消息,根據生產者發送消息時的路由關鍵字routing key
和消費者定義的綁定關鍵字Binding Key
做匹配,匹配合適的話就將消息存入匹配到的隊列中去
4. 消費者從隊列中取走消息
關鍵字:
Broker
RabbitMQ的實體服務器,提供一種傳輸及服務,維護一條從生產者到消費者的傳輸路線,保證消息按指定的方式傳輸。
Exchange
消息交換機,指定消息按照哪種規則路由到哪個隊列Queue。
Queue
消息隊列。消息的載體,每個消息都會被投送到一個或多個隊列中。
Binding
綁定。將Exchange和Queue按照某種路由規則綁定。
Routing Key
路由關鍵字。和 Binding Key對應,按照關鍵字匹配通過Exchange找到對應的Queue。
VHost
虛擬主機。相當於一個小的Broker,小的rabbitMQ服務器,一個Broker可以有一到多個虛擬主機,用於不同用戶的權限分離。一個虛擬主機擁有一組 Exchange, Queue和 Binding。
Producer
消息生產者。將消息投遞到Exchange上,一般是獨立的應用程序。
Consumer
消息消費者。消息的接受者,一般是獨立的應用程序。
Connection
Producer,Broker和Consumer之間的TCP長連接。
Channel
消息通道,也成信道。在客戶端的每個連接里可以建立多個Channel,每個Channel代表一個會話認為。在RabbitMQ Java Client中,channel上定義了大量的編程接口。
3.4 三種主要的交換機
Direct Exchange 直連交換機
使用直連類型的交換機時,發送消息時的路由關鍵字Routing Key
和接收消息時的綁定關鍵字Binding Key
必須完全匹配時,生產者發送的消息才能被對應的消費者所接收到;
如上圖,當生產者發送消息指定路由關鍵字是error的時候,交換機發現第一個和第二個隊列都是以error綁定的,所以隊列1,2都會受到消息;當生產者發送消息指定路由關鍵字是info或者warning時,只有第二個隊列能匹配,消費者C2才能受到消息。
Topic Exchange 主題交換機
路由關鍵字Routing Key
和綁定關鍵字Binding Key
不需要完全匹配,類似於正則表達式符合一定規則匹配到即可接收消息。通配符有兩個:
* 代表匹配一個單詞;
# 代表匹配零到多個單詞;
多個單詞之間用.分隔。
如上圖,當使用路由關鍵字fast.orange.tiger發送消息時,表達式匹配隊列Q1,消費者C1會受到消息;
當使用路由關鍵字fast.orange.rabbit發送消息時,表達式匹配隊列Q1,Q2,消費者C1,C2都會受到消息;
當使用路由關鍵字lazy.blue.rabbit發送消息時,隊列Q2綁定交換機時使用的兩個關鍵字都可以匹配到,但只會受到1條消息,不會多次發送;
Fanout Exchange 廣播交換機
使用廣播類型的交換機時,不需要指定Routing Key
和Binding Key
,生產者將消息發送至該交換機后,所有與之綁定的隊列都能收到消息。
如上圖,當生產者發送一條消息時,不需要指定路由關鍵字,消息都會通過Fanout Exchange發送至隊列1和隊列2,消費者C1,C2都會受到消息。
4. Java API編程
// 聲明交換機
com.rabbitmq.client.Channel#exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable)
// 聲明隊列
com.rabbitmq.client.Channel#queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
// 發布消息
com.rabbitmq.client.Channel#basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
參數說明
聲明交換機
exchange
: 交換機名稱
type
: 交換機類型,DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers"); 4種,也可以直接使用字符串表示
durable
: 是否持久化,代表交換機在服務器重啟后是否存在
聲明隊列
String queue
: 隊列名稱
boolean durable
: 是否持久化,代表隊列在服務器重啟后是否存在
boolean exclusive
: 是否排他隊列,如果要聲明一個只有自己可見,其余用戶都不可見的隊列,可以使用排他隊列;該隊列有兩個特性:1)
只有首次聲明它的連接(connection)可見(對connection中的多個channel也可見),2)
會在連接斷開時自動刪除
boolean autoDelete
:是否自動刪除,如果為true,在沒有消費者連接到這個隊列時,會自動刪除
Map<String, Object> arguments
:隊列的其他屬性,例如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority;
發布消息
BasicProperties props,
:14個屬性
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
private String contentType;
private String contentEncoding;
private Map<String,Object> headers; // 消息的其他自定義參數
private Integer deliveryMode; // 2持久化,其他 瞬態
private Integer priority; // 消息的優先級
private String correlationId;
private String replyTo; // 回調隊列
private String expiration; // TTL 消息過期時間,單位毫秒
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
...
5. 進階知識
5.1 自動刪除沒人消費的消息
TTL (Time To Live) 可以通過設置隊列的過期時間或者每條消息的過期時間來實現:
// 隊列設置過期時間
Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-message-ttl", 60000); //queue msg ttl 6s
channel.queueDeclare("MY_TTL_QUEUE", true, false, false, queueArgs);
...
// 消息設置過期時間
AMQP.BasicProperties msgArgs = new AMQP.BasicProperties().builder()
.expiration("5000") // TTL 5s
.deliveryMode(2) // 消息持久化
.contentEncoding("UTF-8")
.build();
channel.basicPublish("", "MY_TTL_QUEUE", msgArgs, "Hello World".getBytes());
5.2 無法路由的消息,去了哪里
創建隊列的時候可以指定隊列中的消息無法路由或者過期后的去處,轉發到指定的交換機,也就是死信交換機,和死信交換機綁定的隊列為死信隊列。
三種情況消息會進入死信交換機DLX(Dead Letter Exchange):
消息過期;
消費端設置autoAck為false,不使用自動應答而是使用手工應答,並且手工應答的處理是reject或者Nack,且requeue屬性為false,被拒絕的消息不會重新入隊的時候
-- 消費端 autoAck為false
com.rabbitmq.client.Channel#basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback);
...
-- requeue 重新入隊為false
com.rabbitmq.client.Channel#basicReject(long deliveryTag, boolean requeue);
com.rabbitmq.client.Channel#basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
隊列達到最大長度,先入隊的消息會被放至DLX
可以為普通的隊列設置死信交換機,當隊列中的消息變為死信后會進入指定的死信交換機,然后定義我們的死信隊列來接收死信交換機里的內容:
String myDeadLetterExchange = "MY_DLX_EXCHANGE";
String myDeadLetterQueue = "MY_DLX_QUEUE";
Map<String, Object> commonQueueArgs = new HashMap<>();
commonQueueArgs.put("x-message-ttl", 60000);
commonQueueArgs.put("x-max-length", 4); // 隊列最大長度
commonQueueArgs.put("x-expires", "9000"); // TTL過期時間
commonQueueArgs.put("x-dead-letter-exchange", myDeadLetterExchange);
// 聲明普通隊列
channel.queueDeclare("MY_TTL_QUEUE", true, false, false, commonQueueArgs);
// 聲明死信交換機
channel.exchangeDeclare(myDeadLetterExchange, "topic");
// 聲明死信隊列
channel.queueDeclare(myDeadLetterQueue, false, false, false, null);
// 綁定
channel.queueBind(myDeadLetterQueue, myDeadLetterExchange, "#");
5.3 可以讓消息優先得到消費嗎
當隊列中消息堆積時,可以設置隊列和消息的優先級,級別高的消息會優先消費。
設置隊列支持的最大優先級屬性x-max-priority
及消息優先級屬性com.rabbitmq.client.AMQP.BasicProperties#priority,當隊列中消息堆積時,隊列會根據屬性優先級大小進行優先消費,如果消息不會堆積,設置優先級並沒什么用。
// 生產者 發送消息時設置消息優先級
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("name", "test");
headers.put("level", "top");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2代表持久化
.contentEncoding("UTF-8") // 編碼
.expiration("10000") // TTL,過期時間
.headers(headers) // 自定義屬性
.priority(5) // 消息的優先級,默認為5,配合隊列的 x-max-priority 屬性使用
.messageId(String.valueOf(UUID.randomUUID()))
.build();
// 消費者創建隊列時設置隊列支持的最大優先級
Map<String, Object> map = new HashMap<>();
map.put("x-max-priority", 10); //指定隊列的最大優先級
// 聲明隊列(默認交換機AMQP default,Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
5.4 延遲隊列
當在使用智能產品時,比如要設置1個小時以后發送一條短信,可以使用延遲隊列;消息會立刻進入隊列,但1個小時后才會被消費掉;
Rabbit MQ本身並不支持延遲隊列,可以使用之前提到的消息過期時間TTL加死信隊列DLX來實現。
或者使用插件 rabbitmq-delayed-message-exchange
5.5 RPC
5.6 服務端流控
Flow Control, Rabbit MQ中數據是存儲在磁盤或者內存中的,可以設置內存使用率,來達到限流效果。
RabbitMQ 會在啟動時檢測機器的物理內存數值。默認當 MQ 占用 40% 以上內存時,MQ 會主動拋出一個內存警告並阻塞所有連接(Connections)。可以通過修改 rabbitmq.config 文件來調整內存閾值,默認值是 0.4,如下所示: [{rabbit, [{vm_memory_high_watermark, 0.4}]}].,默認情況,如果剩余磁盤空間在 1GB 以下,RabbitMQ 主動阻塞所有的生產者。這個閾值也是可調的。
注意隊列長度只在消息堆積的情況下有意義,而且會刪除先入隊的消息,不能實現服務端限流。
5.6 消費端限流
當多個消費者C1,C2,C3綁定同一個隊列,隊列中的消息由可以快速處理和處理很慢的類型構成,比如現在隊列中有100條消息,默認會均勻分給C1,C2,C3三個消費者33-34條,假設C1,C2接收到的都是可以很快處理掉的消息,快速的把數據處理完了,而此時C3在處理業務比較復雜的數據類型,處理的很慢,可能處理起來會話很長時間;但此時C1,C2其實已經空閑下來了,造成了資源的不合理利用。
消費端限流可以達到每個消費者只可以同時處理1或N條消息的目的,當前消息沒有處理完,RabbitMQ不會在給當前消費者推送消息,這和 java JUC包下的 java.util.concurrent.Semaphore
很類似,具體代碼實現如下:
// 消費端 autoAck=false 非自動確認消息的前提下,如果一定數目的消息(通過基於consume或者channel設置Qos的值)未被確認前,不進行消費新的消息。
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, consumer);
6. UI管理界面
UI頁面可以用來監控消息,管理交換機隊列等。不論是是由代碼創建的交換機隊列,還是UI界面或者http api方式創建,如果名稱和參數都一樣,都是可以創建的。
當RabbitMQ需要升級時,原有RabbitMQ內的交換機,隊列等重新建立會很麻煩,UI界面提供了export功能,可以將老的導出然后導入新的RabbitMQ內。