RabbitMQ入門教程
當初我學RabbitMQ的時候,第一時間就上GitHub找相應的教程,但是令我很失望的是沒有找到,Spring,Mybatis之類的教程很多,而RabbitMQ的教程幾乎找不到,看的最多的就是朱小廝大佬的博客。后來想着索性自己總結一下吧,有不恰當的地方歡迎小伙伴指出。
這篇文章主要是對着我在GitHub上的源碼解釋的,因此本文並沒有太多的源碼。寫了挺長時間的,為了防止迷路,歡迎大家star和fork
github地址:https://github.com/erlieStar/rabbitmq-examples
前言
我們先來看一下一條消息在RabbitMQ中的流轉過程
圖示的主要流程如下
- 生產者發送消息的時候指定RoutingKey,然后消息被發送到Exchange
- Exchange根據一些列規則將消息路由到指定的隊列中
- 消費者從隊列中消費消息
整個流程主要就4個參與者message,exchange,queue,consumer,我們就來認識一下這4個參與者
Message
消息可以設置一些列屬性,每種屬性的作用可以參考《深入RabbitMQ》一書
屬性名 | 用處 |
---|---|
contentType | 消息體的MIME類型,如application/json |
contentEncoding | 消息的編碼類型,如是否壓縮 |
messageId | 消息的唯一性標識,由應用進行設置 |
correlationId | 一般用作關聯消息的message-id,常用於消息的響應 |
timestamp | 消息的創建時刻,整型,精確到秒 |
expiration | 消息的過期時刻,字符串,但是呈現格式為整型,精確到秒 |
deliveryMode | 消息的持久化類型 ,1為非持久化,2為持久化,性能影響巨大 |
appId | 應用程序的類型和版本號 |
userId | 標識已登錄用戶,極少使用 |
type | 消息類型名稱,完全由應用決定如何使用該字段 |
replyTo | 構建回復消息的私有響應隊列 |
headers | 鍵/值對表,用戶自定義任意的鍵和值 |
priority | 指定隊列中消息的優先級 |
Exchange
接收消息,並根據路由鍵轉發消息到所綁定的隊列,常用的屬性如下
交換機屬性 | 類型 |
---|---|
name | 交換器名稱 |
type | 交換器類型,有如下四種,direct,topic,fanout,headers |
durability | 是否需要持久化,true為持久化。持久化可以將交換器存盤,在服務器重啟的時候不會丟失相關信息 |
autoDelete | 與這個Exchange綁定的Queue或Exchange都與此解綁時,會刪除本交換器 |
internal | 設置是否內置,true為內置。如果是內置交換器,客戶端無法發送消息到這個交換器中,只能通過交換器路由到交換器這種方式 |
argument | 其他一些結構化參數 |
我們最常使用的就是type屬性,下面就詳細解釋type屬性
Fanout Exchange
發送到該交換機的消息都會路由到與該交換機綁定的所有隊列上,可以用來做廣播
不處理路由鍵,只需要簡單的將隊列綁定到交換機上
Fanout交換機轉發消息是最快的
Direct Exchage
把消息路由到BindingKey和RoutingKey完全匹配的隊列中
Topic Exchange
前面說到,direct類型的交換器路由規則是完全匹配RoutingKey和BindingKey。topic和direct類似,也是將消息發送到RoutingKey和BindingKey相匹配的隊列中,只不過可以模糊匹配。
- RoutinKey為一個被“.”號分割的字符串(如com.rabbitmq.client)
- BindingKey和RoutingKey也是“.”號分割的字符串
- BindKey中可以存在兩種特殊字符串“*”和“#”,用於做模糊匹配,其中“*”用於匹配不多不少一個詞,“#”用於匹配多個單詞(包含0個,1個)
BindIngKey | 能夠匹配到的RoutingKey |
---|---|
java.# | java.lang,java.util, java.util.concurrent |
java.* | java.lang,java.util |
*.*.uti | com.javashitang.util,org.spring.util |
假如現在有2個RoutingKey為java.lang和java.util.concurrent的消息,java.lang會被路由到Consumer1和Consumer2,java.util.concurrent會被路由到Consumer2。
Headers Exchange
headers類型的交換器不依賴於路由鍵的匹配規則來路由消息,而是根據發送消息內容中的headers屬性進行匹配。headers類型的交換器性能差,不實用,基本上不會使用。
Queue
隊列的常見屬性如下
參數名 | 用處 |
---|---|
queue | 隊列的名稱 |
durable | 是否持久化,true為持久化。持久化的隊列會存盤,在服務器重啟的時候可以保證不丟失相關信息 |
exclusive | 設置是否排他,true為排他。如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明他它的連接可見,並在連接斷開時自動刪除(即一個隊列只能有一個消費者) |
autoDelete | 設置是否自動刪除,true為自動刪除,自動刪除的前提是,至少一個消費者連接到這個隊列,之后所有與這個連接的消費者都斷開時,才會自動刪除 |
arguments | 設置隊列的其他參數,如x-message-ttl,x-max-length |
arguments中可以設置的隊列的常見參數如下
參數名 | 目的 |
---|---|
x-dead-letter-exchange | 死信交換器 |
x-dead-letter-routing-key | 死信消息的可選路由鍵 |
x-expires | 隊列在指定毫秒數后被刪除 |
x-ha-policy | 創建HA隊列 |
x-ha-nodes | HA隊列的分布節點 |
x-max-length | 隊列的最大消息數 |
x-message-ttl | 毫秒為單位的消息過期時間,隊列級別 |
x-max-priority | 最大優先值為255的隊列優先排序功能 |
rabbitmq-api(rabbitmq api的使用)
chapter_1: 快速開始,手寫一個RabbitMQ的生產者和消費者
chapter_2: 演示了各種exchange的使用
來回顧一下上面說的各種exchange機器路由規則
交換器類型 | 路由規則 |
---|---|
fanout | 發送到該交換機的消息都會路由到與該交換機綁定的所有隊列上,可以用來做廣播 |
direct | 把消息路由到BindingKey和RoutingKey完全匹配的隊列中 |
topic | topic和direct類似,也是將消息發送到RoutingKey和BindingKey相匹配的隊列中,只不過可以模糊匹配 |
headers | 性能差,基本不會使用 |
chapter_3: 拉取消息
消息的獲得方式有2種
-
拉取消息(get message)
-
推送消息(consume message)
那我們應該拉取消息還是推送消息?get是一個輪詢模型,而consumer是一個推送模型。get模型會導致每條消息都會產生與RabbitMQ同步通信的開銷,這一個請求由發送請求幀的客戶端應用程序和發送應答的RabbitMQ組成。所以推送消息,避免拉取
chapter_4: 手動ack
消息的確認方式有2種
- 自動確認(autoAck=true)
- 手動確認(autoAck=false)
消費者在消費消息的時候,可以指定autoAck參數
String basicConsume(String queue, boolean autoAck, Consumer callback)
autoAck=false: RabbitMQ會等待消費者顯示回復確認消息后才從內存(或者磁盤)中移出消息
autoAck=true: RabbitMQ會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正的消費了這些消息
手動確認的方法如下,有2個參數
basicAck(long deliveryTag, boolean multiple)
deliveryTag: 用來標識信道中投遞的消息。RabbitMQ 推送消息給Consumer時,會附帶一個deliveryTag,以便Consumer可以在消息確認時告訴RabbitMQ到底是哪條消息被確認了。
RabbitMQ保證在每個信道中,每條消息的deliveryTag從1開始遞增
multiple=true: 消息id<=deliveryTag的消息,都會被確認
myltiple=false: 消息id=deliveryTag的消息,都會被確認
消息一直不確認會發生啥?
如果隊列中的消息發送到消費者后,消費者不對消息進行確認,那么消息會一直留在隊列中,直到確認才會刪除。
如果發送到A消費者的消息一直不確認,只有等到A消費者與rabbitmq的連接中斷,rabbitmq才會考慮將A消費者未確認的消息重新投遞給另一個消費者
chapter_5: 拒絕消息的兩種方式
確認消息只有一種方法
- basicAck(long deliveryTag, boolean multiple)
而拒絕消息有兩種方式
-
basicNack(long deliveryTag, boolean multiple, boolean requeue)
-
basicReject(long deliveryTag, boolean requeue)
basicNack和basicReject的區別只有一個,basicNack支持批量拒絕
deliveryTag和multiple參數前面已經說過。
requeue=true: 消息會被再次發送到隊列中
requeue=false: 消息會被直接丟失
chapter_6: 失敗通知
chapter_6到chapter_10主要簡述了消息發布時的權衡
我們最常用的就是失敗通知和發布者確認
當消息不能被路由到某個queue時,我們如何獲取到不能正確路由的消息呢?
- 在發送消息時設置mandatory為true
- 生產者可以通過調用channel.addReturnListener來添加ReturnListener監聽器獲取沒有被路由到隊列中的消息
mandatory是channel.basicPublish()方法中的參數
mandatory=true: 交換器無法根據路由鍵找到一個符合條件的隊列,那么RabbitMQ會調用Basic.Return命令將消息返回給生產者
mandatory=false: 出現上述情形,則消息直接被丟棄
chapter_7: 發布者確認
當消息被發送后,消息到底有沒有到達exchange呢?默認情況下生產者是不知道消息有沒有到達exchange
RabbitMQ針對這個問題,提供了兩種解決方式
- 事務(后面會講到)
- 發布者確認(publisher confirm)
而發布者確認有三種編程方式
- 普通confirm模式:每發送一條消息后,調用waitForConfirms()方法,等待服務器端confirm。實際上是一種串行confirm了。
- 批量confirm模式:每發送一批消息后,調用waitForConfirms()方法,等待服務器端confirm。
- 異步confirm模式:提供一個回調方法,服務端confirm了一條或者多條消息后Client端會回調這個方法。
異步confirm模式的性能最高,因此經常使用,我想把這個分享的細一下
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("handleAck, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("handleNack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
});
寫過異步confirm代碼的小伙伴應該對這段代碼不陌生,可以看到這里也有deliveryTag和multiple。但是我要說的是這里的deliveryTag和multiple和消息的ack沒有一點關系。
confirmListener中的ack: rabbitmq控制的,用來確認消息是否到達exchange
消息的ack: 上面說到可以自動確認,也可以手動確認,用來確認queue中的消息是否被consumer消費
chapter_8: 備用交換器
生產者在發送消息的時候如果不設置 mandatory 參數那么消息在未被路由到queue的情況下將會丟失,如果設置了 mandatory 參數,那么需要添加 ReturnListener 的編程邏輯,生產者的代碼將變得復雜。如果既不想復雜化生產者的編程邏輯,又不想消息丟失,那么可以使用備用交換器,這樣可以將未被路由到queue的消息存儲在RabbitMQ 中,在需要的時候去處理這些消息
chapter_9: 事務
RabbitMQ中與事務機制相關的方法有3個
方法 | 解釋 |
---|---|
channel.txSelect() | 將當前的信道設置成事務模式 |
channel.txCommit() | 提交事務 |
channel.txRollback() | 回滾事務 |
消息成功被發送到RabbitMQ的exchange上,事務才能提交成功,否則便可在捕獲異常之后進行事務回滾,與此同時可以進行消息重發
因為事務會榨干RabbitMQ的性能,所以一般使用發布者確認代替事務
chapter_10: 消息持久化
消息做持久化,只需要將消息屬性的delivery-mode設置為2即可
RabbitMQ給我們封裝了這個屬性,即MessageProperties.PERSISTENT_TEXT_PLAIN,
詳細使用可以參考github的代碼
當我們想做消息的持久化時,最好同時設置隊列和消息的持久化,因為只設置隊列的持久化,重啟之后消息會丟失。只設置隊列的持久化,重啟后隊列消失,繼而消息也丟失
chapter_11: 死信隊列
DLX,全稱為Dead-Letter-Exchange,稱之為死信交換器。當一個消息在隊列中變成死信(dead message)之后,它能被重新發送到另一個交換器中,這個交換器就是DLX,綁定DLX的隊列就稱之為死信隊列。
DLX也是一個正常的交換器,和一般的交換器沒有區別,實際上就是設置某個隊列的屬性
消息變成死信一般是由於以下幾種情況
- 消息被拒絕(Basic.Reject/Basic.Nack)且不重新投遞(requeue=false)
- 消息過期
- 隊列達到最大長度
死信交換器和備用交換器的區別
備用交換器: 1.消息無法路由時轉到備用交換器 2.備用交換器是在聲明主交換器的時候定義的
死信交換器: 1.消息已經到達隊列,但是被消費者拒絕等的消息會轉到死信交換器。2.死信交換器是在聲明隊列的時候定義的
chapter_12: 流量控制(服務質量保證)
qos即服務端限流,qos對於拉模式的消費方式無效
使用qos只要進行如下2個步驟即可
-
autoAck設置為false(autoAck=true的時候不生效)
-
調用basicConsume方法前先調用basicQos方法,這個方法有3個參數
basicQos(int prefetchSize, int prefetchCount, boolean global)
參數名 | 含義 |
---|---|
prefetchSize | 批量取的消息的總大小,0為不限制 |
prefetchCount | 消費完prefetchCount條(prefetchCount條消息被ack)才再次推送 |
global | global為true表示對channel進行限制,否則對每個消費者進行限制,因為一個channel允許有多個消費者 |
為什么要使用qos?
-
提高服務穩定性。假設消費端有一段時間不可用,導致隊列中有上萬條未處理的消息,如果開啟客戶端,
巨量的消息推送過來,可能會導致消費端變卡,也有可能直接不可用,所以服務端限流很重要 -
提高吞吐量。當隊列有多個消費者時,隊列收到的消息以輪詢的方式發送給消費者。但由於機器性能等的原因,每個消費者的消費能力不一樣,
這就會導致一些消費者處理完了消費的消息,而另一些則還堆積了一些消息,會造成整體應用吞吐量的下降
springboot-rabbitmq(springboot整合rabbitmq)
未完待續
聯系我
email: erlie139@gmail.com
歡迎大家和我交流,關注公眾號Java識堂獲取我的聯系方式