1.消息發送mq不丟失實現方式
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @description RocketMQ事務消息發送 */ public class TransactionProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { //用來接收RocketMQ回調的監聽接口 //這里是我們自己定義的實現執行本地事務,commit.rollback,回調查詢等邏輯 TransactionListener transactionListener = new TransactionListenerImpl(); //下面就是創建一個支持事務消息的Producer //對這個Producer指定一個生產者分組 TransactionMQProducer producer = new TransactionMQProducer("test"); //下面指定了一個線程池,里面包含一些線程 //這個線程就是用來處理RocketMQ的回調函數 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 1,//核心線程數 2,//線程池最大線程數 1000,//超時時間 TimeUnit.SECONDS,//時間的單位 new ArrayBlockingQueue<Runnable>(2000),//存放阻塞線程的列表 new ThreadFactory() {//線程創建工廠 @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("TestThread"); return thread; } } ); //給事務消息生產者設置對應的線程池,負責執行RocketMQ的回調請求 producer.setExecutorService(poolExecutor); //給生產者設置對應的回調函數 producer.setTransactionListener(transactionListener); //啟動消息生產者 producer.start(); //虛擬一條成功的消息 Message message = new Message( "successTopic" , "tag" , "key", ("成功的消息").getBytes(RemotingHelper.DEFAULT_CHARSET)); //這里存放發送的half消息到內存或者磁盤文件中,后台開啟一個線程,掃描這個文件,如果超過一定時間沒有收到響應,就回滾業務 //save(message) //將消息作為half消息的模式發送出去,如果發送失敗,則會收到一個異常,我們捕獲異常進行對應的異常處理即可 try { TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null); //這里可以更新或者刪除存放在本地內存或者磁盤文件的消息記錄 //delete(message) }catch (Exception e){ //half消息發送失敗 //本地系統執行業務回滾,更新數據庫信息等操作 } } }
上面是發送RocketMq的事務消息發送方法
下面是RocketMq的事務消息發送方法的回調函數的實現類
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; /** * @description 事務監聽實現類 */ public class TransactionListenerImpl implements TransactionListener { //如果half消息發送成功了就會回調這個函數,就可以執行本地事務 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { //執行本地事務 //根據本地一連串的事務執行結果,去選擇commit或者rollback try{ //如果本地事務都執行成功了,返回commit return LocalTransactionState.COMMIT_MESSAGE; }catch (Exception e){ //本地事務都失敗了,回滾所有的執行過的操作 //返回rollback,標記half消息無效 return LocalTransactionState.ROLLBACK_MESSAGE; } } //如果因為各種原因生產者沒有返回commit或者rollback給Broker, //broker會定時掃描沒有回應的half消息,然后回調這個函數 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { //查詢本地事務,是否都執行成功 Integer status = 0; //Integer status = localTrans.get(messageExt.getTransactionId()); //根據本地事務情況選擇執行commit或者rollback if (null != status){ switch(status){ case 0:return LocalTransactionState.UNKNOW; case 1:return LocalTransactionState.COMMIT_MESSAGE; case 2:return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
以上就是RocketMq的事務消息發送方法,可以實現發送消息的零丟失,以事務的方式確保消息一定可以發送到RocketMQ
2.mq消息不丟失實現方式
以上僅僅只能保證消息發送到mq成功,但是一定能保證消息不丟失嗎?顯然是不行的;
假設1:消息發送到了mq,就一定進入到了磁盤文件了嗎?rocketmq是先存入os cache中,也就是內存,如果這個時候機器宕機,內存上的數據也就全部丟失了,顯然消息也會丟失
解決方案:rocketmq默認是異步刷盤的模式,保證了數據的高吞吐量,但是有可能出現消息丟失的情況,可以改為同步刷盤策略,通過修改配置文件broker.config中flushDiskType參數為SYNC_FLUSH即可,這樣,只要mq告訴我們half消息響應成功了,就代表成功寫入了磁盤中了;
假設2:消息寫入磁盤就一定不會丟失嗎?顯然也不能;如果磁盤損壞,那么消息也會丟失
解決方案:基於Dledger和Raft協議的rocketmq主從架構,只要消息寫入master成功了,那么就一定會基於raft協議同步給其他的broker,就算master機器磁盤損壞,那么一定有broker存儲了同樣的消息,可以確保消息在mq上不會丟失;