RabbitMQ 從入門到精通(二)


消息如何保障百分之百的投遞成功?

什么是生產端的可靠性投遞?

  • 保障消息的成功發出
  • 保障MQ節點的成功接收
  • 發送端收到MQ節點(Broker)確認應答
  • 完善的進行消息補償機制

如果想保障消息百分百投遞成功,只做到前三步不一定能夠保障。有些時候或者說有些極端情況,比如生產端在投遞消息時可能就失敗了,或者說生產端投遞了消息,MQ也收到了,MQ在返回確認應答時,由於網絡閃斷導致生產端沒有收到應答,此時這條消息就不知道投遞成功了還是失敗了,所以針對這些情況我們需要做一些補償機制。

 

方案一:消息落庫,對消息狀態進行打標

 

  1. 進行數據的入庫,比如我們要發送一條訂單消息,首先得把業務數據也就是訂單信息存庫,然后生成一條消息,把消息也進行入庫,這條消息應該包含消息狀態屬性 Create_Date(創建時間),並設置初始標志 比如0,表示消息創建成功,正在發送中

  2. 首先要保證第一步消息都存儲成功了,沒有出現任何異常情況,然后生產端再進行消息發送。如果失敗了就進行快速失敗機制

  3. MQ把消息收到的結果應答(confirm)給生產端

  4. 生產端有一個Confirm Listener,去異步的監聽Broker回送的響應,從而判斷消息是否投遞成功,如果成功,去數據庫查詢該消息,並將消息狀態更新為1,表示消息投遞成功
     
    假設第二步OK了,在第三步回送響應時,網絡突然出現了閃斷,導致生產端的Listener就永遠收不到這條消息的confirm應答了,也就是說這條消息的狀態就一直為0了

  5. 此時我們需要設置一個規則,比如說消息在入庫時候設置一個臨界值timeout,5分鍾之后如果還是0的狀態那就需要把消息抽取出來。這里我們使用的是分布式定時任務,去定時抓取DB中距離消息創建時間超過5分鍾的且狀態為0的消息。

  6. 把抓取出來的消息進行重新投遞(Retry Send),也就是從第二步開始繼續往下走

  7. 當然有些消息可能就是由於一些實際的問題無法路由到Broker,比如routingKey設置不對,對應的隊列被誤刪除了,那么這種消息即使重試多次也仍然無法投遞成功,所以需要對重試次數做限制,比如限制3次,如果投遞次數大於三次,那么就將消息狀態更新為2,表示這個消息最終投遞失敗。

 

針對這種情況如何去做補償呢,可以有一個補償系統去查詢這些最終失敗的消息,然后給出失敗的原因,當然這些可能都需要人工去操作。

第一種可靠性投遞,在高並發的場景下是否適合?

對於第一種方案,我們需要做兩次數據庫的持久化操作,在高並發場景下顯然數據庫存在着性能瓶頸。其實在我們的核心鏈路中只需要對業務進行入庫就可以了,消息就沒必要先入庫了,我們可以做消息的延遲投遞,做二次確認,回調檢查。

當然這種方案不一定能保障百分百投遞成功,但是基本上可以保障大概99.9%的消息是OK的,有些特別極端的情況只能是人工去做補償了,或者使用定時任務去做都可以。

 

方案二:消息的延遲投遞,做二次確認,回調檢查

 

Upstream Service上游服務也就是生產端,Downstream service下游服務也就是消費端,Callback service就是回調服務。

 

  1. 先將業務消息進行入庫,然后生產端將消息發送出去
  2. 在發送消息之后,緊接着生產端再次發送一條消息(Second Send Delay Check),即延遲消息投遞檢查,這里需要設置一個延遲時間,比如5分鍾之后進行投遞。
  3. 消費端去監聽指定隊列,將收到的消息進行處理。
  4. 處理完成之后,發送一個confirm消息,也就是回送響應,但是這里響應不是正常的ACK,而是重新生成一條消息,投遞到MQ中。
  5. 上面的Callback service是一個單獨的服務,其實它扮演了第一種方案的存儲消息的DB角色,它通過MQ去監聽下游服務發送的confirm消息,如果Callback service收到confirm消息,那么就對消息做持久化存儲,即將消息持久化到DB中。
  6. 5分鍾之后延遲消息發送到MQ了,然后Callback service還是去監聽延遲消息所對應的隊列,收到Check消息后去檢查DB中是否存在消息,如果存在,則不需要做任何處理,如果不存在或者消費失敗了,那么Callback service就需要主動發起RPC通信給上游服務,告訴它延遲檢查的這條消息我沒有找到,你需要重新發送,生產端收到信息后就會重新查詢業務消息然后將消息發送出去。

這么做的目的是少做了一次DB的存儲,在高並發場景下,最關心的不是消息100%投遞成功,而是一定要保證性能,保證能抗得住這么大的並發量。所以能節省數據庫的操作就盡量節省,可以異步的進行補償。

 

其實在主流程里面是沒有這個Callback service的,它屬於一個補償的服務,整個核心鏈路就是生產端入庫業務消息,發送消息到MQ,消費端監聽隊列,消費消息。其他的步驟都是一個補償機制。

第二種方案也是互聯網大廠更為經典和主流的解決方案。但是若對性能要求不是那么高,第一種方案要更簡單

 

冪等性

冪等性是什么?

簡單來說就是用戶對於同一操作發起的一次請求或者多次請求的結果是一致的。

我們可以借鑒數據庫的樂觀鎖機制來舉個例子:

  • 首先為表添加一個版本字段version

  • 在執行更新操作前呢,會先去數據庫查詢這個version

  • 然后執行更新語句,以version作為條件,例如:

    UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1

  • 如果執行更新時有其他人先更新了這張表的數據,那么這個條件就不生效了,也就不會執行操作了,通過這種樂觀鎖的機制來保障冪等性。

 

消息端冪等性保障

重復消費問題:

當消費者消費完消息時,在給生產端返回ack時由於網絡中斷,導致生產端未收到確認信息,該條消息會重新發送並被消費者消費,但實際上該消費者已成功消費了該條消息,這就是重復消費問題。

 

唯一ID+指紋碼機制

唯一ID:業務表唯一的主鍵,如商品ID

指紋碼:為了區別每次正常操作的碼,每次操作時生成指紋碼;可以用時間戳+業務編號或者標志位(具體視業務場景而定)

 

  • 唯一ID+指紋碼機制,利用數據庫主鍵去重
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID and IS_CONSUM= 指紋碼
  • 好處:實現簡單
  • 壞處:高並發下有數據庫寫入的性能瓶頸
  • 解決方案:根據ID進行分庫分表算法路由

整個思路就是首先我們需要根據消息生成一個全局唯一的ID,然后還需要加上一個指紋碼。這個指紋碼它並不一定是系統去生成的,而是一些外部的規則或者內部的業務規則去拼接,它的目的就是為了保障這次操作是絕對唯一的。

將ID + 指紋碼拼接好的值作為數據庫主鍵,就可以進行去重了。即在消費消息前呢,先去數據庫查詢這條消息的指紋碼標識是否存在,沒有就執行insert操作,如果有就代表已經被消費了,就不需要管了。

 

利用Redis的原子性去實現

這里只提用Redis的原子性去解決MQ冪等性重復消費的問題

注意:MQ的冪等性問題 根本在於的是生產端未正常接收ACK,可能是網絡抖動、網絡中斷導致

 

我的方案:

MQ消費端在消費開始時 將 ID放入到Redis的BitMap中,MQ生產端每次生產數據時,從Redis的BitMap對應位置若不能取出ID,則生產消息發送,否則不進行消息發送。

但是有人可能會說,萬一消費端,生產端Redis命令執行失敗了怎么辦,雖然又出現重復消費又出現Redis非正常執行命令的可能性極低,但是萬一呢?

OK,我們可以在Redis命令執行失敗時,將消息落庫,每日用定時器,對這種極特殊的消息進行處理。

 

Confirm機制

如何理解?

  • 消息的確認,是指生產者投遞消息后,如果Broker收到消息,則會給我們生產者一個應答
  • 生產者進行接收應答,用來確定這條消息是否正常的發送到Broker,這種方式也是消息的可靠性投遞

的核心保障

 

確認機制流程圖

生產端發送消息到Broker,然后Broker接收到了消息后,進行回送響應,生產端有一個Confirm Listener,去監聽應答,當然這個操作是異步進行的,生產端將消息發送出去就可以不用管了,讓內部監聽器去監聽Broker給我們的響應。

 

怎么實現?

  • 第一步,在channel上開啟確認模式:channel.confirmSelect()
  • 第二步,在channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行重新發送、或記錄日志等后續處理!
public class Producer {
	public static void main(String[] args) throws Exception {
		
		//創建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);

		
		//獲取Connection
		Connection connection = connectionFactory.newConnection();
		
		//通過connection創建一個新的Channel
		Channel channel = connection.createChannel();
		
		//指定我們的消息投遞模式
		channel.confirmSelect();
		
		String exchangeName = "test_confirm_exchange";
		String routingkey = "confirm.save";
		
		//發送一條信息
		String msg = "Hello RabbitMQ Send confirm message!";
		channel.basicPublish(exchangeName, routingkey, null, msg.getBytes());
		
		//添加一個確認監聽
		channel.addConfirmListener(new ConfirmListener() {
			
			@Override
			public void handleNack(long deliveryTag, boolean multiple)
					throws IOException {
				System.out.println("-------no ack!---------");
			}
			
			@Override
			public void handleAck(long deliveryTag, boolean multiple)
					throws IOException {
				System.out.println("--------ack!----------");
			}
		});
	}
}

 

public class Consumer {
	public static void main(String[] args) throws Exception{
		//創建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);

		
		//獲取Connection
		Connection connection = connectionFactory.newConnection();
		
		//通過connection創建一個新的Channel
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_confirm_exchange";
		String routingkey = "confirm.#";
		String queueName = "test_confirm_queue"; 
		
		//聲明交換機和隊列 然后進行綁定和 設置 最后制定路由key
		channel.exchangeDeclare(exchangeName, "topic",true);
		channel.queueDeclare(queueName, true, false, false, null);
		
		channel.queueBind(queueName, exchangeName, routingkey);
		
		//創建消費者
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true,queueingConsumer); 
		
		while(true){
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.out.println("消費端:" + msg);
		}
	}
}

 

運行說明

先啟動消費端,訪問管控台:http://ip:15672,檢查Exchange和Queue是否設置OK,然后啟動生產端,消息被消費端消費,生產端也成功監聽到了ACK響應。

 

Return機制

如何理解?

  • Return Listener 用於處理一些不可路由的消息!
  • 我們的消息生產者,通過指定一個Exchange 和Routingkey,把消息送達到某一個隊列中去, 然后我們的消費者監聽隊列,進行消費處理操作!
  • 但是在某些情況下,如果我們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,這個時候如果我們需要監聽這種不可達的消息,就要使用Return Listener!

 

如何實現?

  1. 添加return監聽:addReturnListener,生產端去監聽這些不可達的消息,做一些后續處理,比如說,記錄下消息日志,或者及時去跟蹤記錄,有可能重新設置一下就好了
  2. 發送消息時,設置Mandatory:如果為true,則監聽器會接收到路由不可達的消息,然后進行后續處理,如果為false,那么broker端自動刪除該消息!

 

public class ReturnProducer {
	 public static void main(String[] args) throws Exception {
	        //1 創建ConnectionFactory
	        ConnectionFactory connectionFactory = new ConnectionFactory();
	        connectionFactory.setHost("192.168.244.11");
			connectionFactory.setPort(5672);
			connectionFactory.setVirtualHost("/");
			connectionFactory.setHandshakeTimeout(20000);
	        //2 獲取Connection
	        Connection connection = connectionFactory.newConnection();
	        //3 通過Connection創建一個新的Channel
	        Channel channel = connection.createChannel();
	        
	        String exchange = "test_return_exchange";
	        //String routingKey = "return.save";
	        String routingKeyError = "abc.save";
	        
	        String msg = "Hello RabbitMQ Return Message";
	        //添加return監聽
	        channel.addReturnListener(new ReturnListener() {
	            @Override
	            public void handleReturn(int replyCode, String replyText, String exchange,
	                    String routingKey, BasicProperties properties, byte[] body) throws IOException {
	                //replyCode:響應碼    replyText:響應信息
	                System.err.println("---------handle  return----------");
	                System.err.println("replyCode: " + replyCode);
	                System.err.println("replyText: " + replyText);
	                System.err.println("exchange: " + exchange);
	                System.err.println("routingKey: " + routingKey);
	                //System.err.println("properties: " + properties);
	                System.err.println("body: " + new String(body));
	            }

				
	        });
	        //5 發送一條消息,第三個參數mandatory:必須設置為true
	        channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
	    }
}

 

public class ReturnConsumer {
    
    public static void main(String[] args) throws Exception {
        //1 創建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection創建一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";
        //4 聲明交換機和隊列,然后進行綁定設置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 創建消費者 
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消費者: " + msg);
        }
    }
}

 

運行說明

先啟動消費端,訪問管控台:http://ip:15672,檢查Exchange和Queue是否設置OK,然后啟動生產端。
由於生產端設置的是一個錯誤的路由key,所以消費端沒有任何打印,而生產端打印了如下內容

如果我們將 Mandatory 屬性設置為false,對於不可達的消息會被Broker直接刪除,那么生產端就不會進行任何打印了。如果我們的路由key設置為正確的,那么消費端能夠正確消費,生產端也不會進行任何打印。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM