一:MQ的相關概念
MQ(message queue),從字面意思上看,本質是個隊列,FIFO 先入先出,只不過隊列中存放的內容是message 而已,還是一種跨進程的通信機制,用於上下游傳遞消息。在互聯網架構中,MQ 是一種非常常見的上下游“邏輯解耦+物理解耦”的消息通信服務。使用了 MQ 之后,消息發送上游只需要依賴 MQ,不用依賴其他服務。
關於文章全部示例代碼:RabbitMQ_Study
1:為什么使用MQ
①:流量消峰
如果訂單系統最多能處理10000次/s的訂單,這個處理能力應付正常時段下單綽綽有余,正常時段我們下單一秒后就能返回結果。但是在高峰期,如果有兩萬次下單操作系統是處理不了的(服務處理慢不說,有可能會把響應方的服務搞宕機),但是我們能限制訂單超過一萬后不允許用戶下單。假設使用消息隊列做緩沖,我們可以取消這個限制,把一秒內下的訂單分散成一段時間來處理,這時有些用戶可能在下單十幾秒后才能收到下單成功的操作,但是比不能下單的體驗要好。
②:應用解耦
以電商應用為例,應用中有訂單系統、庫存系統、物流系統、支付系統。用戶創建訂單后,如果耦合調用庫存系統、物流系統、支付系統,任何一個子系統出了故障,都會造成下單操作異常。當轉變成基於消息隊列的方式后,系統間調用的問題會減少很多,比如物流系統因為發生故障,需要幾分鍾來修復。在這幾分鍾的時間里,物流系統要處理的請求信息被緩存在消息隊列中,用戶的下單操作可以正常完成。當物流系統恢復后,繼續處理訂單信息即可,終端用戶感受不到物流系統的故障,提升系統的可用性。
③:異步處理
有些服務間調用是異步的,例如 A 調用 B,B 需要花費很長時間執行,但是 A 需要知道 B 什么時候可以執行完,以前一般有兩種方式,A 過一段時間去調用 B 的查詢 api 查詢。或者 A 提供一個 callback api, B 執行完之后調用 api 通知 A 服務。這兩種方式都不是很優雅,使用消息總線,可以很方便解決這個問題,A 調用 B 服務后,只需要監聽 B 處理完成的消息,當 B 處理完成后,會發送一條消息給 MQ,MQ 會將此消息轉發給 A 服務。這樣 A 服務既不用循環調用 B 的查詢 api,也不用提供 callback api。同樣 B 服務也不用做這些操作。A 服務還能及時的得到異步處理成功的消息。
2:RabbitMQ概念
RabbitMQ 是一個在AMQP基礎上實現的,可復用的企業消息系統。它可以用於大型軟件系統各個模塊之間的高效通信,支持高並發,支持可擴展。
你可以把它當做一個快遞站點,當你要發送一個包裹時,你把你的包裹放到快遞站,快遞員最終會把你的快遞送到收件人那里,按照這種邏輯 RabbitMQ 是一個快遞站,一個快遞員幫你傳遞快件。RabbitMQ 與快遞站的主要區別在於,它不處理快件而是接收,存儲和轉發消息數據。
AMQP:即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同的開發語言等條件的限制。
消息隊列:MQ 全稱為Message Queue, 消息隊列。是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。
3:RabbitMQ四大核心
①:生產者(Producer)
產生數據發送消息的程序是生產者
②:交換機(Exchange)
交換機是 RabbitMQ 非常重要的一個部件,一方面它接收來自生產者的消息,另一方面它將消息推送到隊列中。交換機必須確切知道如何
處理它接收到的消息,是將這些消息推送到特定隊列還是推送到多個隊列,亦或者是把消息丟棄,這個得有交換機類型決定
③:隊列(Queue)
隊列是 RabbitMQ 內部使用的一種數據結構,盡管消息流經 RabbitMQ 和應用程序,但它們只能存儲在隊列中。隊列僅受主機的內存和
磁盤限制的約束,本質上是一個大的消息緩沖區。許多生產者可以將消息發送到一個隊列,許多消費者可以嘗試從一個隊列接收數據。這就是
我們使用隊列的方式
④:消費者(Consumer)
消費與接收具有相似的含義。消費者大多時候是一個等待接收消息的程序。請注意生產者,消費者和消息中間件很多時候並不在同一機器上。
同一個應用程序既可以是生產者又是可以是消費者。
4:RabbitMQ流程介紹
Broker: 接收和分發消息的應用,RabbitMQ Server就是Message Broker;簡單來說就是消息隊列服務器實體Virtual host:出於多租戶和
安全因素設計的,把AMQP的基本組件划分到一個虛擬的分組中,類似於網絡中的namespace概念。當多個不同的用戶使用同一個
RabbitMQ server提供的服務時,可以划分出多個vhost,每個用戶在自己的vhost創建exchange/queue等 Connection: publisher/consumer和broker之間的TCP連接 Channel: 消息通道,如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCPConnection的開銷將是巨大的,效率也較
低。Channel是在connection內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的channel進行通訊,AMQP
method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的
Connection極大減少了操作系統建立TCP connection的開銷。 Exchange: message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。
常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast) Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列;消息最終被送到這里等待consumer取走 Routing Key: 路由關鍵字,exchange根據這個關鍵字進行消息投遞。 Binding: exchange和queue之間的虛擬連接,binding中可以包含routing key,Binding信息被保存到exchange中的查詢表中,用於message
的分發依據;它的作用就是把exchange和queue按照路由規則綁定起來。 producer: 消息生產者,就是投遞消息的程序。 consumer: 消息消費者,就是接受消息的程序。
二:Linux中安裝RabbitMQ
這里我以RabbitMQ 3.9 的版本來進行本文的講解,這里我們需要准備幾個文件
RabbitMQ 3.9.13 Erlang 23.3.4.11(版本兼容) rabbitmq_delayed_message_exchange-3.8.0.ez
1:安裝ErLang和RabbitMQ
# 安裝 erlang 環境 rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm # 安裝 socat 環境 yum -y install socat # 安裝 RabbitMQ 服務 rpm -ivh rabbitmq-server-3.9.13-1.el7.noarch.rpm # 檢查是否安裝 yum list | grep rabbitmq yum list | grep erlang
注:socat支持多協議,用於協議處理,端口轉發,rabbitmq依賴於socat,因此在安裝rabbitmq前要安裝socat。由於默認的CentOS-Base.repo源中沒有socat,所以 $ yum install socat會出現以下錯誤:No package socat available
2:啟動RabbitMQ和防火牆關閉
補充命令: # 查看所以的已開啟的端口 firewall-cmd --zone=public --list-ports # 開啟15672端口(--permanent代表永久生效,重啟系統不會失效) firewall-cmd --zone=public --add-port=15672/tcp --permanent # 防火牆關閉 systemctl stop firewalld # 啟動RabbitMQ服務 systemctl start rabbitmq-server 或 /sbin/service rabbitmq-server start 或 service rabbitmq-server start # 添加開機啟動RabbitMQ服務 chkconfig rabbitmq-server on # 開啟web管理接口(可以更方便快速的對RabbitMQ進行操作) rabbitmq-plugins enable rabbitmq_management # 停止RabbitMQ服務 systemctl stop rabbitmq-server 或 /sbin/service rabbitmq-server stop 或 service rabbitmq-server stop
注:web管理接口應用的操作
rabbitmqctl stop_app 停止web頁面
rabbitmqctl start_app 啟動web頁面
3:RabbitMQ其它版本安裝
若大家用的是最新版本安裝可能不太一樣,請前往:RabbitMQ安裝
3:RabbitMQ基本命令使用及用戶創建

關於RabbitMQ中的用戶角色【tags】 其他(none): 不能登錄管理控制台(啟用management plugin的情況下,以下相同) 普通管理者(management): 用戶可以通過AMQP做的任何事外加以下權限 列出自己可以通過AMQP登入的virtual hosts 查看自己的virtual hosts中的queues, exchanges 和 bindings 查看和關閉自己的channels 和 connections 查看有關自己的virtual hosts的“全局”的統計信息,包含其他用戶在這些virtual hosts中的活動 決策制定者(policymaker): management的權限外加以下權限 查看、創建和刪除自己的virtual hosts所屬的policies和parameters 監控者(monitor/monitoring): management的權限外加以下權限 列出所有virtual hosts,包括他們不能登錄的virtual hosts 查看其他用戶的connections和channels 查看節點級別的數據如clustering和memory使用情況 查看真正的關於所有virtual hosts的全局的統計信息 超級管理員(administrator): policymaker和monitoring的權限外加以下權限 創建和刪除virtual hosts 查看、創建和刪除users 查看創建和刪除permissions 關閉其他用戶的connections

# 查看RabbitMQ里的所有用戶 rabbitmqctl list_users # 查看默認guest用戶的權限 rabbitmqctl list_user_permissions {username} 【RabbitMQ中的用戶管理】 rabbitmqctl add_user {username} {password} # 該命令將創建一個 non-administrative 用戶 rabbitmqctl delete_user {username} # 表示刪除一個用戶,該命令將指示RabbitMQ broker去刪除指定的用戶 rabbitmqctl change_password {username} {newpassword} # 表示修改指定的用戶的密碼 rabbitmqctl clear_password {username} # 表示清除指定用戶的密碼 # 執行此操作后的用戶,將不能用密碼登錄,但是可能通過已經配置的SASL EXTERNAL的方式登錄。 rabbitmqctl authenticate_user {username} {password} # 表示指引RabbitMQ broker認證該用戶和密碼 rabbitmqctl set_user_tags {username} {tag ...} # 表示設置用戶的角色,{tag}可以是零個,一個,或者是多個。並且已經存在的tag也將會被移除 【RabbitMQ中的權限控制】 在上面我們添加完相關的用戶后,就可以對其用戶分配相關vhost的權限了。 vhost對於Rabbit就像虛擬機之於物理服務器一樣,它們通過在各個實例間提供邏輯上分離, 允許你為不同的應用程序安全保密地運行數據。 而在RabbitMQ中相應的權限分為讀、寫、配置三部分: 讀:有關消費消息的任何操作,包括“清除”整個隊列(同樣需要綁定操作的成功) 寫:發布消息(同樣需要綁定操作的成功) 配置:隊列和交換機的創建和刪除 知道了RabbitMQ權限相關的配置后,我們就可以根據具體情況來配置相應的信息。 RabbitMQ的權限是以vhost為分隔的,我們需要確定一個vhost來確定相關的權限設置,默認的vhost是“/” rabbitmqctl add_vhost {vhost} # {vhost} 表示待創建的虛擬主機項的名稱 rabbitmqctl delete_vhost {vhost} # 表示刪除一個vhost。刪除一個vhost將會刪除該vhost的所有exchange、queue、binding、用戶權限、參數和策略。 rabbitmqctl list_vhosts {vhostinfoitem ...} # 表示列出所有的vhost。其中 {vhostinfoitem} 表示要展示的vhost的字段信息,展示的結果將按照{vhostinfoitem}指定的字段 # 順序展示。這些字段包括: name(名稱) 和 tracing (是否為此vhost啟動跟蹤)。 # 如果沒有指定具體的字段項,那么將展示vhost的名稱。 rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read} # 表示設置用戶權限。 {vhost} 表示待授權用戶訪問的vhost名稱,默認為 "/"; {user} 表示待授權反問特定vhost的用戶名稱; # {conf}表示待授權用戶的配置權限,是一個匹配資源名稱的正則表達式; {write} 表示待授權用戶的寫權限,是一個匹配資源名稱 # 的正則表達式; {read}表示待授權用戶的讀權限,是一個資源名稱的正則表達式。 # rabbitmqctl set_permissions -p / admin "^mip-.*" ".*" ".*" # 例如上面例子,表示授權給用戶 "admin" 具有所有資源名稱以 "mip-" 開頭的 配置權限;所有資源的寫權限和讀權限。 rabbitmqctl clear_permissions [-p vhost] {username} # 表示設置用戶拒絕訪問指定指定的vhost,vhost默認值為 "/" rabbitmqctl list_permissions [-p vhost] # 表示列出具有權限訪問指定vhost的所有用戶、對vhost中的資源具有的操作權限。默認vhost為 "/"。 # 注意,空字符串表示沒有任何權限。
實際操作說明: rabbitmqctl list_users # 查看RabbitMQ里的所有用戶 rabbitmqctl list_vhosts # 查看RabbitMQ里的所有vhosts rabbitmqctl list_permissions # 查看RabbitMQ里所有用戶的權限 rabbitmqctl list_user_permissions guest # 查看RabbitMQ里guest用戶的權限 rabbitmqctl add_vhost test # 創建的一個虛擬主機項為 test 的名稱 rabbitmqctl add_user admin 123 # 創建一個用戶為admin 密碼為123 rabbitmqctl set_user_tags admin administrator # 設置admin的角色為超級管理員(administrator) rabbitmqctl set_permissions -p test admin ".*" ".*" ".*" # 設置admin在test的vhost中,並設置全部文件的讀寫操作 rabbitmqctl list_permissions -p test # 查看test中的vhost里的用戶
4:卸載RabbitMQ服務
systemctl stop rabbitmq-server # 停止RabbitMQ服務 yum list | grep rabbitmq # 查看RabbitMQ安裝的相關列表 yum -y remove rabbitmq-server.noarch # 卸載RabbitMQ已安裝的相關內容 yum list | grep erlang # 查看erlang安裝的相關列表 yum -y remove erlang-* yum remove erlang.x86_64 # 卸載erlang已安裝的相關內容 rm -rf /usr/lib64/erlang rm -rf /var/lib/rabbitmq rm -rf /usr/local/erlang rm -rf /usr/local/rabbitmq # 刪除有關的所有文件
三:簡單隊列
本小節將使用Java編寫兩個程序來模擬簡單隊列,用生產者(Producer)發送消息到RabbitMQ隊列后,再由消費者(Consumer)來監控RabbitMQ發送來的隊列信息;簡單隊列就是一個生產者發送消息到隊列,監聽那個隊列的一個消費者獲取消息並處理

<dependencies> <!--RabbitMQ客戶端坐標--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency> </dependencies> <build> <plugins> <!--配置maven編譯版本--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source><!--源代碼使用的JDK--> <target>1.8</target><!--target需要生成的目標class文件的編譯版本--> <encoding>UTF-8</encoding><!--字符集編碼,防止中文亂碼--> <failOnError>true</failOnError><!--指示即使存在編譯錯誤,構建是否仍將繼續--> <failOnWarning>false</failOnWarning><!--指示即使存在編譯警告,構建是否仍將繼續--> <showDeprecation>false</showDeprecation><!--設置是否顯示使用不推薦API的源位置--> <showWarnings>false</showWarnings><!--設為true若要顯示編譯警告,請執行以下操作--> <meminitial>128M</meminitial><!--編譯器使用的初始化內存--> <maxmem>512M</maxmem><!--編譯器使用的最大內存--> </configuration> </plugin> </plugins> </build>
1:創建生產者(后面例子以這個為基礎)
package cn.xw.helloWorld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author AnHui OuYang * @version 1.0 * created at 2022-03-04 17:42 */ public class Producer { //簡單隊列名稱 public static final String QUEUE_NAME = "helloWorldQueue"; public static void main(String[] args) throws IOException, TimeoutException { //創建一個連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置RabbitMQ服務的IP、賬號、密碼、Vhost虛擬主機(默認 "/" 則不需要設置) factory.setHost("192.168.31.51"); factory.setUsername("admin"); factory.setPassword("123"); factory.setVirtualHost("test"); //通過工廠對象獲取一個連接 Connection connection = factory.newConnection(); //通過連接來獲取一個信道 Channel channel = connection.createChannel(); //聲明一個隊列 //參數一:隊列名稱 //參數二:隊列里的消息是否持久化,默認消息保存在內存中,默認false //參數三:該隊列是否只供一個消費者進行消費的獨占隊列,則為 true(僅限於此連接),false(默認,可以多個消費者消費) //參數四:是否自動刪除 最后一個消費者斷開連接以后 該隊列是否自動刪除 true 自動刪除,默認false //參數五:構建隊列的其它屬性,看下面擴展參數 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //發送的消息 byte[] msg = "這是一個簡單消息".getBytes(StandardCharsets.UTF_8); //發送消息 //參數一:將發送到RabbitMQ的哪個交換機上 //參數二:路由的key是什么(直接交換機找到路由后,通過路由key來確定最終的隊列) //參數三:其它參數 //參數四:發送到隊列的具體信息 channel.basicPublish("", QUEUE_NAME, null, msg); System.out.println("消息發送完成!"); } }
2:創建消費者(后面例子以這個為基礎)
package cn.xw.helloWorld; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author AnHui OuYang * @version 1.0 * created at 2022-03-05 15:12 */ public class Consumer { //簡單隊列名稱 public static final String QUEUE_NAME = "helloWorldQueue"; public static void main(String[] args) throws IOException, TimeoutException { //創建一個連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置RabbitMQ服務的IP、賬號、密碼、Vhost虛擬主機(默認 "/" 則不需要設置) factory.setHost("192.168.31.51"); factory.setUsername("admin"); factory.setPassword("123"); factory.setVirtualHost("test"); //通過工廠對象獲取一個連接 Connection connection = factory.newConnection(); //通過連接來獲取一個信道 Channel channel = connection.createChannel(); System.out.println("消費者開始監聽隊列消息...."); //推送的消息如何進行消費的接口回調 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("獲取隊列信息:" + new String(message.getBody(), StandardCharsets.UTF_8)); } }; //取消消費的一個回調接口 如在消費的時候隊列被刪除掉了 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); } }; //消費者消費消息 //參數一:消費哪個隊列 //參數二:消費成功之后是否要自動應答 true 代表自動應答 false 手動應答 //參數三:接受隊列消息的回調接口 //參數四:取消消費的回調接口 channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
3:測試簡單隊列
編寫好上面的消費者代碼和生產者代碼后我們就可以進行Demo演示了,首先執行生產者發送消息后我們再執行消費者代碼
隨后執行完消費者后會打印具體的隊列消息
注:必須先執行生產者,因為執行消費者后會發現在RabbitMQ中找不到指定Queue隊列,這時就會出現異常;但是為了不報錯也可以在消費者代碼里面也創建隊列,所有,生產者消費者都可以創建隊列
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'helloWorldQueue' in vhost 'test', class-id=60, method-id=20)
4:創建隊列擴展參數
x-dead-letter-exchange:
死信交換器
x-dead-letter-routing-key:
死信消息的可選路由鍵
x-expires:
隊列在指定毫秒數后被刪除
x-message-ttl:
毫秒為單位的消息過期時間,隊列級別
x-ha-policy:
創建HA隊列,此參數已失效
x-ha-nodes:
HA隊列的分布節點,此參數已失效
x-max-length:
隊列的消息條數限制。限制加入queue中消息的條數。先進先出原則,超過后,后面的消息會頂替前面的消息。
x-max-length-bytes:
消息容量限制,該參數和x-max-length目的一樣限制隊列的容量,但是這個是靠隊列大小(bytes)來達到限制。
x-max-priority:
最大優先值為255的隊列優先排序功能
x-overflow:
設置隊列溢出行為。這決定了當達到隊列的最大長度時消息會發生什么。
有效值是drop-head、reject-publish或reject-publish-dlx。
x-single-active-consumer:
表示隊列是否是單一活動消費者,true時,注冊的消費組內只有一個消費者消費消息,
其他被忽略,false時消息循環分發給所有消費者(默認false)
x-queue-mode:
將隊列設置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使用;
如果未設置,隊列將保留內存緩存以盡可能快地傳遞消息
x-queue-master-locator:
在集群模式下設置鏡像隊列的主節點信息
四:工作隊列(Work Queues)
工作隊列(又稱任務隊列)的主要思想是避免立即執行資源密集型任務,而不得不等待它完成。相反我們安排任務在之后執行。我們把任務封裝為消息並將其發送到隊列。在后台運行的工作進程將彈出任務並最終執行作業。當有多個工作線程時,這些工作線程將一起處理這些任務。
生產者生產了1萬個消息發送到隊列中,這時為了提高處理效率往往設置了多個消費者同時監聽消息隊列並處理消息
1:抽取工具類(獲取連接信道)

/** * @author AnHui OuYang * @version 1.0 * created at 2022-03-05 16:24 */ public class ChannelUtil { public static Channel getChannel() { //信道初始化 Channel channel = null; try { //創建一個連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置RabbitMQ服務的IP、賬號、密碼、Vhost虛擬主機(默認 "/" 則不需要設置) factory.setHost("192.168.31.51"); factory.setUsername("admin"); factory.setPassword("123"); factory.setVirtualHost("test"); //通過工廠對象獲取一個連接 Connection connection = factory.newConnection(); //通過連接來獲取一個信道 channel = connection.createChannel(); } catch (Exception e) { e.printStackTrace(); } return channel; } }
2:創建生產者
public class ProducerA { //工作隊列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //創建隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //循環發送消息 for (int i = 0; i < 1000; i++) { byte[] msg = ("這是一個編號為:" + i + " 的待處理的消息").getBytes(StandardCharsets.UTF_8); channel.basicPublish("", QUEUE_NAME, null, msg); } System.out.println("消息發送完成!"); } }
3:創建兩個消費者
public class ConsumerA { //工作隊列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //創建隊列 以防啟動消費者發現隊列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("消費者A開始監聽隊列消息...."); //消費者消費消息 channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> { System.out.println("A消費者獲取隊列信息並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } } public class ConsumerB { //工作隊列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //創建隊列 以防啟動消費者發現隊列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("消費者B開始監聽隊列消息...."); //消費者消費消息 channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> { System.out.println("B消費者獲取隊列信息並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }
創建完生產者和消費者后首先啟動兩個消費者,然后啟動生產者,生產者發送消息,被兩個消費者監聽並消費
4:輪詢分發消息
在上面的代碼中我們會發現兩個消費者消費消息的順序是輪詢的(A1,B2,A3,B4......);這也是默認的消費規則,但是在日常生產環境中並不會用此模式來進行隊列消息的消費。
五:工作隊列之消息應答
每個消費者服務完成一個任務可能需要的時間長短不一樣,如果其中一個消費者處理一個任務時並僅只完成了部分就突然掛掉了,會發生什么情況。RabbitMQ一旦向消費者傳遞了一條消息,便立即將該消息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續發送給該消費的消息,因為它無法接收到。為了保證消息在發送過程中不丟失,rabbitmq引入消息應答機制,
消息應答就是:消費者在接收到消息並且處理該消息之后,告訴rabbitmq它已經處理了,rabbitmq可以把該消息刪除了
1:自動應答
消息發送后立即被認為已經傳送成功,這種模式需要在高吞吐量和數據傳輸安全性方面做權衡,因為這種模式如果消息在消費者接收到之前,消費者那邊出現連接或者channel關閉,那么消息就丟失了,當然另一方面這種模式在消費者那邊可以傳遞過載的消息,沒有對傳遞的消息數量進行限制,當然這樣有可能使得消費者這邊由於接收太多還來不及處理的消息,導致這些消息的積壓,最終使得內存耗盡,最終這些消費者線程被操作系統殺死,所以這種模式僅適用在消費者可以高效並以某種速率能夠處理這些消息的情況下使用(就是業務處理簡單的消息)。
自動應答:隊列向消費者發送消息后,消費者接收到消息就算成功應答了,隨后隊列將會刪除對應的隊列消息;
2:手動應答(重要)
上面案例全部采用的是自動應答,所以我們要想實現消息消費過程中不丟失,需要把自動應答改為手動應答,這樣確保從消息隊列來一個消息給消費者,等消費者消費完畢以后再告知RabbitMQ已處理完,然后RabbitMQ才會發送下一條消息個消費者處理,保證消息不丟失
注:basicConsume消息接收方法中的autoAck參數必須為false才可以顯示為手動確認
手動應答分為三種情況:
②:手動拒絕 basicReject(long deliveryTag, boolean requeue):
拒絕deliveryTag對應的消息,第二個參數是否requeue,true則重新入隊列,否則丟棄或者進入死信隊列。
③:手動不確認 basicNack(long deliveryTag, boolean multiple, boolean requeue)
不確認deliveryTag對應的消息,第二個參數是否應用於多消息,第三個參數是否requeue,與basic.reject區別就是同時支持多個消息,
可以nack該消費者先前接收未ack的所有消息。nack后的消息也會被自己消費到。
③:手動恢復 basicRecover(boolean requeue)
是否恢復消息到隊列,true則重新入隊列,並且盡可能的將之前recover的消息投遞給其他消費者消費,而不是自己再次消費。
false則消息會重新被投遞給自己。
④:手動應答 basicAck(long deliveryTag, boolean multiple)
如果消費者在處理消息的過程中,出了錯,就沒有什么辦法重新處理這條消息,所以在平時都是處理消息成功后,再確認消息;
當autoAck=false時,RabbitMQ會等待消費者手動發回ack信號后,才從內存(和磁盤,如果是持久化消息的話)中移除消息。
它采用消息確認機制,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,
因為RabbitMQ會一直持有消息直到消費者手動調用channel.basicAck為止。對於RabbitMQ服務器端而言,如果服務器端一直沒 有收到消費者的ack信號,並且消費此消息的消費者已經斷開連接,則服務器端會安排該消息重新進入隊列,等待投遞給下一個 消費者(也可能還是原來的那個消費者)。這里我們啟動了手動確認后,就必須調用channel.basicAck方法進行確認,
否則的話RabbitMQ會一直進行等待,當我們這個消費者關閉后,RabbitMQ會將該條消息再發給對應的消費者進行消費,
直到有消費者對該條消息進行消費並應答完成。
參數說明: deliveryTag:對應消息的ID;通過message.getEnvelope().getDeliveryTag()獲取
requeue:是否重新入列,true代表拒絕應答后會重新返回隊列,false則直接刪除或者進入死信隊列
multiple:是否批量應答,true代表批量應答
假設有個隊列依次排列為 1、2、3...10 (1最先出隊,10最后出隊);
當為true,發送1~5消息給消費者處理完都未確認,當到第6時執行應答方法,並且multiple為true,則代表1~6都被被批量應答
當為false,發送1~5消息給消費者處理完都未確認,當到第6時執行應答方法,並且multiple為true,則代表只要6被應答
//生產者只管發任務消息,代碼不變,消費者代碼優化更改以下,多個消費者代碼也和這一樣 public class ConsumerB { //工作隊列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //創建隊列 以防啟動消費者發現隊列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("消費者B開始監聽隊列消息...."); //應答方式 true自動應答 false手動應答(若是手動應答必須設置false) boolean autoAck = false; //消費者消費消息requeue channel.basicConsume(QUEUE_NAME, autoAck, (consumerTag, message) -> { try { //這里我就一句打印語句,沒有復雜邏輯,正常這里有復雜業務 System.out.println("B消費者獲取隊列信息並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); int i = 1/0; //手動確認應答 不批量應答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); //出現異常手動進行不應答;並且放入隊列中(reject或者使用uack方式都可以,或者本次消息不處理了可以通過recover重新放到隊列) channel.basicReject(message.getEnvelope().getDeliveryTag(), true); } }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }
注:"在手動應答的情況下,如果channel.basicAck收到確認前的代碼有問題,會拋出異常,導致無法進行手動確認,一般消費者也不會連接中斷,那么該消息就一直無法被處理,連被其它消費者處理的機會都沒有,所以一般我們會進行try-catch處理,處理成功則手動確認,失敗或有異常則拒絕。"
六:RabbitMQ持久化
在生產過程中,難免會發生服務器宕機的事情,RabbitMQ也不例外,可能由於某種特殊情況下的異常而導致RabbitMQ宕機從而重啟,那么這個時候對於消息隊列里的數據,包括交換機、隊列以及隊列中存在消息恢復就顯得尤為重要了。RabbitMQ本身帶有持久化機制,包括交換機、隊列以及消息的持久化。持久化的主要機制就是將信息寫入磁盤,當RabbitMQ服務宕機重啟后,從磁盤中讀取存入的持久化信息,恢復數據。
1:交換機持久化(后面介紹交換機)
默認不是持久化的,在服務器重啟之后,交換機會消失。我們在管理台的Exchange頁簽下查看交換機,可以看到使用上述方法聲明的交換機,Features一列是空的,即沒有任何附加屬性。
我們可以看到第三個參數durable,如果為true時則表示要做持久化,當服務重啟時,交換機依然存在,所以使用該方法聲明的交換機是下面這個樣子的:
2:隊列持久化
與交換機的持久化相同,隊列的持久化也是通過durable參數實現的(設置后隊列也會有個D),看一下方法的定義:
queueDeclare(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String, Object> arguments) boolean durable:
參數跟交換機方法的參數一樣,true表示做持久化,當RabbitMQ服務重啟時,隊列依然存在
boolean exclusive(補充):
排它隊列。如果一個隊列被聲明為排他隊列,那么這個隊列只能被第一次聲明它的連接所見,並在連接斷開的時候自動刪除。
這里有三點需要說明:
1:排它隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排它隊列
2:如果一個連接已經聲明了一個排它隊列,其它連接是不允許建立同名的排它隊列的,這個與普通隊列不同
3:即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排它隊列都會被自動刪除的,這種隊列適用於一
個客戶端發送讀取消息的應用場景
boolean autoDelete(補充):
自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列
3:消息持久化
消息的持久化是指當消息從交換機發送到隊列之后,被消費者消費之前,服務器突然宕機重啟,消息仍然存在。消息持久化的前提是隊列持久化,假如隊列不是持久化,那么消息的持久化毫無意義。通過如下代碼設置消息的持久化:
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) BasicProperties props設置消息持久化方式: 參數實現類: public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties { private String contentType; // 消息的內容類型,如:text/plain private String contentEncoding; // 消息內容編碼 private Map<String,Object> headers; // 設置消息的header,類型為Map<String,Object> private Integer deliveryMode; // 1(nopersistent)非持久化,2(persistent)持久化 private Integer priority; // 消息的優先級 private String correlationId; // 關聯ID private String replyTo; // 用於指定回復的隊列的名稱 private String expiration; // 消息的失效時間 private String messageId; // 消息ID private Date timestamp; // 消息的時間戳 private String type; // 類型 private String userId; // 用戶ID private String appId; // 應用程序ID private String clusterId; // 集群ID } deliveryMode是設置消息持久化的參數,等於1不設置持久化,等於2設置持久化; 我們平時不會使用BasicProperties類而是使用MessageProperties,通過這個類來獲取具體配置 設置 MessageProperties.PERSISTENT_TEXT_PLAIN 代表: public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain",null,null,2,0, null, null, null,null, null, null, null,null, null);
//也可以通過這種方式設置;發送消息的參數設置 expiration過期時間 deliveryMode 消息持久化方式
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").deliveryMode(2).build();
保證在服務器重啟的時候可以保持不丟失相關信息,重點解決服務器的異常崩潰而導致的消息丟失問題。但是,將所有的消息都設置為持久化,會嚴重影響RabbitMQ的性能,寫入硬盤的速度比寫入內存的速度慢的不只一點點。對於可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐率,在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權衡。
七:RabbitMQ消息分發
1:不公平分發
在上面的案例中,RabbitMQ 分發消息采用的輪訓分發,但是在某種場景下這種策略並不是很好,比方說有兩個消費者在處理任務,其中有個消費者A處理任務的速度非常快,而另外一個消費者B處理速度卻很慢,這個時候我們還采用輪訓分發的話就會發現消費者A早早的處理完后空閑在那,而消費者B還在處理,這時消費者A等待消費者B處理完任務后A消費者才會得到下一個任務消息;這就會浪費空閑消費者A發服務器資源;但RabbitMQ 並不知道這種情況它依然很公平的進行分發。
為了避免這種情況,我們可以設置參數 channel.basicQos(1);
意思就是說如果消費者對這個任務還沒有處理完或者我還沒有應答你,你先別分配給我,我目前只能處理一個任務,然后 rabbitmq 就會把該任務分配給沒有那么忙的那個空閑消費者,當然如果所有的消費者都沒有完成手上任務,隊列還在不停的添加新任務,隊列有可能就會遇到隊列被撐滿的情況,這個時候就只能添加新的 worker(消費者服務)或者改變其它存儲任務的策略。
說好聽點就是不公平分發,其實它叫預取值,后面說明,預取值就是信道中可以允許未確認消息的最大值,如果是1,那處理快的就很快處理完可以處理下一條,慢的還得繼續處理,不接受消息,實現不公平分發。
我們還需要設置手動應答,因為自動應答,會發現雖然實現不公平分發,但是還是一樣的,每個消費者消費的數據量很大可能是一樣的,因為自動應答是一旦發送到消費者代表完成,后續還會繼續給這個消費者發送,但是手動應答則會發現,我消費的慢,會等消費者消費完才會被分配下一個消息處理;所以消費快的消費者會消費更多的消息。
消費者A消費者B代碼改造:
public class ConsumerA { //工作隊列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //創建隊列 以防啟動消費者發現隊列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //設置0代表輪詢分發、1不公平分發、大於1代表預取值 channel.basicQos(1); System.out.println("消費者A(處理資源很快)開始監聽隊列消息...."); //應答方式 true自動應答 false手動應答(若是手動應答必須設置false) boolean autoAck = false; //消費者消費消息requeue channel.basicConsume(QUEUE_NAME, autoAck, (consumerTag, message) -> { try { //這里我就一句打印語句,沒有復雜邏輯,正常這里有復雜業務 System.out.println("A消費者獲取隊列信息並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); Thread.sleep(3000); //3秒才能處理完一個任務消息 //手動確認應答 不批量應答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } catch (IOException | InterruptedException e) { e.printStackTrace(); //出現異常手動進行不應答;並且放入隊列中 channel.basicReject(message.getEnvelope().getDeliveryTag(), true); } }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } } //消費者A(處理資源很快)開始監聽隊列消息.... //A消費者獲取隊列信息並處理:這是一個編號為:0 的待處理的消息 //A消費者獲取隊列信息並處理:這是一個編號為:2 的待處理的消息 //A消費者獲取隊列信息並處理:這是一個編號為:3 的待處理的消息 //A消費者獲取隊列信息並處理:這是一個編號為:4 的待處理的消息 //A消費者獲取隊列信息並處理:這是一個編號為:6 的待處理的消息 //A消費者獲取隊列信息並處理:這是一個編號為:7 的待處理的消息 //A消費者獲取隊列信息並處理:這是一個編號為:8 的待處理的消息 public class ConsumerB { //工作隊列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //創建隊列 以防啟動消費者發現隊列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //設置0代表輪詢分發、1不公平分發、大於1代表預取值 channel.basicQos(1); System.out.println("消費者B(處理資源很慢)開始監聽隊列消息...."); //應答方式 true自動應答 false手動應答(若是手動應答必須設置false) boolean autoAck = false; //消費者消費消息requeue channel.basicConsume(QUEUE_NAME, autoAck, (consumerTag, message) -> { try { //這里我就一句打印語句,沒有復雜邏輯,正常這里有復雜業務 System.out.println("B消費者獲取隊列信息並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); Thread.sleep(10000); //10秒才能處理完一個任務消息 //手動確認應答 不批量應答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } catch (IOException | InterruptedException e) { e.printStackTrace(); //出現異常手動進行不應答;並且放入隊列中 channel.basicReject(message.getEnvelope().getDeliveryTag(), true); } }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } } //消費者B(處理資源很慢)開始監聽隊列消息.... //B消費者獲取隊列信息並處理:這是一個編號為:1 的待處理的消息 //B消費者獲取隊列信息並處理:這是一個編號為:5 的待處理的消息 //B消費者獲取隊列信息並處理:這是一個編號為:9 的待處理的消息
總結不公平分發就是,在消費者有收到確認機制后並設置不公平分發就代表哪個消費者先消費完后任務,RabbitMQ隊列會先為它分配下一個任務消息,反之慢的消費者等消費完也可以拿到新消息處理
2:預取值
本身隊列發送給消費者的消息是異步發送的,所以在任何時候,消費者連接隊列時的channel上肯定不止一個消息,另外來自消費者的手動確認本質上也是異步的。因此這里就存在一個未確認的消息緩沖區,因此希望開發人員能限制此緩沖區的大小,以避免緩沖區里面無限制的未確認消息問題。這個時候就可以通過使用basicQos方法設置“預取計數”值來完成的。該值定義通道上允許的未確認消息的最大數量。一旦數量達到配置的數量,RabbitMQ 將停止再往通道上傳遞更多消息,除非至少有一個未處理的消息被確認后RabbitMQ才會再往信道上發送一條任務消息;
假設在通道上有未確認的消息 5、6、7,8,並且通道的預取計數設置為 4,此時 RabbitMQ 將不會再往該通道上再傳遞任何消息,除非至少有一個未應答的消息被ack。比方說tag=6這個消息剛剛被確認ACK,RabbitMQ將會感知這個tag=6被確認並再往信道發送一條消息。
消息應答和 QoS 預取值對用戶吞吐量有重大影響。通常,增加預取將提高向消費者傳遞消息的速度。雖然自動應答傳輸消息速率是最佳的,但是,在這種情況下已傳遞但尚未處理的消息的數量也會增加,從而增加了消費者的RAM消耗(隨機存取存儲器),應該小心使用具有無限預處理的自動確認模式或手動確認模式,消費者消費了大量的消息如果沒有確認的話,會導致消費者連接節點的內存消耗變大,所以找到合適的預取值是一個反復試驗的過程,不同的負載該值取值也不同,100 到 300 范圍內的值通常可提供最佳的吞吐量,並且不會給消費者帶來太大的風險。預取值為 1 是最保守的。當然這將使吞吐量變得很低,特別是消費者連接延遲很嚴重的情況下,特別是在消費者連接等待時間較長的環境中。對於大多數應用來說,稍微高一點的值將是最佳的。
八:發布確認
生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所匹配的隊列之后,broker就會發送一個確認指令給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列設置的是可持久化的,那么向生產者確認消息之前會先將消息寫入磁盤之后再發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號,此外broker也可以設置basic.ack的multiple域(批量確認),表示當前序列號及這個序列號之前的所有消息都會一並確認。
confirm模式最大的好處在於它是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者便可以通過回調方法來處理該確認消息,如果 RabbitMQ 因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者程序同樣可以在回調方法中處理該 nack 消息。
1:開啟發布確認方法
發布確認默認是沒有開啟的,如果要開啟需要調用方法confirmSelect,每當你要想使用發布確認,都需要在channel上調用該方法
2:單個發布確認
這是一種簡單的確認方式,它是一種同步確認發布的方式,也就是發布者發布一個消息之后只有等RabbitMQ回調確認方法,發布者並且也接受到RabbitMQ的確認時,后續的消息才能繼續發布,waitForConfirmsOrDie(long)這個方法只有在消息被確認的時候才返回,如果在指定時間范圍內這個消息沒有被確認那么它將拋出異常。
這種確認方式有一個最大的缺點就是:發布速度特別的慢,因為如果沒有確認發布的消息就會阻塞所有后續消息的發布,這種方式最多提供每秒不超過數百條發布消息的吞吐量。當然對於某些應用程序來說這可能已經足夠了。
public class ProducerA { //單個發布確認 public static final String SINGLE_RELEASE_CONFIRMATION = "singleReleaseConfirmation";
public static void main(String[] args) throws IOException, InterruptedException { long begin = System.currentTimeMillis(); //記錄開始時間 //獲取信道 Channel channel = ChannelUtil.getChannel(); //創建一個信道 channel.queueDeclare(SINGLE_RELEASE_CONFIRMATION, true, false, false, null); //開啟發布確認功能 channel.confirmSelect(); //循環發送消息 for (int i = 0; i < 1000; i++) { String str = "單個發布確認信息" + i; System.out.println("開始發送信息:" + i); //發布信息 channel.basicPublish("", SINGLE_RELEASE_CONFIRMATION, MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes(StandardCharsets.UTF_8)); //驗證是否發送成功(等待確認) //channel.waitForConfirms(3000); 發送三秒后沒得到回復將斷定未發送過去 boolean b = channel.waitForConfirms(); if (b) { System.out.println("發送成功了:" + i); } } long end = System.currentTimeMillis(); //記錄結尾時間 System.out.println("單個發布確認用時:" + (end - begin)); //單個發布確認用時:2278 } }
2:批量發布確認
與單個等待確認消息相比,先發布一批消息然后一起確認可以極大的提高吞吐量,當然這種方式的缺點就是:當發生故障導致發布出現問題時,不知道那一批確認的是哪個消息出現問題了,我們必須將整個批處理保存在內存中,以記錄重要的信息而后重新發布消息。當然這種方案仍然是同步的,也一樣阻塞消息的發布。
public class ProducerA { //批量確認 public static final String BATCH_CONFIRMATION = "batchConfirmation";public static void main(String[] args) throws IOException, InterruptedException { long begin = System.currentTimeMillis(); //記錄開始時間 //獲取信道 Channel channel = ChannelUtil.getChannel(); //創建一個信道 channel.queueDeclare(BATCH_CONFIRMATION, true, false, false, null); //開啟發布確認功能 channel.confirmSelect(); //定義每次批量處理多少消息進行確認 int batchNumber = 100; //循環發送消息 for (int i = 0; i < 1000; i++) { String str = "批量發布確認信息" + i; System.out.println("開始發送信息:" + i); //發布信息 channel.basicPublish("", BATCH_CONFIRMATION, MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes(StandardCharsets.UTF_8)); //驗證是否發送成功(等待確認) 用求余的方式來判斷每輪100個 //channel.waitForConfirms(3000); 發送三秒后沒得到回復將斷定未發送過去 if ((i + 1) % batchNumber == 0) { if (channel.waitForConfirms()) { System.out.println("批量發送成功了 范圍為:" + (i - (batchNumber - 1)) + " ~ " + i); } } } long end = System.currentTimeMillis(); //記錄結尾時間 System.out.println("批量發布確認用時:" + (end - begin)); //批量發布確認用時:454 } }
3:異步確認發布(推薦)
異步確認雖然編程邏輯比上兩個要復雜,但是性價比最高,無論是可靠性還是效率都沒得說,他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功,下面就讓我們來詳細講解異步確認是怎么實現的。
public class ProducerA { //異步發布確認 public static final String ASYNC_RELEASE_CONFIRMATION = "asyncReleaseConfirmation"; public static void main(String[] args) throws IOException { long begin = System.currentTimeMillis(); //記錄開始時間 //獲取信道 Channel channel = ChannelUtil.getChannel(); //創建一個信道 channel.queueDeclare(ASYNC_RELEASE_CONFIRMATION, true, false, false, null); //開啟發布確認功能 channel.confirmSelect(); //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Start //線程安全有序的一個哈希表Map,適用於高並發的情況 //1.輕松的將序號與消息進行關聯 2.輕松批量刪除條目 只要給到序列號 3.支持並發訪問 ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { //這個是回調成功的,回調成功后把集合中的數據刪除,最終就代表失敗的多少 if (multiple) { ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = outstandingConfirms.headMap(deliveryTag, true); longStringConcurrentNavigableMap.clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("~~~~ 回調成功的數據:" + deliveryTag + " 是否批量確認:" + multiple); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("~~~~ 回調失敗的數據:" + deliveryTag); }; //添加監聽器,監聽返回(監聽器一定要再發送消息之前就創建和監聽) 參數1:回調成功 參數2:回調失敗 channel.addConfirmListener(ackCallback, nackCallback); //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ End //循環發送消息 (因為是異步 這里正常發送不用管它) for (int i = 0; i < 10000; i++) { String str = "異步發布確認信息" + i; //記錄要發送的數據添加到集合中 outstandingConfirms.put(channel.getNextPublishSeqNo(),str); //發布信息 channel.basicPublish("", ASYNC_RELEASE_CONFIRMATION, MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes(StandardCharsets.UTF_8)); } long end = System.currentTimeMillis(); //記錄結尾時間 System.out.println("異步發布確認用時:" + (end - begin)); //異步發布確認用時:337 } }
九:RabbitMQ交換機(Exchange)
交換機(Exchange)接收消息,並根據路由鍵(Routing Key)轉發消息到綁定的隊列
RabbitMQ消息傳遞模型的核心思想是: 生產者生產的消息從不會直接發送到隊列。實際上,通常生產者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。相反,生產者只能將消息發送到交換機(exchange),交換機工作的內容非常簡單,一方面它接收來自生產者的消息,另一方面將它們推入隊列。交換機必須確切知道如何處理收到的消息。是應該把這些消息放到特定隊列(需要有提前綁定路由鍵)還是說把它們群發到許多隊列中還是說應該丟棄它們。這就的由交換機的類型來決定。
1:綁定(bindings)
binding其實是exchange和queue之間的橋梁,它告訴我們exchange和那個隊列進行了綁定關系。
2:交換機類型四種類型
直接(direct), 主題(topic) ,標題(headers) , 扇出(fanout)
3:臨時隊列(補充)
每當我們消費者連接到RabbitMQ時,我們都需要一個全新的空隊列(因為這個隊列需要綁定到交換機上),為此我們可以創建一個具有隨機名稱的隊列,或者能讓服務器為我們選擇一個隨機隊列名稱那就更好了。其次一旦我們斷開了消費者的連接,隊列將被自動刪除。
4:無名Exchange
我們沒使用Exchange,但仍能夠將消息發送到隊列。之前能實現的原因是因為我們使用的是默認交換,我們通過空字符串("")進行標識
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
當初使用:channel.basicPublish("", "當初寫隊列名稱", null,"發送的消息");
第一個參數是交換機的名稱:空字符串表示默認或無名稱交換機
消息能路由發送到隊列中其實是由routingKey(binding key)綁定key指定的,那時key都填寫隊列名稱,所有直接被綁定到對應隊列,
可以說使用的是直接交換機(direct)
十:Fanout扇出交換機(發布訂閱模式)
扇出交換機是最基本的交換機類型,它所能做的事情非常簡單---廣播消息。扇出交換機會把能接收到的消息全部發送給綁定在自己身上的隊列。因為廣播不需要“思考”,所以扇形交換機處理消息的速度也是所有的交換機類型里面最快的。
創建交換機方法:exchangeDeclare(String exchange,BuiltinExchangeType type,boolean durable, boolean autoDelete,boolean internal,Map<String, Object> arguments) exchange: 交換機名稱
type: 交換機類型,direct、topic、 fanout、 headers
durable: 是否需要持久化
autoDelete: 當最后一個綁定到Exchange上的隊列刪除后,自動刪除該Exchange
internal: 當前Exchange是否用於RabbitMQ內部使用,默認為False
arguments: 擴展參數,用於擴展AMQP協議定制化使用
注:推薦在編寫生產者時創建交換機,在編寫消費者時應該創建隊列,並且隊列綁定交換機,啟動時先啟動交換機

public class FanoutConsumerA { //交換機名稱 public static final String FANOUT_DEMO_EXCHANGE = "fanoutDemoExchange"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置發布訂閱模式(扇出模式)防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(FANOUT_DEMO_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null); //創建一個臨時隊列 String queueName = channel.queueDeclare().getQueue(); //把隊列綁定到指定交換機上 channel.queueBind(queueName, FANOUT_DEMO_EXCHANGE, ""); //接收隊列消息 channel.basicConsume(queueName, false, (consumerTag, message) -> { System.out.println("A臨時消費者獲取隊列信息並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } } public class FanoutConsumerB { //交換機名稱 public static final String FANOUT_DEMO_EXCHANGE = "fanoutDemoExchange"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置發布訂閱模式(扇出模式)防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(FANOUT_DEMO_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null); //創建一個臨時隊列 String queueName = channel.queueDeclare().getQueue(); //把隊列綁定到指定交換機上 channel.queueBind(queueName, FANOUT_DEMO_EXCHANGE, ""); //接收隊列消息 channel.basicConsume(queueName, false, (consumerTag, message) -> { System.out.println("B臨時消費者獲取隊列信息並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }

public class FanoutProducer { //交換機名稱 public static final String FANOUT_DEMO_EXCHANGE = "fanoutDemoExchange"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置發布訂閱模式(扇出模式) channel.exchangeDeclare(FANOUT_DEMO_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null); //循環發送消息 for (int i = 0; i < 10; i++) { String str = "異步發布確認信息" + i; //發布信息 channel.basicPublish(FANOUT_DEMO_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes(StandardCharsets.UTF_8)); } System.out.println("消息發送完畢!"); } }
十一:Direct直接交換機(路由模式)
直連交換機是一種帶路由功能的交換機,一個隊列會綁定到一個交換機上,一個交換機身上可以綁定多個隊列;當生產者發送消息給交換機時,交換機會根據binding在交換機上的routing_key來查找路由,最終被送到指定的隊列里;當一個交換機綁定多個隊列,就會被送到對應的隊列去處理。
下面我將以一個案例的方式來使用直接交換機,如下圖:有一個日志交換機(LogExchange),它負責的功能是將生產者發送的日志信息交到對應的隊列中,隊列分別為基本日志隊列(BasicLogQueue)、錯誤隊列(ErrQueue)、通知隊列(NotifyQueue);其中基本日志隊列記錄日常運行日志錯誤隊列記錄重大問題信息,因為錯誤日志需要告知管理員,所有將錯誤日志又發送到通知隊列來發送郵件告知

public class BasicLogConsumer { //交換機名稱 public static final String LOG_EXCHANGE = "LogExchange"; //隊列名稱 public static final String BASIC_LOG_QUEUE = "BasicLogQueue"; //路由綁定關系 Routing Key public static final String BASIC_LOG_KEY = "BasicLogKey"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(LOG_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //創建一個基本日志隊列 channel.queueDeclare(BASIC_LOG_QUEUE, true, false, false, null); //隊列綁定到交換機上,並通過路由key來對應兩者的連接 channel.queueBind(BASIC_LOG_QUEUE, LOG_EXCHANGE, BASIC_LOG_KEY); //接收隊列消息 channel.basicConsume(BASIC_LOG_QUEUE, true, (consumerTag, message) -> { System.out.println("基本日志隊列里獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }

public class ErrConsumer { //交換機名稱 public static final String LOG_EXCHANGE = "LogExchange"; //隊列名稱 public static final String ERR_QUEUE = "ErrQueue"; //路由綁定關系 Routing Key public static final String ERR_KEY = "ErrKey"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(LOG_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //創建一個基本日志隊列 channel.queueDeclare(ERR_QUEUE, true, false, false, null); //隊列綁定到交換機上,並通過路由key來對應兩者的連接 channel.queueBind(ERR_QUEUE, LOG_EXCHANGE, ERR_KEY); //接收隊列消息 channel.basicConsume(ERR_QUEUE, true, (consumerTag, message) -> { System.out.println("錯誤日志隊列里獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }

public class NotifyConsumer { //交換機名稱 public static final String LOG_EXCHANGE = "LogExchange"; //隊列名稱 public static final String NOTIFY_QUEUE = "NotifyQueue"; //路由綁定關系 Routing Key public static final String ERR_KEY = "ErrKey"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(LOG_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //創建一個基本日志隊列 channel.queueDeclare(NOTIFY_QUEUE, true, false, false, null); //隊列綁定到交換機上,並通過路由key來對應兩者的連接 channel.queueBind(NOTIFY_QUEUE, LOG_EXCHANGE, ERR_KEY); //接收隊列消息 channel.basicConsume(NOTIFY_QUEUE, true, (consumerTag, message) -> { System.out.println("接收到錯誤日志並處理任務郵件發送,錯誤日志內容為:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }

public class DirectProducer { //交換機名稱 public static final String LOG_EXCHANGE = "LogExchange"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(LOG_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //待發送的消息 HashMap<String, List<String>> sendMsg = new HashMap<>(); List<String> errMsg = Arrays.asList("[1001]系統存在重大問題,可能會發生宕機!!", "[1002]電腦受到蠕蟲病毒攻擊!!"); List<String> basicMsg = Arrays.asList("[2001]尊敬的螞蟻小哥歡迎登錄系統", "[2002]螞蟻小哥已退出賬號"); sendMsg.put("ErrKey", errMsg); sendMsg.put("BasicLogKey", basicMsg); //循環發送消息任務 for (Map.Entry<String, List<String>> msg : sendMsg.entrySet()) { String key = msg.getKey();//路由key List<String> messages = msg.getValue();//待發送消息 for (String message : messages) { channel.basicPublish(LOG_EXCHANGE, key, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } } System.out.println("消息發送完成!!!"); } }
從上面可以看出若exchange的綁定類型是direct,但是它綁定的多個隊列的key如果都相同,在這種情況下雖然綁定類型是direct但是它表現的就和fanout有點類似了,就跟廣播差不多。
適用場景:有優先級的任務,根據任務的優先級把消息發送到對應的隊列,這樣可以指派更多的資源去處理高優先級的隊列。
十二:Topics主題交換機(匹配模式)
之前我們使用只能進行隨意廣播的fanout扇出交換機,但只能群發給每個隊列,不能發送到指定某個隊列,但是使用了direct交換機,就可以實現有選擇性地發送到指定隊列了。盡管使用direct交換機,但是它仍然存在局限性,如果我們希望一條消息發送給多個隊列,那么這個交換機需要綁定上非常多的routing_key,假設每個交換機上都綁定一堆的routing_key連接到各個隊列上。那么消息的管理就會異常地困難。所以RabbitMQ提供了一種主題交換機,發送到主題交換機上的消息需要攜帶指定規則的routing_key,主題交換機會根據這個規則將數據發送到對應的(多個)隊列上;可以理解為模糊匹配。
主題交換機的routing_key需要有一定的規則,交換機和隊列的binding_key需要采用*.#.*.....的格式,每個部分用 . 分開。
其中:
* (星號):可以代替一個單詞
#(井號):可以替代零個或多個單詞
上圖是一個隊列綁定關系圖,我們來看看他們之間數據接收情況是怎么樣的:
隊列綁定交換機的Key 匹配規則 quick.orange.rabbit 被隊列 Q1 Q2 接收到
lazy.orange.elephant 被隊列 Q1 Q2 接收到
quick.orange.fox 被隊列 Q1 接收到
lazy.brown.fox 被隊列 Q2 接收到
lazy.pink.rabbit 雖然滿足兩個綁定規則但兩個規則都是在Q2隊列,所有只有Q2接收一次
quick.brown.fox 不匹配任何綁定不會被任何隊列接收到會被丟棄
quick.orange.male.rabbit 是四個單詞不匹配任何綁定會被丟棄
lazy.orange.male.rabbit 是四個單詞但匹配 Q2
注:當一個隊列綁定鍵是#,那么這個隊列將接收所有數據,就有點像fanout扇出交換機了;如果隊列綁定鍵當中沒有#和*出現,那么該隊列綁定類型就是direct直接交換機了;下面我將以上圖的案例來代碼實現:

public class CAConsumer { //交換機名稱 public static final String TOPIC_EXCHANGE = "TopicExchange"; //隊列Q1名稱 public static final String Q1 = "Q1Queue"; //路由綁定關系 Routing Key public static final String Q1_KEY = "*.orange.*"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置為主題交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null); //創建一個 Q1 隊列 channel.queueDeclare(Q1, true, false, false, null); //隊列綁定到交換機上,並通過主題路由key來對應兩者的連接 channel.queueBind(Q1, TOPIC_EXCHANGE, Q1_KEY); //接收隊列消息 channel.basicConsume(Q1, true, (consumerTag, message) -> { System.out.println("Q1獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }

public class CBConsumer { //交換機名稱 public static final String TOPIC_EXCHANGE = "TopicExchange"; //隊列Q2名稱 public static final String Q2 = "Q2Queue"; //路由綁定關系 Routing Key 1 public static final String Q2_KEY_A = "*.*.rabbit"; //路由綁定關系 Routing Key 2 public static final String Q2_KEY_B = "lazy.#"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置為主題交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null); //創建一個 Q2 隊列 channel.queueDeclare(Q2, true, false, false, null); //隊列綁定到交換機上,並通過主題路由key來對應兩者的連接(這里設置了2個連接) channel.queueBind(Q2, TOPIC_EXCHANGE, Q2_KEY_A); channel.queueBind(Q2, TOPIC_EXCHANGE, Q2_KEY_B); //接收隊列消息 channel.basicConsume(Q2, true, (consumerTag, message) -> { System.out.println("Q2獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }

public class TopicProducer { //交換機名稱 public static final String TOPIC_EXCHANGE = "TopicExchange"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明exchange交換機 並設置為主題交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null); //消息任務准備 HashMap<String, String> sendMsg = new HashMap<>(); sendMsg.put("quick.orange.rabbit", "被隊列 Q1 Q2 接收到"); sendMsg.put("lazy.orange.elephant", "被隊列 Q1 Q2 接收到"); sendMsg.put("quick.orange.fox", "被隊列 Q1 接收到"); sendMsg.put("lazy.brown.fox", "被隊列 Q2 接收到"); sendMsg.put("lazy.pink.rabbit", "雖然滿足兩個綁定規則但兩個規則都是在Q2隊列,所有只要Q2接收一次"); sendMsg.put("quick.brown.fox", "不匹配任何綁定不會被任何隊列接收到會被丟棄"); sendMsg.put("quick.orange.male.rabbit", "是四個單詞不匹配任何綁定會被丟棄"); sendMsg.put("lazy.orange.male.rabbit", "是四個單詞但匹配 Q2"); //循環發送消息任務 for (Map.Entry<String, String> msg : sendMsg.entrySet()) { String routKey = msg.getKey(); //主題路由key String message = msg.getValue();//消息任務 channel.basicPublish(TOPIC_EXCHANGE, routKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } } }
十三:死信隊列
死信隊列(DLX,Dead-Letter-Exchange)就是無法被消費的消息,一般來說producer將消息投遞到broker或者直接到queue里了,consumer從queue取出消息進行消費,但某些時候由於特定的原因導致queue中的某些消息無法被消費,這樣的消息如果沒有后續的處理,就變成了死信,有死信自然就有了死信隊列。當消息在一個隊列中變成死信后,它能被重新發布到另一個Exchange中,通過Exchange分發到另外的隊列;本質就是該消息不會再被任何消費端消費(但你可以自定義某消費者單獨處理這些死信)。
應用場景:為了保證訂單業務的消息數據不丟失,需要使用到RabbitMQ的死信隊列機制,當消息消費發生異常時,將消息投入死信隊列中;比如說: 用戶在商城下單成功並點擊去支付后在指定時間未支付時自動失效
“死信”消息會被RabbitMQ進行特殊處理,如果配置了死信隊列信息,那么該消息將會被丟進死信隊列中,如果沒有配置,則該消息將會被丟棄。
1:死信隊列(DLX)產生來源
①:消息被拒絕(basic.reject/basic.nack),且 requeue = false(代表不重新回到隊列) ②:消息因TTL過期(就是任務消息上攜帶過期時間) ③:消息隊列的消息數量已經超過最大隊列長度,先入隊的消息會被丟棄變為死信

2:消息TTL過期產生死信
普通消費者代碼:
public class TTLConsumer { //聲明普通的交換機名稱 public static final String NORMAL_EXCHANGE = "NormalExchange"; //聲明死信交換機名稱 public static final String DLX_EXCHANGE = "DLXExchange"; //聲明普通隊列名稱 public static final String Normal_Queue = "NormalQueue"; //聲明死信隊列名稱 public static final String DLX_QUEUE = "DLXQueue"; //聲明路由綁定關系 Routing Key 普通交換機到普通隊列 public static final String NORMAL_KEY = "NormalKey"; //聲明路由綁定關系 Routing Key 死信交換機到死信隊列 public static final String DLX_KEY = "DLXKey"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明普通exchange交換機 並設置為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //聲明死信exchange交換機 並設置為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //聲明死信隊列 channel.queueDeclare(DLX_QUEUE, true, false, false, null); //死信隊列綁定死信交換機routingKey channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_KEY); //參數設置 Map<String, Object> arguments = new HashMap<>(); //正常隊列設置死信交換機 參數key是固定值 arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); //正常隊列設置死信交換機到死信隊列綁定Routing Key 參數key是固定值 arguments.put("x-dead-letter-routing-key", DLX_KEY); //聲明普通隊列 channel.queueDeclare(Normal_Queue, true, false, false, arguments); //普通隊列綁定普通交換機routingKey channel.queueBind(Normal_Queue, NORMAL_EXCHANGE, NORMAL_KEY); System.out.println("初始化完成,等待接收消息"); //接收隊列消息 channel.basicConsume(Normal_Queue, true, (consumerTag, message) -> { System.out.println("如同隊列獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }
死信消費者代碼:
public class DLXConsumer { //聲明死信交換機名稱 public static final String DLX_EXCHANGE = "DLXExchange"; //聲明死信隊列名稱 public static final String DLX_QUEUE = "DLXQueue"; //聲明路由綁定關系 Routing Key 死信交換機到死信隊列 public static final String DLX_KEY = "DLXKey"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明死信exchange交換機 並設置為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //聲明死信隊列 channel.queueDeclare(DLX_QUEUE, true, false, false, null); //死信隊列綁定死信交換機routingKey channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_KEY); System.out.println("初始化完成,等待接收消息"); //接收隊列消息 channel.basicConsume(DLX_QUEUE, true, (consumerTag, message) -> { System.out.println("死信隊列里獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); }); } }
生產者代碼編寫:
public class DLXProducer { //聲明普通的交換機名稱 public static final String NORMAL_EXCHANGE = "NormalExchange"; //聲明路由綁定關系 Routing Key 普通交換機到普通隊列 public static final String NORMAL_KEY = "NormalKey"; public static void main(String[] args) throws IOException { //調用自己的工具類獲取信道 Channel channel = ChannelUtil.getChannel(); //聲明普通exchange交換機 並設置為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //發送消息的參數設置 expiration過期時間10秒 deliveryMode 消息持久化方式 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").deliveryMode(2).build(); //循環發送消息 for (int i = 0; i < 5; i++) { String str = "測試隊列任務 " + i; //發布信息 channel.basicPublish(NORMAL_EXCHANGE, NORMAL_KEY, properties, str.getBytes(StandardCharsets.UTF_8)); } System.out.println("消息發送完畢!"); } }
編寫好代碼以后執行普通消費者和死信消費者代碼查看創建的隊列及交換機的狀況
測試(為了可以更好的演示效果,關閉普通消費者和死信消費者):
生產者發送5條消息到普通 隊列中,此時普通隊列里面存在10條未消費信息:
消息達到過期時間后會從普通隊列推送到死信隊列里(因為提前設置了消息變死信后發送到死信交換機)
接下來我們就可以啟動死信消費者來消費這一批死信隊列里的任務消息
3:隊列達到最大長度產生死信
代碼優化:剔除生產者代碼中的消息過期時間,並在普通消費者里面設置隊列最大長度
//參數設置 Map<String, Object> arguments = new HashMap<>(); //正常隊列設置死信交換機 參數key是固定值 arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); //正常隊列設置死信交換機到死信隊列綁定Routing Key 參數key是固定值 arguments.put("x-dead-letter-routing-key", DLX_KEY); //設置正常隊列的長度限制 為3 arguments.put("x-max-length",3);
注:因為隊列參數改變,需要先刪除原隊列,並啟動消費者,創建出帶隊列長度的隊列
4:消息被拒產生死信
代碼優化:剔除普通消費者里面設置隊列最大長度,並優化普通消費者消息接收代碼
//接收隊列消息 channel.basicConsume(Normal_Queue, false, (consumerTag, message) -> { //獲取的任務消息 String msg = new String(message.getBody(), StandardCharsets.UTF_8); //手動不確認,拒收,並丟去隊列 if ("測試隊列任務 3".equals(msg)) { //出現異常手動進行不應答;並且不放入隊列中 channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("如同隊列獲取的任務並處理:" + msg); } }, consumerTag -> { System.out.println("監聽的隊列出現異常;可能隊列被刪除!"); });
十四:延遲隊列
普通隊列:它是一種隊列,隊列意味着內部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進入,從另一端取出。
延時隊列:最重要的特性就體現在它的延時屬性上,跟普通的隊列不一樣的是,普通隊列中的元素總是等着希望被早點取出處理,而延時隊列中的元素則是希望被在指定時間得到取出和處理,所以延時隊列中的元素是都是帶時間屬性的,通常來說是需要被處理的消息或者任務。簡單來說,延時隊列就是用來存放需要在指定時間以后被處理的元素的隊列(到達設置的延遲時間后再推給消費者進行任務處理)。
1:延遲隊列的使用場景
那么什么時候需要用延時隊列呢?考慮一下以下場景:
①:訂單在十分鍾之內未支付則自動取消。
②:新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
③:賬單在一周內未支付,則自動結算。
④:用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。
⑤:用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
⑥:預定會議后,需要在預定的時間點前十分鍾通知各個與會人員參加會議。
2:RabbitMQ中的TTL
TTL是RabbitMQ中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間,單位是毫秒。換句話說,如果一條消息設置了TTL屬性或者進入了設置TTL屬性的隊列,那么這條消息如果在TTL設置的時間內沒有被消費,則會成為“死信”。如果同時配置了隊列的TTL和消息的TTL,那么較小的那個值將會被使用。
設置這個TTL值有兩種方式(隊列設置、消息設置):
第一種是在創建隊列的時候設置隊列的 "x-message-ttl" 屬性,如下:
Map<String, Object> arguments = new HashMap<>(); //設置消息延遲10秒;投遞到該隊列的消息超過10秒直接丟棄 arguments.put("x-message-ttl",10000); //創建隊列,並指定參數 channel.queueDeclare(Normal_Queue, true, false, false, arguments); 第二種方式針對每條消息設置TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); //這條消息的過期時間也被設置成了10s , 超過10秒未處理則執行到此消息后被丟棄 builder.expiration("10000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes()); 區別的:第一種在隊列上設置TTL屬性,那么一旦消息過期,就會被隊列丟棄;而第二種方式,消息即使過期,也不一定會被馬上丟棄, 因為消息是否過期是在即將投遞到消費者之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間。 另外,還需要注意的一點是,如果不設置TTL,表示消息永遠不會過期,如果將TTL設置為0,則表示除非此時可以直接投遞該消息到消費者,
否則該消息將會被丟棄。
看到這里也代表基本的RabbitMQ已經知道了,下面可以看一看下篇的SpringBoot整合RabbitMQ,下篇有延遲隊列的詳細說明。