可靠性投遞:
首先需要明確,效率與可靠性是無法兼得的,如果要保證每一個環節都成功,勢必會對消息的收發效率造成影響。如果是一些業務實時一致性要求不是特別高的場合,可以犧牲一些可靠性來換取效率。
要保證消息的可靠性投遞,首先需要從以下幾方面來確保,其次考慮其他的原因:
1、確保消息發送到RabbitMQ服務器(發送)。
可能因為網絡或者Broker的問題導致①失敗,而生產者是無法知道消息是否正確發送到Broker的。有兩種解決方案,第一種是Transaction(事務)模式,第二種Confirm(確認)模式。
在通過channel.txSelect方法開啟事務之后,我們便可以發布消息給RabbitMQ了,如果事務提交成功,則消息一定到達了RabbitMQ中,如果在事務提交執行之前由於RabbitMQ異常崩潰或者其他原因拋出異常,這個時候我們便可以將其捕獲,進而通過執行channel.txRollback方法來實現事務回滾。使用事務機制的話會“吸干”RabbitMQ的性能,一般不建議使用。
try { channel.txSelect(); // 發送消息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); // int i =1/0; channel.txCommit(); System.out.println("消息發送成功"); } catch (Exception e) { channel.txRollback(); System.out.println("消息已經回滾"); }
生產者通過調用channel.confirmSelect方法(即Confirm.Select命令)將信道設置為confirm模式。一旦消息被投遞到所有匹配的隊列之后,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的唯一ID),這就使得生產者知曉消息已經正確到達了目的地了。Confirm.Select命令 下分為三種模式分別如下:
普通確認模式:
// 開啟發送方確認模式 channel.confirmSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); // 普通Confirm,發送一條,確認一條 if (channel.waitForConfirms()) { System.out.println("消息發送成功" ); }
批量確認模式:
try { channel.confirmSelect(); for (int i = 0; i < 5; i++) { // 發送消息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes()); } // 批量確認結果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被確認了 // 比如5條消息可能只收到1個ACK,也可能收到2個(抓包才看得到) // 直到所有信息都發布,只要有一個未被Broker確認就會IOException channel.waitForConfirmsOrDie(); System.out.println("消息發送完畢,批量確認成功"); } catch (Exception e) { // 發生異常,可能需要對所有消息進行重發 e.printStackTrace(); }
異步監聽確認模式:
// 用來維護未確認消息的deliveryTag final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); // 這里不會打印所有響應的ACK;ACK可能有多個,有可能一次確認多條,也有可能一次確認一條 // 異步監聽確認和未確認的消息 // 如果要重復運行,先停掉之前的生產者,清空隊列 channel.addConfirmListener(new ConfirmListener() { public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Broker未確認消息,標識:" + deliveryTag); if (multiple) { // headSet表示后面參數之前的所有元素,全部刪除 confirmSet.headSet(deliveryTag + 1L).clear(); } else { confirmSet.remove(deliveryTag); } // 這里添加重發的方法 } public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 如果true表示批量執行了deliveryTag這個值以前(小於deliveryTag的)的所有消息,如果為false的話表示單條確認 System.out.println(String.format("Broker已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple)); System.out.println("multiple:"+multiple); if (multiple) { System.out.println("deliveryTag:"+deliveryTag); // headSet表示后面參數之前的所有元素,全部刪除 confirmSet.headSet(deliveryTag + 1L).clear(); } else { // 只移除一個元素 confirmSet.remove(deliveryTag); } System.out.println("未確認的消息:"+confirmSet); } }); // 開啟發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { long nextSeqNo = channel.getNextPublishSeqNo(); // 發送消息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes()); confirmSet.add(nextSeqNo); } System.out.println("所有消息:"+confirmSet); // 這里注釋掉的原因是如果先關閉了,可能收不到后面的ACK //channel.close(); //conn.close();
2、確保消息路由到正確的隊列(交換機路由)。
可能因為路由關鍵字錯誤,或者隊列不存在,或者隊列名稱錯誤導致②失敗。可以使用 mandatory=true 配合ReturnListener,可以實現消息無法路由的時候返回給生產者。另一種方式就是使用備份交換機(alternate-exchange),無法路由的消息會發送到這個交換機上。
//1.采用ReturnListener channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("=========監聽器收到了無法路由,被返回的消息============"); System.out.println("replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); System.out.println("message:"+new String(body)); } }); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2). contentEncoding("UTF-8").build(); // 2.在聲明交換機的時候指定備份交換機 //Map<String,Object> arguments = new HashMap<String,Object>(); //arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); //channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments); // 發送到了默認的交換機上,由於沒有任何隊列使用這個關鍵字跟交換機綁定,所以會被退回 // 第三個參數是設置的mandatory,如果mandatory是false,消息也會被直接丟棄 channel.basicPublish("","wuzztest",true, properties,"只為更好的你".getBytes());
運行上訴例子,由於設置了ReturnListener,可以看到Broker通知了我們消息路由失敗。
3、確保消息在隊列正確地存儲(存儲)。
可能因為系統宕機、重啟、關閉等等情況導致存儲在隊列的消息丟失,即③出現問題。
1、隊列持久化,聲明隊列的時候第二個參數指定為true。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
2、交換機持久化,聲明交換機的時候第三個參數設置為true。
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true, false, null);
3、消息持久化,發送消息的時候設置deliveryMode(2)。
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 2代表持久化,其他代表瞬態 .build(); channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
4、集群,鏡像隊列。
4、確保消息從隊列正確地投遞到消費者(消費)。
如果消費者收到消息后未來得及處理即發生異常,或者處理過程中發生異常,會導致④失敗。為了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認機制(messageacknowledgement)。消費者在訂閱隊列時,可以指定autoAck參數,當autoAck等於false時,RabbitMQ會等待消費者顯式地回復確認信號后才從隊列中移去消息。
如果消息消費失敗,也可以調用Basic.Reject或者Basic.Nack來拒絕當前消息而不是確認。如果requeue參數設置為true,可以把這條消息重新存入隊列,以便發給下一個消費者(當然,只有一個消費者的時候,這種方式可能會出現無限循環重復消費的情況,可以投遞到新的隊列中,或者只打印異常日志)。
// 創建消費者,並接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("Received message : '" + msg + "'"); if (msg.contains("拒收")){ // 拒絕消息 // requeue:是否重新入隊列,true:是;false:直接丟棄,相當於告訴隊列可以直接刪除掉 // TODO 如果只有這一個消費者,requeue 為true 的時候會造成消息重復消費 channel.basicReject(envelope.getDeliveryTag(), false); } else if (msg.contains("異常")){ // 批量拒絕 // requeue:是否重新入隊列 // TODO 如果只有這一個消費者,requeue 為true 的時候會造成消息重復消費 channel.basicNack(envelope.getDeliveryTag(), true, false); } else { // 手工應答 // 如果不應答,隊列中的消息會一直存在,重新連接的時候會重復消費 channel.basicAck(envelope.getDeliveryTag(), true); } } }; // 開始獲取消息,注意這里開啟了手工應答 // String queue, boolean autoAck, Consumer callback channel.basicConsume(QUEUE_NAME, false, consumer);
在自動確認的情況下,消息是在Broker發送給消費者之后就從隊列刪除。否則在調用basicACK方法的時候刪除。
5、其他
消費者回調:
消費者處理消息以后,可以再發送一條消息給生產者,或者調用生產者的API,告知消息處理完畢。參考:二代支付中異步通信的回執,多次交互。某提單APP,發送碎屏保消息后,消費者必須回調API。
補償機制:
對於一定時間沒有得到響應的消息,可以設置一個定時重發的機制,但要控制次數,比如最多重發3次,否則會造成消息堆積。
參考:ATM存款未得到應答時發送5次確認;ATM取款未得到應答時,發送5次沖正。根據業務表狀態做一個重發。
消息冪等性:
服務端是沒有這種控制的,只能在消費端控制。消息重復可能會有兩個原因:
1、生產者的問題,環節①重復發送消息,比如在開啟了Confirm模式但未收到確認。
2、環節④出了問題,由於消費者未發送ACK或者其他原因,消息重復投遞。
對於重復發送的消息,可以對每一條消息生成一個唯一的業務ID,通過日志或者建表來做重復控制。
消息的順序性:
消息的順序性指的是消費者消費的順序跟生產者產生消息的順序是一致的。在RabbitMQ中,一個隊列有多個消費者時,由於不同的消費者消費消息的速度是不一樣的,順序無法保證。
高可用集群方案:
rabbitmq集群是通過erlang的分布式特性進行rabbitmq集群,各個rabbitmq的服務為相應的節點,每個節點都提供給客戶端連接,進行消息的發送與接收。rabbitmq各節點之間通信使用域名,所以集群成員中所有主機名都要可解析。
1.修改 hosts文件:vim /etc/hosts , 三台都這樣子配置
192.168.254.137 rabbit1 192.168.254.138 rabbit2 192.168.254.139 rabbit3
2.修改 rabbitMQ的HOME配置,vim rabbitmq-env.conf ,138.139兩台分別為rabbit@rabbit2,rabbit@rabbit3:
#192.168.254.137 NODENAME=rabbit@rabbit1 HOME=/mysoft/rabbitmq_server-3.6.12/var/lib/rabbitmq/
3.rabbitmq集群的節點間是使用cookie來確認通信的,所以集群中的每個節點都必須有相同的erlang.cookie每個rabbitmq啟動時,erlang會自動創建一個cookie文件,為了使每個節點的cookie保持一致,可以先讓其中一個節點來創建,然后將這個文件拷貝到其他節點的相應位置。我這邊讓137節點去生成,通過以下命令拷貝到其他兩個節點:
scp .erlang.cookie root@192.168.254.138:/mysoft/rabbitmq_server-3.6.12/var/lib/rabbitmq/
4.分別啟動3台節點服務。然后將 rabbit@rabbit2,rabbit@rabbit3 兩台節點加入rabbit@rabbit1:
rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbit1 rabbitmqctl start_app
如果出現以下信息,把各個節點對應的數據目錄直接干掉重啟,當然生產上不會這么干:
Error: {inconsistent_cluster,"Node rabbit@rabbit1 thinks it's clustered with node rabbit@rabbit2, but rabbit@rabbit2 disagrees"}
5.成功后會看到如下狀態:
節點狀態分為 磁盤節點跟內存節點,改變節點狀態:
rabbitmqctl stop_app rabbitmqctl reset // 會移除節點 rabbitmqctl join_cluster rabbit@rabbit1 rabbitmqctl change_cluster_node_type ram rabbitmqctl start_app
成功如下:
1)每個節點會保存交換器、隊列、綁定等數據
2)消費者可以通過連接任何節點來定位到需要的隊列獲取數據
3)默認情況下,隊列的完整信息(含內容)不會在集群下所有的節點中保存,而是只存在於一個節點中
4)當保存隊列完整信息的節點崩潰,消費者不能從集群中獲得這個隊列的消息,生產者不能推送消息到隊列中
5)為了保證集群的高可用,可以在創建隊列時聲明為鏡像隊列,即所有節點都保存隊列的完整信息,其中有一個主隊列,其他都是從隊列,當主隊列所在的節點崩潰,集群在從隊列中選舉出一個隊列成為主隊列
查看集群狀態:rabbitmqctl cluster_status。
普通集群是無法實現隊列及隊列里面數據的復制,所以引入了鏡像隊列。
鏡像隊列:
鏡像隊列的配置可以通過管理端頁面,也可以使用命令,命令如下:
rabbitmqctl set_policy [-p vhostpath] {name} {pattern} {definition} [priority] #ha-mode:all、exactly、nodes
比如 rabbitmqctl set_policy -p hostname test "^" '{"ha-mode":"all"}',這行命令在vhost名稱為hostname創建了一個策略,策略名稱為test,策略模式為 all 即復制到所有節點,包含新增節點,策略正則表達式為 “^” 表示所有匹配所有隊列名稱。通過控制台如下圖:
匹配成功后查看隊列會出現以下情況,會發現Node列會出現 +2,說明集群還有兩個節點和本節點是鏡像同步模式:
可以通過配置鏡像隊列的方式去實現數據的同步,然后可以通過HAproxy負載+Keepalived實現集群的高可用。具體配置可以參考 https://www.cnblogs.com/wuzhenzhao/p/10195423.html 。架構如下:
網絡分區:
為什么會出現分區?因為RabbitMQ對網絡延遲非常敏感,為了保證數據一致性和性能,在出現網絡故障時,集群節點會出現分區。
當一個RabbitMQ集群發生網絡分區時,這個集群會分成兩個或者多個分區,它們各自為政,互相都認為對方分區的節點已經down,包括queues,bindings,exchanges這些信息的創建和銷毀都處於
自身分區內,與其它分區無關。如果原集群中配置了鏡像隊列,而這個鏡像隊列又牽涉到兩個或者多個網絡分區中的節點時,每一個網絡分區中都會出現一個master節點,如果分區節點個數充足,也
會出現新的slave節點,對於各個網絡分區,彼此的隊列都是相互獨立的,當然也會有一些其他未知的、怪異的事情發生。當網絡恢復時,網絡分區的狀態還是會保持,除非采取一些措施去解決他。
為了從網絡分區中恢復,首先需要挑選一個信任的分區,這個分區才有決定Mnesia內容的權限,發生在其他分區的改變將不被記錄到Mnesia中而直接丟棄。手動恢復網絡分區有兩種思路:
1. 停止其他分區中的節點,然后重新啟動這些節點。最后重啟信任分區中的節點,以去除告警。
2. 關閉整個集群的節點,然后再啟動每一個節點,這里需確保你啟動的第一個節點在你所信任的分區之中。
停止/啟動節點有兩種操作方式:
1. rabbimqctl stop/ rabbitmq-server -detached
2. rabbitmqctl stop_app/ rabbitmqctl start_app
自動處理網絡分區:
RabbitMQ提供了4種處理網絡分區的方式,在rabbitmq.config中配置cluster_partition_handling參數即可,分別為:
1. ignore (默認是ignore)
2. pause_minority (少數派中的節點在分區發生時會自動關閉,當分區結束時又會啟動)
3. pause_if_all_down(RabbitMQ會自動關閉不能和list中節點通信的節點)
4. autoheal(當認為發生網絡分區時,RabbitMQ會自動決定一個獲勝的(winning)分區,然后重啟不在這個分區中的節點以恢復網絡分區。)
廣域網的同步方案:federation插件 ,shovel插件。
常見問題:
1、消息隊列的作用與使用場景?
異步:批量數據異步處理。削峰:高負載任務負載均衡(秒殺)。解耦:串行任務並行化。廣播:基於發布訂閱實現一對多通信。
2、多個消費者監聽一個生產者時,消息如何分發?
采用Round-Robin(輪詢,可以通過設置channel.basicQos(2) 來設置 PrefetchSize的值實現Fair dispatch(公平分發),。當超過2條消息沒提交,隊列則不會給該消費者發送消息。
3、無法被路由的消息,去了哪里?
默認情況下這種消息會被丟棄,當然我們可以通過設置來實現消息重發,或者指定備用交換機來轉發死信隊列。上文中確保路由到制定隊列中已寫明。
4、消息在什么時候會變成Dead Letter(死信)?
1.消息被消費者拒絕,並且requeue,重新入隊被設置成false:channel.basicReject(envelope.getDeliveryTag(), false
);
2.消息過期。隊列設置了過期時間或者指定消息設置了過期時間,可以在定義的時候設置死信交換機,以做后期處理
3.隊列達到最大長度(先入隊的消息會被發送到DLX)
5、RabbitMQ如何實現延遲隊列?
使用rabbitmq-delayed-message-exchange插件,或者通過死信隊列的機制。使用TTL(過期時間)結合DLX(死信)的方式來實現消息的延遲投遞。
6、如何保證消息的可靠性投遞?
見上文。一共分為4步,保證消息發送(Prodecer),消息轉發(Exchange),消息存儲(Queue),消息消費(Consumer)等方面去保證。
7、如何在服務端和消費端做限流?
服務端限流通過磁盤空間與內存空間去控制,默認是1G跟40%的時候,這兩個閾值都可以通過rabbit.config去配置
消費端的限流可以通過來設置 PrefetchSize的值實現,調用channel.basicQos(2)。
8、如何保證消息的順序性?
正常情況下有多個生產者生產消息與多個消費者同時消費消息,消息的順序性是無法保證的,在一個隊列只有一個消費者的情況,才能實現順序性。或者使用全局ID去處理保證順序性。
9、RabbitMQ的集群模式及節點類型?
普通模式,鏡像模式。節點類型分為磁盤節點(DISC)與內存節點(RAM)。