發送丟失
我們發送消息時,broker寫入到cache后就返回成功了,而producer只要獲取到ACK就說明消息發送成功了,反之肯定會收到一個異常,比如網絡錯誤、請求超時之內的。而當我們發送失敗后一直重試發送,能保證消息一定到達MQ嗎?比如這樣:

本地事務執行完之后、數據庫已經更新了,還沒來的及發送消息,producer就掛了。那現在就數據不一致了。於是現在又改造,將本地業務和消息發送放入同一個事務中,只要有異常那就回滾。

這樣看起來好像是沒有問題了,不過業務邏輯中如果涉及到redis、es的操作呢?拋出異常他們也不會回滾的。所以剛才的方式只適合簡單的業務。

解決方案
先發送half消息到broker中去,然后執行本地業務邏輯,最后再確定狀態commit/rollbact。即使確定狀態失敗了,broker也會主動回查消息狀態。從而保證了消息一定會發送到broker中去。
/** * 事務消息 * * @throws Exception */ public static void tranProducer()throws Exception{ //1.創建消息生產者producer,並制定生產者組名 TransactionMQProducer producer = new TransactionMQProducer("test-group-1"); //2. 設置NameServer的地址 producer.setNamesrvAddr("192.168.200.100:9876"); // 添加事務監聽器 producer.setTransactionListener(new TransactionListener() { /** * 在該方法中執行本地事務 * @param msg * @param arg * @return */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { if (StringUtils.equals("TAGA", msg.getTags())) { // 確認 System.out.println("A 提交。。。。。。可以被消費"); return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TAGB", msg.getTags())) { // 回滾 System.out.println("B 回滾。。。。。。無法被消費"); return LocalTransactionState.ROLLBACK_MESSAGE; } else{ // 無狀態 System.out.println("C 忽略。。。。。。無法被消費"); return LocalTransactionState.UNKNOW; } } /** * 該方法時MQ進行消息事務狀態回查 * @param msg * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("回查的消息的Tag:" + msg.getTags()+" 。。。。。。可以被消費"); return LocalTransactionState.COMMIT_MESSAGE; } }); //3.啟動producer producer.start(); String[] tags = {"TAGA", "TAGB", "TAGC"}; for (int i = 0; i < 3; i++) { //4.創建消息對象,指定主題Topic、Tag和消息體 /** * 參數一:消息主題Topic * 參數二:消息Tag * 參數三:消息內容 */ Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes()); //5.發送消息 SendResult result = producer.sendMessageInTransaction(msg, null); //發送狀態 SendStatus status = result.getSendStatus(); System.out.println("發送結果:" + result); //線程睡1秒 TimeUnit.SECONDS.sleep(2); } //6.關閉生產者producer(不能關閉,不然拿不到回傳結果了) //producer.shutdown(); }

Broker消息丟失
現在我們的事務消息commit之后,那這條消息就發送成功可以消費了,但是如果這條消息僅僅只是剛寫入pageCache,還沒有寫入到磁盤文件文件呢?還是會丟失的。是的,可以用同步發送,直接寫入CommitLog不經過PageCache,那這樣就一定安全了么?如果寫入成功后恰好機器磁盤壞了,那照樣數據找不回來了。所以一定要明確一點:消息寫入MQ成功不代表消息一定不丟失。

解決方案
此時可以采用DLedger技術實現主從同步。commit后,消息必須寫入broker group中的大多數節點才算成功。這樣同步發送后可以保證有多分數據,並且即使master宕機,DLedger也會自動選舉出一個master來。這樣就做到了broker不丟失數據。
消費丟失
Consumer拿到消息后,先進行業務處理,然后再手動提交offset,告訴broker我處理完了。可能還沒來得及提交消息狀態consume就掛了,所以我們可以將consume做集群部署,只要沒有提交offset消息成功,broker就不認為你處理了這條消息,它會把消息交給consumer group內的其他機器去執行。但是千萬要做好冪等處理,不然重復消費的結果也是很嚴重的。
推薦架構解決方案
魚與熊掌不可兼得,非要100%的消息不丟失,那吞吐量肯定是很低的。因為同步刷盤沒有經過os cache那么性能肯定是很低的,。一般都是 同步復制+異步刷盤 。nameserver、broker、producer、consumer 全部集群部署,nameserver保存broker節點信息;broker主動同步復制消息,但是master寫入能力有限,可以采用多台master提升寫入能力。producer、consumer作為上下游服務肯定也是要保證高可用的。
。
