過期時間TTL
過期時間TTL表示可以對消息設置預期的時間,在這個時間內都可以被消費者接收獲取;
過了之后消息將自動被刪除。
RabbitMQ可以對消息和隊列設置TTL。目前有兩種方法可以設置。
- 第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。
- 第二種方法是對消息進行單獨設置,每條消息TTL可以不同。
如果上述兩種方法同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為准。
消息在隊列的生存時間一旦超過設置的TTL值,就稱為dead message被投遞到死信隊列, 消費者將無法再收到該消息。
設置隊列TTL
//設置相關的配置,也可以在Web界面中設置
package com.zwt.springbootfanout.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class TTLRabbitMqConfiguration {
@Bean
public DirectExchange ttlDirectExchange() {
return new DirectExchange("ttl_direct_exchange", true, true);
}
@Bean
public Queue tttDireclQueue() {
HashMap<String, Integer> map = new HashMap<>();
map.put("x-message-ttl",2000);
return new Queue("ttl.direct.queue",true);
}
@Bean
public Binding ttlDirectBinding() {
return BindingBuilder.bind(tttDireclQueue()).to(ttlDirectExchange()).with("ttl");
}
}
expiration 字段以微秒為單位表示 TTL 值。
且與 x-message-ttl 具有相同的約束條件。
因為 expiration 字段必須為字符串類型,broker 將只會接受以字符串形式表達的數字。
當同時指定了 queue 和 message 的 TTL 值,則兩者中較小的那個才會起作用。
消息確認機制的配置
NONE值是禁用發布確認模式,是默認值
CORRELATED值是發布消息成功到交換器后會觸發回調方法
SIMPLE值經測試有兩種效果
其一效果和CORRELATED值一樣會觸發回調方法,
其二在發布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點返回發送結果,
根據返回結果來判定下一步的邏輯,
要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發送消息到broker;
死信隊列
DLX,全稱為Dead-Letter-Exchange , 可以稱之為死信交換機,也有人稱之為死信郵箱。
當消息在一個隊列中變成死信(dead message)之后,它能被重新發送到另一個交換機中,這個交換機就是DLX ,綁定DLX的隊列就稱之為死信隊列。
消息變成死信,可能是由於以下的原因:
- 消息被拒絕
- 消息過期
- 隊列達到最大長度
DLX也是一個正常的交換機,和一般的交換機沒有區別,它能在任何的隊列上被指定,實際上就是設置某一個隊列的屬性。
當這個隊列中存在死信時,Rabbitmq就會自動地將這個消息重新發布到設置的DLX上去,進而被路由到另一個隊列,即死信隊列。
要想使用死信隊列,只需要在定義隊列的時候設置隊列參數 x-dead-letter-exchange
指定交換機即可。

內存磁盤的監控
當內存使用超過配置的閾值或者磁盤空間剩余空間對於配置的閾值時,
RabbitMQ會暫時阻塞客戶端的連接,並且停止接收從客戶端發來的消息,以此避免服務器的崩潰,
客戶端與服務端的心態檢測機制也會失效。
當出現blocking或blocked話說明到達了閾值和以及高負荷運行了。
RabbitMQ的內存控制
參考幫助文檔:https://www.rabbitmq.com/configure.html
當出現警告的時候,可以通過配置去修改和調整。
命令的方式
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 為內存閾值。
默認情況是:0.4/2GB,代表的含義是:當RabbitMQ的內存超過40%時,就會產生警告並且阻塞所有生產者的連接。
通過此命令修改閾值在Broker重啟以后將會失效,通過修改配置文件方式設置的閾值則不會隨着重啟而消失,但修改了配置文件一樣要重啟broker才會生效。
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
配置文件方式 rabbitmq.conf
當前配置文件:/etc/rabbitmq/rabbitmq.conf
#默認
#vm_memory_high_watermark.relative = 0.4
# 使用relative相對值進行設置fraction,建議取值在04~0.7之間,不建議超過0.7.
vm_memory_high_watermark.relative = 0.6
# 使用absolute的絕對值的方式,但是是KB,MB,GB對應的命令如下
vm_memory_high_watermark.absolute = 2GB
RabbitMQ的內存換頁
在某個Broker節點及內存阻塞生產者之前,它會嘗試將隊列中的消息換頁到磁盤以釋放內存空間,
持久化和非持久化的消息都會寫入磁盤中,
其中持久化的消息本身就在磁盤中有一個副本,所以在轉移的過程中持久化的消息會先從內存中清除掉。
默認情況下,內存到達的閾值是50%時就會換頁處理。
也就是說,在默認情況下該內存的閾值是0.4的情況下,當內存超過0.4*0.5=0.2時,會進行換頁動作。
比如有1000MB內存,當內存的使用率達到了400MB,
已經達到了極限,但是因為配置的換頁內存0.5,這個時候會在達到極限400mb之前,會把內存中的200MB進行轉移到磁盤中。從而達到穩健的運行。
可以通過設置 vm_memory_high_watermark_paging_ratio
來進行調整。
RabbitMQ的磁盤預警
當磁盤的剩余空間低於確定的閾值時,RabbitMQ同樣會阻塞生產者,
這樣可以避免因非持久化的消息持續換頁而耗盡磁盤空間導致服務器崩潰。
默認情況下:磁盤預警為50MB的時候會進行預警。
表示當前磁盤空間第50MB的時候會阻塞生產者並且停止內存消息換頁到磁盤的過程。
通過命令方式修改如下:
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
disk_limit:固定單位 KB MB GB
fraction :是相對閾值,建議范圍在:1.0~2.0之間。(相對於內存)
通過配置文件配置如下:
disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb
集群
RabbitMQ 集群
RabbitMQ這款消息隊列中間件產品本身是基於Erlang編寫,Erlang語言天生具備分布式特性(通過同步Erlang集群各節點的magic cookie來實現)。
因此,RabbitMQ天然支持Clustering。
這使得RabbitMQ本身不需要像ActiveMQ、Kafka那樣通過ZooKeeper分別來實現HA方案和保存集群的元數據。
集群是保證可靠性的一種方式,同時可以通過水平擴展以達到增加消息吞吐量能力的目的。
集群搭建
ps aux|grep rabbitmq
systemctl status rabbitmq-server
場景:假設有兩個rabbitmq節點,分別為rabbit-1, rabbit-2,rabbit-1作為主節點,rabbit-2作為從節點。
啟動命令:RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached
結束命令:rabbitmqctl -n rabbit-1 stop
第一步:啟動第一個節點rabbit-1
> sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
...............省略...................
########## Logs: /var/log/rabbitmq/rabbit-1.log
###### ## /var/log/rabbitmq/rabbit-1-sasl.log
##########
Starting broker...
completed with 7 plugins.
啟動第二個節點rabbit-2
注意:web管理插件端口占用,所以還要指定其web插件占用的端口號
RABBITMQ_SERVER_START_ARGS=”-rabbitmq_management listener [{port,15673}]”
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
..............省略..................
########## Logs: /var/log/rabbitmq/rabbit-2.log
###### ## /var/log/rabbitmq/rabbit-2-sasl.log
##########
Starting broker...
completed with 7 plugins.
驗證啟動 “ps aux|grep rabbitmq”
rabbit-1操作作為主節點
#停止應用
> sudo rabbitmqctl -n rabbit-1 stop_app
#目的是清除節點上的歷史數據(如果不清除,無法將節點加入到集群)
> sudo rabbitmqctl -n rabbit-1 reset
#啟動應用
> sudo rabbitmqctl -n rabbit-1 start_app
rabbit2操作為從節點
# 停止應用
> sudo rabbitmqctl -n rabbit-2 stop_app
# 目的是清除節點上的歷史數據(如果不清除,無法將節點加入到集群)
> sudo rabbitmqctl -n rabbit-2 reset
# 將rabbit2節點加入到rabbit1(主節點)集群當中【Server-node服務器的主機名】
> sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@'Server-node'
# 啟動應用
> sudo rabbitmqctl -n rabbit-2 start_app
驗證集群狀態
> sudo rabbitmqctl cluster_status -n rabbit-1
//集群有兩個節點:rabbit-1@Server-node、rabbit-2@Server-node
[{nodes,[{disc,['rabbit-1@Server-node','rabbit-2@Server-node']}]},
{running_nodes,['rabbit-2@Server-node','rabbit-1@Server-node']},
{cluster_name,<<"rabbit-1@Server-node.localdomain">>},
{partitions,[]},
{alarms,[{'rabbit-2@Server-node',[]},{'rabbit-1@Server-node',[]}]}]
注意在訪問的時候:web結面的管理需要給15672 node-1 和15673的node-2 設置用戶名和密碼。
rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"
Tips:
如果采用多機部署方式,需讀取其中一個節點的cookie, 並復制到其他節點(節點之間通過cookie確定相互是否可通信)。
cookie存放在/var/lib/rabbitmq/.erlang.cookie。
例如:主機名分別為rabbit-1、rabbit-2
1、逐個啟動各節點
2、配置各節點的hosts文件( vim /etc/hosts)
ip1:rabbit-1
ip2:rabbit-2
其它步驟雷同單機部署方式
分布式事務
分布式事務指事務的操作位於不同的節點上,需要保證事務的 AICD 特性。
例如:在下單場景下,庫存和訂單如果不在同一個節點上,就涉及分布式事務。
分布式事務的方式
在分布式系統中,要實現分布式事務,無外乎那幾種解決方案。
兩階段提交(2PC)需要數據庫產商的支持,java組件有atomikos等
兩階段提交(Two-phase Commit,2PC),通過引入協調者(Coordinator)來協調參與者的行為,並最終決定這些參與者是否要真正執行事務。
准備階段
協調者詢問參與者事務是否執行成功,參與者發回事務執行結果。

提交階段
如果事務在每個參與者上都執行成功,事務協調者發送通知讓參與者提交事務;
否則,協調者發送通知讓參與者回滾事務。
需要注意的是,在准備階段,參與者執行了事務,但是還未提交。只有在提交階段接收到協調者發來的通知后,才進行提交或者回滾。
存在的問題
- 同步阻塞 所有事務參與者在等待其它參與者響應的時候都處於同步阻塞狀態,無法進行其它操作。
- 單點問題 協調者在 2PC 中起到非常大的作用,發生故障將會造成很大影響。特別是在階段二發生故障,所有參與者會一直等待狀態,無法完成其它操作。
- 數據不一致 在階段二,如果協調者只發送了部分 Commit 消息,此時網絡發生異常,那么只有部分參與者接收到 Commit 消息,也就是說只有部分參與者提交了事務,使得系統數據不一致。
- 太過保守 任意一個節點失敗就會導致整個事務失敗,沒有完善的容錯機制。
補償事務(TCC) 嚴選,阿里,螞蟻金服
TCC 其實就是采用的補償機制,其核心思想是:針對每個操作,都要注冊一個與其對應的確認和補償(撤銷)操作。
它分為三個階段:
-
Try 階段主要是對業務系統做檢測及資源預留。
-
Confirm 階段主要是對業務系統做確認提交,Try階段執行成功並開始執行 Confirm階段時,
默認 - - - Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。
-
Cancel 階段主要是在業務執行錯誤,需要回滾的狀態下執行的業務取消,預留資源釋放。
舉個例子,假入 Bob 要向 Smith 轉賬,
思路大概是: 我們有一個本地方法,里面依次調用
1:首先在 Try 階段,要先調用遠程接口把 Smith 和 Bob 的錢給凍結起來。
2:在 Confirm 階段,執行遠程調用的轉賬的操作,轉賬成功進行解凍。
3:如果第2步執行成功,那么轉賬成功,如果第二步執行失敗,則調用遠程凍結接口對應的解凍方法 (Cancel)。
優點: 跟2PC比起來,實現以及流程相對簡單了一些,但數據的一致性比2PC也要差一些
缺點: 缺點還是比較明顯的,在2,3步中都有可能失敗。
TCC屬於應用層的一種補償方式,所以需要程序員在實現的時候多寫很多補償的代碼,在一些場景中,一些業務流程可能用TCC不太好定義及處理。
本地消息表(異步確保)比如:支付寶、微信支付主動查詢支付狀態,對賬單的形式
本地消息表與業務數據表處於同一個數據庫中,這樣就能利用本地事務來保證在對這兩個表的操作滿足事務特性,並且使用了消息隊列來保證最終一致性。
-
在分布式事務操作的一方完成寫業務數據的操作之后向本地消息表發送一個消息,本地事務能保證這個消息一定會被寫入本地消息表中。
-
之后將本地消息表中的消息轉發到 Kafka 等消息隊列中,如果轉發成功則將消息從本地消息表中刪除,否則繼續重新轉發。
-
在分布式事務操作的另一方從消息隊列中讀取一個消息,並執行消息中的操作。
優點: 一種非常經典的實現,避免了分布式事務,實現了最終一致性。
缺點: 消息表會耦合到業務系統中,如果沒有封裝好的解決方案,會有很多雜活需要處理。
MQ 事務消息 異步場景,通用性較強,拓展性較高
有一些第三方的MQ是支持事務消息的,比如RocketMQ,
他們支持事務消息的方式也是類似於采用的二階段提交,
但是市面上一些主流的MQ都是不支持事務消息的,比如 Kafka 不支持。
以阿里的 RabbitMQ 中間件為例,其思路大致為:
- 第一階段Prepared消息,會拿到消息的地址。
- 第二階段執行本地事務
- 第三階段通過第一階段拿到的地址去訪問消息,並修改狀態。
也就是說在業務方法內要想消息隊列提交兩次請求,一次發送消息和一次確認消息。
如果確認消息發送失敗了RabbitMQ會定期掃描消息集群中的事務消息,
這時候發現了Prepared消息,它會向消息發送者確認,
所以生產方需要實現一個check接口,
RabbitMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。
這樣就保證了消息發送與本地事務同時成功或同時失敗。
優點: 實現了最終一致性,不需要依賴本地數據庫事務。
缺點: 實現難度大,主流MQ不支持,RocketMQ事務消息部分代碼也未開源。
通過本文我們總結並對比了幾種分布式分解方案的優缺點,分布式事務本身是一個技術難題,
是沒有一種完美的方案應對所有場景的,具體還是要根據業務場景去抉擇吧。
阿里RocketMQ去實現的分布式事務,現在也有除了很多分布式事務的協調器,比如LCN等,大家可以多去嘗試。
具體實現
分布式事務的完整架構圖

美團外賣的架構

系統與系統之間的分布式事務問題

系統間調用過程中事務回滾問題
import com.zwt.rabbitmq.dao.OrderDataBaseService;
import com.zwt.rabbitmq.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
@Service
public class OrderService {
@Autowired
private OrderDataBaseService orderDataBaseService;
// 創建訂單
@Transactional(rollbackFor = Exception.class) // 訂單創建整個方法添加事務
public void createOrder(Order orderInfo) throws Exception {
// 1: 訂單信息--插入丁訂單系統,訂單數據庫事務
orderDataBaseService.saveOrder(orderInfo);
// 2:通過Http接口發送訂單信息到運單系統
String result = dispatchHttpApi(orderInfo.getOrderId());
if(!"success".equals(result)) {
throw new Exception("訂單創建失敗,原因是運單接口調用失敗!");
}
}
/**
* 模擬http請求接口發送,運單系統,將訂單號傳過去 springcloud
* @return
*/
private String dispatchHttpApi(String orderId) {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
// 鏈接超時 > 3秒
factory.setConnectTimeout(3000);
// 處理超時 > 2秒
factory.setReadTimeout(2000);
// 發送http請求
String url = "http://localhost:9000/dispatch/order?orderId="+orderId;
RestTemplate restTemplate = new RestTemplate(factory);//異常
String result = restTemplate.getForObject(url, String.class);
return result;
}
}
基於MQ的分布式事務消息的可靠生產問題

如果這個時候MQ服務器出現了異常和故障,那么消息是無法獲取到回執信息。
如何是好?
基於MQ的分布式事務消息的可靠生產問題-定時重發
如果出現異常,咱們就重發消息。
基於MQ的分布式事務消息的可靠消費

基於MQ的分布式事務消息的消息重發
設置重試次數二定要進行控制或者 try/catch
基於MQ的分布式事務消息的死信隊列消息轉移 + 人工處理

如果死信隊列報錯就進行人工處理

總結
基於MQ的分布式事務解決方案優點:
1、通用性強
2、拓展方便
3、耦合度低,方案也比較成熟
基於MQ的分布式事務解決方案缺點:
1、基於消息中間件,只適合異步場景
2、消息會延遲處理,需要業務上能夠容忍
建議
1、盡量去避免分布式事務
2、盡量將非核心業務做成異步
Springboot整合rabbitmq集群配置詳解
1 引入starter
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2:詳細配置如下
rabbitmq:
addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client連接到的server的地址,多個以逗號分隔(優先取addresses,然后再取host)
# port:
##集群配置 addresses之間用逗號隔開
# addresses: ip:port,ip:port
password: admin
username: 123456
virtual-host: / # 連接到rabbitMQ的vhost
requested-heartbeat: #指定心跳超時,單位秒,0為不指定;默認60s
publisher-confirms: #是否啟用 發布確認
publisher-reurns: # 是否啟用發布返回
connection-timeout: #連接超時,單位毫秒,0表示無窮大,不超時
cache:
channel.size: # 緩存中保持的channel數量
channel.checkout-timeout: # 當緩存數量被設置時,從緩存中獲取一個channel的超時時間,單位毫秒;如果為0,則總是創建一個新channel
connection.size: # 緩存的連接數,只有是CONNECTION模式時生效
connection.mode: # 連接工廠緩存模式:CHANNEL 和 CONNECTION
listener:
simple.auto-startup: # 是否啟動時自動啟動容器
simple.acknowledge-mode: # 表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto
simple.concurrency: # 最小的消費者數量
simple.max-concurrency: # 最大的消費者數量
simple.prefetch: # 指定一個請求能處理多少個消息,如果有事務的話,必須大於等於transaction數量.
simple.transaction-size: # 指定一個事務處理的消息數量,最好是小於等於prefetch的數量.
simple.default-requeue-rejected: # 決定被拒絕的消息是否重新入隊;默認是true(與參數acknowledge-mode有關系)
simple.idle-event-interval: # 多少長時間發布空閑容器時間,單位毫秒
simple.retry.enabled: # 監聽重試是否可用
simple.retry.max-attempts: # 最大重試次數
simple.retry.initial-interval: # 第一次和第二次嘗試發布或傳遞消息之間的間隔
simple.retry.multiplier: # 應用於上一重試間隔的乘數
simple.retry.max-interval: # 最大重試時間間隔
simple.retry.stateless: # 重試是有狀態or無狀態
template:
mandatory: # 啟用強制信息;默認false
receive-timeout: # receive() 操作的超時時間
reply-timeout: # sendAndReceive() 操作的超時時間
retry.enabled: # 發送重試是否可用
retry.max-attempts: # 最大重試次數
retry.initial-interval: # 第一次和第二次嘗試發布或傳遞消息之間的間隔
retry.multiplier: # 應用於上一重試間隔的乘數
retry.max-interval: #最大重試時間間隔
對於發送方而言,需要做以下配置:
1 配置CachingConnectionFactory
2 配置Exchange/Queue/Binding
3 配置RabbitAdmin創建上一步的Exchange/Queue/Binding
4 配置RabbitTemplate用於發送消息,RabbitTemplate通過CachingConnectionFactory獲取到Connection,然后想指定Exchange發送
對於消費方而言,需要做以下配置:
1 配置CachingConnectionFactory
2 配置Exchange/Queue/Binding
3 配置RabbitAdmin創建上一步的Exchange/Queue/Binding
4 配置RabbitListenerContainerFactory
5 配置@RabbitListener/@RabbitHandler用於接收消息
3 Spring AMQP的主要對象
注:如果不了解AMQP請前往官網了解.

4 使用:
通過配置類加載的方式:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
public static final String RECEIVEDLXEXCHANGE="spring-ex";
public static final String RECEIVEDLXQUEUE="spring-qu1";
public static final String RECEIVEDLXROUTINGKEY="aa";
public static final String DIRECTEXCHANGE="spring-ex";
public static final String MDMQUEUE="mdmQueue";
public static final String TOPICEXCHANGE="spring-top";
@Value("${spring.rabbitmq.addresses}")
private String hosts;
@Value("${spring.rabbitmq.username}")
private String userName;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
/* @Value("${rabbit.channelCacheSize}")
private int channelCacheSize;*/
// @Value("${rabbit.port}")
// private int port;
/* @Autowired
private ConfirmCallBackListener confirmCallBackListener;
@Autowired
private ReturnCallBackListener returnCallBackListener;*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(hosts);
cachingConnectionFactory.setUsername(userName);
cachingConnectionFactory.setPassword(password);
// cachingConnectionFactory.setChannelCacheSize(channelCacheSize);
//cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setVirtualHost(virtualHost);
//設置連接工廠緩存模式:
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
//緩存連接數
cachingConnectionFactory.setConnectionCacheSize(3);
//設置連接限制
cachingConnectionFactory.setConnectionLimit(6);
logger.info("連接工廠設置完成,連接地址{}"+hosts);
logger.info("連接工廠設置完成,連接用戶{}"+userName);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.setIgnoreDeclarationExceptions(true);
rabbitAdmin.declareBinding(bindingMdmQueue());
//聲明topic交換器
rabbitAdmin.declareExchange(directExchange());
logger.info("管理員設置完成");
return rabbitAdmin;
}
@Bean
public RabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//最小消費者數量
factory.setConcurrentConsumers(10);
//最大消費者數量
factory.setMaxConcurrentConsumers(10);
//一個請求最大處理的消息數量
factory.setPrefetchCount(10);
//
factory.setChannelTransacted(true);
//默認不排隊
factory.setDefaultRequeueRejected(true);
//手動確認接收到了消息
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
logger.info("監聽者設置完成");
return factory;
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECTEXCHANGE,true,false);
}
@Bean
public Queue mdmQueue(){
Map arguments = new HashMap<>();
// 綁定該隊列到私信交換機
arguments.put("x-dead-letter-exchange",RECEIVEDLXEXCHANGE);
arguments.put("x-dead-letter-routing-key",RECEIVEDLXROUTINGKEY);
logger.info("隊列交換機綁定完成");
return new Queue(RECEIVEDLXQUEUE,true,false,false,arguments);
}
@Bean
Binding bindingMdmQueue() {
return BindingBuilder.bind(mdmQueue()).to(directExchange()).with("");
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMandatory(true);
//發布確認
// rabbitTemplate.setConfirmCallback(confirmCallBackListener);
// 啟用發布返回
// rabbitTemplate.setReturnCallback(returnCallBackListener);
logger.info("連接模板設置完成");
return rabbitTemplate;
}
/* @Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPICEXCHANGE,true,false);
}*/
/*
*//**
* @return DirectExchange
*//*
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(RECEIVEDLXEXCHANGE,true,false);
}
*//*
*
* @return Queue
*//*
@Bean
public Queue dlxQueue() {
return new Queue(RECEIVEDLXQUEUE,true);
}
*//*
* @return Binding
*//*
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY);
}*/
}
通過兩種方式加載
1 通過配置文件
2 通過配置類
說明:上面是通過配置文件與配置類的方式去加載,常用的配置如上所示。
實際使用中要生產方與消費方要分開配置,相關配置也會有小變動,大體配置不變。更多信息可查看官網配置。
集群監控
在廣大的互聯網行業中RabbitMQ幾乎都會有集群,那么對於集群的監控就成了企業生態中必不可少的一環。
接下來我們來將講解主要的4種監控。
管理界面監控
管理界面監控需要我們開啟對應的插件(rabbitmq-plugins enable rabbitmq_management)
然后訪問http://ip:15672
tracing日志監控
以下是trace的相關命令和使用(要使用需要先rabbitmq啟用插件,再打開開關才能使用):
命令集 | 描述 |
---|---|
rabbitmq-plugins list | 查看插件列表 |
rabbitmq-plugins enable rabbitmq_tracing | rabbitmq啟用trace插件 |
rabbitmqctl trace_on | 打開trace的開關 |
rabbitmqctl trace_on -p itcast | 打開trace的開關(itcast為需要日志追蹤的vhost) |
rabbitmqctl trace_off | 關閉trace的開關 |
rabbitmq-plugins disable rabbitmq_tracing | rabbitmq關閉Trace插件 |
rabbitmqctl set_user_tags heima administrator | 只有administrator的角色才能查看日志界面 |
安裝插件並開啟 trace_on 之后,會發現多個 exchange:amq.rabbitmq.trace ,類型為:topic。
日志追蹤
rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--01。");

定制自己的監控系統
RabbitMQ提供了很豐富的restful風格的api接口,
我們可以通過這些接口得到對應的集群數據,此時我們就可以定制我們的監控系統。
更多API的相關信息和描述可以訪問http://ip:15672/api/
接下來我們使用RabbitMQ Http API接口來獲取集群監控數據
HttpClient以及Jackson的相關Jar
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.7.4</version>
</dependency>
創建MonitorRabbitMQ類實現具體的代碼
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpEntity;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* RabbitMQ的監控
*/
public class MonitorRabbitMQ {
//RabbitMQ的HTTP API——獲取集群各個實例的狀態信息,ip替換為自己部署相應實例的
private static String RABBIT_NODES_STATUS_REST_URL = "http://192.168.13.111:15672/api/nodes";
//RabbitMQ的HTTP API——獲取集群用戶信息,ip替換為自己部署相應實例的
private static String RABBIT_USERS_REST_URL = "http://192.168.13.111:15672/api/users";
//rabbitmq的用戶名
private static String RABBIT_USER_NAME = "guest";
//rabbitmq的密碼
private static String RABBIT_USER_PWD = "guest";
public static void main(String[] args) {
try {
//step1.獲取rabbitmq集群各個節點實例的狀態信息
Map<String, ClusterStatus> clusterMap =
fetchRabbtMQClusterStatus(RABBIT_NODES_STATUS_REST_URL, RABBIT_USER_NAME, RABBIT_USER_PWD);
//step2.打印輸出各個節點實例的狀態信息
for (Map.Entry entry : clusterMap.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
//step3.獲取rabbitmq集群用戶信息
Map<String, User> userMap =
fetchRabbtMQUsers(RABBIT_USERS_REST_URL, RABBIT_USER_NAME, RABBIT_USER_PWD);
//step4.打印輸出rabbitmq集群用戶信息
for (Map.Entry entry : userMap.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static Map<String, ClusterStatus> fetchRabbtMQClusterStatus(String url, String username, String password) throws IOException {
Map<String, ClusterStatus> clusterStatusMap = new HashMap<String, ClusterStatus>();
String nodeData = getData(url, username, password);
JsonNode jsonNode = null;
try {
jsonNode = JsonUtil.toJsonNode(nodeData);
} catch (IOException e) {
e.printStackTrace();
}
Iterator<JsonNode> iterator = jsonNode.iterator();
while (iterator.hasNext()) {
JsonNode next = iterator.next();
ClusterStatus status = new ClusterStatus();
status.setDiskFree(next.get("disk_free").asLong());
status.setFdUsed(next.get("fd_used").asLong());
status.setMemoryUsed(next.get("mem_used").asLong());
status.setProcUsed(next.get("proc_used").asLong());
status.setSocketUsed(next.get("sockets_used").asLong());
clusterStatusMap.put(next.get("name").asText(), status);
}
return clusterStatusMap;
}
public static Map<String, User> fetchRabbtMQUsers(String url, String username, String password) throws IOException {
Map<String, User> userMap = new HashMap<String, User>();
String nodeData = getData(url, username, password);
JsonNode jsonNode = null;
try {
jsonNode = JsonUtil.toJsonNode(nodeData);
} catch (IOException e) {
e.printStackTrace();
}
Iterator<JsonNode> iterator = jsonNode.iterator();
while (iterator.hasNext()) {
JsonNode next = iterator.next();
User user = new User();
user.setName(next.get("name").asText());
user.setTags(next.get("tags").asText());
userMap.put(next.get("name").asText(), user);
}
return userMap;
}
public static String getData(String url, String username, String password) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(username, password);
HttpGet httpGet = new HttpGet(url);
httpGet.addHeader(BasicScheme.authenticate(creds, "UTF-8", false));
httpGet.setHeader("Content-Type", "application/json");
CloseableHttpResponse response = httpClient.execute(httpGet);
try {
if (response.getStatusLine().getStatusCode() != 200) {
System.out.println("call http api to get rabbitmq data return code: " + response.getStatusLine().getStatusCode() + ", url: " + url);
}
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
} finally {
response.close();
}
return null;
}
public static class JsonUtil {
private static ObjectMapper objectMapper = new ObjectMapper();
static {
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
//objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
}
public static JsonNode toJsonNode(String jsonString) throws IOException {
return objectMapper.readTree(jsonString);
}
}
public static class User {
private String name;
private String tags;
@Override
public String toString() {
return "User{" +
"name=" + name +
", tags=" + tags +
'}';
}
//GET/SET方法省略
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
}
public static class ClusterStatus {
private long diskFree;
private long diskLimit;
private long fdUsed;
private long fdTotal;
private long socketUsed;
private long socketTotal;
private long memoryUsed;
private long memoryLimit;
private long procUsed;
private long procTotal;
// 此處省略了Getter和Setter方法
public long getDiskFree() {
return diskFree;
}
public void setDiskFree(long diskFree) {
this.diskFree = diskFree;
}
public long getDiskLimit() {
return diskLimit;
}
public void setDiskLimit(long diskLimit) {
this.diskLimit = diskLimit;
}
public long getFdUsed() {
return fdUsed;
}
public void setFdUsed(long fdUsed) {
this.fdUsed = fdUsed;
}
public long getFdTotal() {
return fdTotal;
}
public void setFdTotal(long fdTotal) {
this.fdTotal = fdTotal;
}
public long getSocketUsed() {
return socketUsed;
}
public void setSocketUsed(long socketUsed) {
this.socketUsed = socketUsed;
}
public long getSocketTotal() {
return socketTotal;
}
public void setSocketTotal(long socketTotal) {
this.socketTotal = socketTotal;
}
public long getMemoryUsed() {
return memoryUsed;
}
public void setMemoryUsed(long memoryUsed) {
this.memoryUsed = memoryUsed;
}
public long getMemoryLimit() {
return memoryLimit;
}
public void setMemoryLimit(long memoryLimit) {
this.memoryLimit = memoryLimit;
}
public long getProcUsed() {
return procUsed;
}
public void setProcUsed(long procUsed) {
this.procUsed = procUsed;
}
public long getProcTotal() {
return procTotal;
}
public void setProcTotal(long procTotal) {
this.procTotal = procTotal;
}
@Override
public String toString() {
return "ClusterStatus{" +
"diskFree=" + diskFree +
", diskLimit=" + diskLimit +
", fdUsed=" + fdUsed +
", fdTotal=" + fdTotal +
", socketUsed=" + socketUsed +
", socketTotal=" + socketTotal +
", memoryUsed=" + memoryUsed +
", memoryLimit=" + memoryLimit +
", procUsed=" + procUsed +
", procTotal=" + procTotal +
'}';
}
}
}
Zabbix 監控RabbitMQ
Zabbix是一個基於WEB界面提供分布式系統監視以及網絡監視功能的企業級開源解決方案,
他也可以幫助我們搭建一個MQ集群的監控系統,同時提供預警等功能,
但是由於其搭建配置要求比較高一般都是由運維人員負責搭建,
感興趣的可以訪問https://www.zabbix.com/ 官網進行了解學習。
面試題分析
1、Rabbitmq 為什么需要信道,為什么不是TCP直接通信
1、TCP的創建和銷毀,開銷大,創建要三次握手,銷毀要4次分手。
2、如果不用信道,那應用程序就會TCP連接到Rabbit服務器,高峰時每秒成千上萬連接就會造成資源的巨大浪費,而且底層操作系統每秒處理tcp連接數也是有限制的,必定造成性能瓶頸。
3、信道的原理是一條線程一條信道,多條線程多條信道同用一條TCP連接,一條TCP連接可以容納無限的信道,即使每秒成千上萬的請求也不會成為性能瓶頸。
2:queue到底在消費者創建還是生產者創建?
1: 一般建議是在rabbitmq操作面板創建。這是一種穩妥的做法。
2:按照常理來說,確實應該消費者這邊創建是最好,消息的消費是在這邊。
這樣你承受一個后果,可能我生產在生產消息可能會丟失消息。
3:在生產者創建隊列也是可以,這樣穩妥的方法,消息是不會出現丟失。
4:如果你生產者和消費都創建的隊列,誰先啟動誰先創建,后面啟動就覆蓋前面的
