概述
本示例程序全部來自rabbitmq官方示例程序,rabbitmq-demo;
官方共有6個demo,針對不同的語言(如 C#,Java,Spring-AMQP等),都有不同的示例程序;
本示例程序主要是Spring-AMQP的參考示例,如果需要其他語言的參考示例,可以參考官網;
rabbitmq模擬器
模擬器
rabbitmq簡介
核心架構圖
AMQP 0-9-1 Model Explained
重要語法說明
- producer或publisher: 消息生產者/發布者,即:產生消息的;
- Exchange:producer或publisher只會將message發送到Exchange,目前有4種不同的Exchange類型;
- Queue:消息隊列,所有的消費者都是直接從Queue獲取Message並消費;
- Binging:連接Exchange和Queue的紐帶,決定Exchange如何路由消息到不同的Queue;
- routingKey:生產者-->message-->Exchange,需要指定一個key,叫做routingKey;
- routingKey:Exchange-->Binging-->Queue,Binging有一個Key值,叫routingKey或bingingKey;
- bingingKey:Exchange-->Binging-->Queue,Binging有一個Key值,bingingKey;
核心理解
4種不同的Exchange,對routingKey的解釋都不相同;
對routingKey的不同解釋,決定了Exchange路由Message到Queue的不同方案;
- direct exchange: 匹配2個routingKey(即routingKey和bingingKey)是否相等,相等時才進行消息路由;
- fanout exchange: 忽略routingKey,會將Message路由到所有綁定的Queue;
- topic exchange: routingKey格式形如
aaa.bbb.xxx
、*.ccc.dd.#
,類似正則表達式匹配; - headers exchange:
jar包說明
- Java版本:
Java版本使用如下jar(說明:若是使用):
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
- Spring-AMQP版本:
Spring AMQP 官方詳細文章
使用Profile配置各個demo的運行選擇,當
使用如下Jar包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
demo1: 單生產者-單消費者
spring.profiles.active=hello-world, sender, receiver
demo2: 單生產者-多消費者
Work queues官方示例
application.properties配置
spring.profiles.active=work-queues, sender, receiver
#spring.profiles.active=work-queues, sender
#spring.profiles.active=work-queues, receiver
詳細描述參見:單生產者-多消費者詳細
demo3: 發布/訂閱
- 消費廣播到多個消費者進行消費;
- 使用fanout pattern;
application.properties配置
spring.profiles.active=pub-sub, receiver , sender
詳細描述參見:發布/訂閱詳細
demo4: Routing
Direct exchange 模式進行route結構圖
a message goes to the queues whose binding key
exactly matches the routing key
of the message;(相等時才路由)
Multiple bindings
兩個Queue使用相同的BingingKey(black) ==> 效果類似於:發布/訂閱模式(demo3);
完整的結構圖
application.properties配置
pring.profiles.active=routing, receiver , sender
詳細描述參見:發布/訂閱詳細
demo5: Topics
- 使用 Topic exchange實現;
- 發送到Topic exchange的routingKey必須滿足一定要求:用"."分割的words列表,如:
*.aaa.bbb.#
; - BingingKey和routingKey有相同的格式要求;
*
: 可以匹配一個word;#
: 可以匹配0個或多個words;
application.properties配置
pring.profiles.active=topics, receiver , sender
詳細描述參見:Topics
demo6: RPC over RabbitMQ
結構圖
application.properties配置
spring.profiles.active=rpc,server
#spring.profiles.active=rpc,client
詳細描述參見:RPC
消費端確認
Delivery Identifiers: Delivery Tags
消費者注冊后,rabbitmq將消息交付給消費者時,都會帶有一個“Delivery Tags”,這個是唯一的ID標識,id以整數的遞增的方式實現。
Acknowledgement Modes(消費端)
自動確認模式
- 發送之后,就認為是發送成功(fire-and-forget)
- 消息不停的發送到消費端消費,無需等待消費端任何確認;
缺點:
- 可能造成消費端不堪重負;
手動模式
- basic.ack: 肯定的確認;
- basic.nack: 否定的確認(RabbitMQ對AMQP 0-9-1的擴展),支持消息
批量確認
; - basic.reject:否定的確認,消息消費失敗后,直接從broker中將消息
delete
,不支持批量確認
;
Acknowledging Multiple Deliveries at Once(消息批量確認)
- 一次確認多個消息發送,而不是每一個消息單獨確認;
- basic.reject:不具備該功能;
- basic.nack: 具備該功能;
實現方式
- multiple field: 設置為true;
示例
假設:在Channel(ch)上有5,6,7,8這4個delivery tags未確認;
- 情況1,
delivery_tag=8 & multiple=true
: 則5,6,7,8這4個tags都將被確認; - 情況2,
delivery_tag=8 & multiple=false
:則只有8被確認,而5,6,7將不會被被確認;
Channel Prefetch Count (QoS)[可以設置消費端消費的速率]
- 消息消費是
異步
完成的,手動確認也是異步
的; - 有一部分消息是被消費了,但是還未來得及確認:
希望控制未被確認消息的size,防止無界的緩存
; prefetch count
:使用basic.qos
方法設置該值可以控制未被確認消息的max size;- 當達到該最大值時,rabbitmq將停止交付消息進行消費;
- 僅對
basic.qos
方法有效,對basic.get
方法無效;
示例
假設:在Channel(Ch)上有5,6,7,8共4個未被確認
的消息,且ch的prefetch count=4
;
結果:rabbitmq將不會再交付任何消息到該Channel上,除非有消息被確認;
消費確認選擇,prefetch設置以及吞吐量
- 情況1:增大
prefetch
:提高向消費者傳遞消息的速度; - 情況2:自動確認模式可以產生最佳的傳送速率;
應避免:
自動確認模式
;手動確認模式
+無限制的prefetch
;
結論:
情況1
和情況2
都可能導致交付但未來得及處理
的Message增加,增大RAM的消耗;
推薦值:
prefetch
: 100~300,可以有效提高吞吐量,並避免RAM消耗過多的風險;
消費失敗或連接中斷: 自動重新reQueue
當消息發送給消費端后,如果出現如下情況,則消息會重新reQueue
,會被再次發送;
- TCP連接中斷;
- 消費端掛掉:無法進行消息確認;
Client Errors: Double Acking and Unknown Tags
消費端無法對同一個消息確認超過一次,當超過一次之后,將拋出Channel error: PRECONDITION_FAILED - unknown delivery tag XXXX
總結
- 每個交付給消費端的消息,都有一個唯一的標識
delivery tag
; - 自動消息確認;
- 手動消息確認:
每個消息單獨確認
和批量消息確認
; prefetchCount
:可以控制消息端的吞吐量,避免消費端消費過慢,產生RAM大量消耗;- 失敗重傳:
TCP連接中斷
或消費端掛掉
,都會引起消息重新入隊列,重新消費(手動消息確認時); - 無法對同一個消息進行2次或2次以上的
確認
,否則會拋出異常;
發送端確認
Channel事務
- 不推薦使用: 會嚴重降低吞吐量;
在 AMQP 0-9-1中,保證消息不丟失的唯一方法,就是使用事務;
- 開啟Channel事務;
- 發送消息,提交事務;
類似消費端的應答確認機制
confirm.select
: 應用於Channel時,表示使用確認模式
;事務
和確認模式
無法共存:二者只能選擇其一;
確認模式 (confirm.select)
- 發送端使用
confirm.select
; broker
發送basic.ack
來確認Message已被處理;delivery-tag
: 消息序列,具有唯一性;multiple=true
: 用於設置批量消息確認
;- 無法保證消息何時被確認;
- 確認模式:消息要么被
confirmed(OK)
,要么被nack(fail)
,且only once;
Java示例:(發送端發送大量messages,使用確認模式)
程序-確認模式
否定確認
異常情況時,服務端無法處理消息,則broker
發送basic.nack
來進行否定確認
;
應答延時和持久化消息
- 僅當消息被持久化到disk之后,才會發送
basic.ack
應答; - 吞吐量提高建議:
異步處理應答
、批量發送消息
;
應答順序
當使用異步發送和持久化消息時,broker對消息的確認順序
可能和發送者的消息發送順序
不一致;
發送確認 + 保證交付
- 消息持久化: 並不能保證消息不丟失(在寫入disk前broker就掛掉);
限制
Delivery tag is a 64 bit long value, and thus its maximum value is 9223372036854775807.Since delivery tags are scoped per channel, it is very unlikely that a publisher or consumer will run over this value in practice.