RocketMQ與MYSQL事務消息整合


1、基礎理論知識篇“兩階段提交”如果你了解可以跳過這段,當然如果你想深入了解你可以購買相關書籍或去搜索相關資料閱讀

  兩階段提交分為 正常提交和異常提交或異常回滾

       上面是正常提交的示意圖,協調者發起預提交請求,參與者回復成功之后協調者再次發起commit請求,統一提交事物。事物結束。

       如果這兩階段提交過程當中有任何一個請求出現異常就會回滾,如下流程:

       異常請求包括預提交 返回預提交的應答,commit請求 等任何一個失敗都會導致整個事物回滾。

  二階段提交的問題 
    二階段提交”還有一個很嚴重的問題就是如果commit過程當中失敗了 就導致了全部事物失敗,代價很大,簡單粗暴的處理方式

         還有一個問題是如果 commit過程中網絡出現問題 commit沒有被整個事物的參與者之一或者多個收到,這個時候就會出現數據不一致現象。

 
  可能大家會提到 協調者是誰,參與者又是誰那?

               這里簡單說下自己的理解

         如果在你的應用程序中你是通過 begin等相關操作語句開始的,比如 你使用了spring的@Transactional注解等,

         那協調者就是你的“應用程序”,參與者就是 mysql或其他支持事物的數據庫系統

         如果你就直接向mysql發送了一條sql語句mysql是自行提交的,那協調者和參與者都是mysql數據庫自己

2、這里說下mysql對所謂的“重復數據”提供的相關sql或關鍵字。

       unique 唯一主鍵約束

              在sql事物中和應用程序中都可以捕獲這個錯誤碼或異常,可以作為冪等判斷的一個依據。

       upset 操作,發現唯一主鍵沖突然后更新相關數據,mongodb有直接使用的sql方法語句

              示例:insert into tablename(column1) values(123) on duplicate key update column1 =column1 +123

        ignore 忽略操作對於多余的操作直接忽略

              insert ignore into tablename(column1)  values(123)

 

  基礎篇說完很多內容如果想深入了解可以自己找資料處理。下面是華麗分割線


3、在我們原有的認知里有一個方案就差那么一點點就可以大面積使用的。

       我們之前可能想過怎樣既能發送mq又能寫數據庫,下面這個方案會分接近我們的願望。

       我們遵從如下步驟進行代碼處理:

       1、開啟數據庫事物執行相關sql

       2、發送MQ消息

       3、提交數據庫事物

       (注意:以上每一步都是在上一步執行成功之后在執行下一步的)

       根據步驟我畫出了下面的流程圖

 其實這個流程是有一個漏洞的,如果我把上面的流程圖改造為下面的二階段提交的示意圖就會很明顯的看出來

        不知道大家有么有發現問題,是不是 各種提交和回滾操作都是針對的數據庫,而不是MQ。commit數據庫事物出現異常就會造成數據不一致現象。

        其實也不用在想有沒有其他的流程方案能解決分布式雙寫問題,只要存在多寫問題就存在數據不一致問題的現象,

        所以就出現了3pc Paxos 等協議來解決分布式事物/一致性的問題。

 

        下面我們開始介紹怎么使用mysql和RocketMQ來實現事物問題

         華麗分割線


4、RocketMQ事物消息的過程

       1、發送MQ的事物消息

       2、事物消息發送成功后會同步觸發對應執行本地接口來執行針對mysql數據庫的操作

       3、如果有未commit的消息,RocketMQ 的 broker會定時間隔時間來回查數據庫事物是否已經提交完成

5、結合RocketMQ的事物消息與Mysql數據庫事物的實現思想

  如果上面的二階段提交你已經理解了,你會發現我這里設計的流程(上面圖的流程)有點不太一樣的地方

        什么地方那?

        MQ事物消息回滾的時候是因為mysql數據庫事物沒有提交成功而導致的,也就是說如果mysql數據庫事務成功了MQ的事務消息是一定要成功的

        否則就會出現事物不一致的現象。

        假如發送MQ的prepare消息成功了,執行mysql事物的操作也成功了,但是偏偏返回給MQ的commit消息丟失了,那這個時候數據庫消息並不會回滾。

  所以就有了回查本地事物消息是否成功的操作,來對MQ的消息做個補償動作實現數據一致性

 

        理解了二階段提交以及RocketMQ的事物實現之后你就可以自己設計事物相關操作的執行順序了

        (這里的流程設計以及包括我的代碼實現是以我的理解做出的最佳實踐)

 6、RocketMQ與Mysql事物結合注意事項

       1、如果應用程序承擔協調者的工作就盡量晚開啟事物和盡量早的提交數據庫事物,事物中的sql對數據競爭多的sql盡量靠后

            因為執行數據庫事物會有各種鎖操作,減少鎖的生命周期,數據庫是稀缺資源,大家能省則省

       2、數據庫事物最好設置超時時間,超時之后自動解除,最好不超過1分鍾

       3、MQ默認1分鍾之后回查一次已發送message但未commit的消息,最多回查15次,之后會執行回滾操作

       4、應用程序一定要做好冪等處理(可以參考上面mysql相關語句實現冪等接口)

       5、網絡不要太差,否則會造成大量的重試,重試就會影響消息的及時性

       6、適用場景

                    單次請求數量小

                    每次請求會有數據產生,而不是查詢產生的數據(比如 insert操作叫生產數據,select操作不生產數據)

                    下游可以接受一定的延遲(這里有兩個因素,有應用程序本身和Broker,這里指broker)

                    下游服務或系統以接收到的消息為依據做相應的操作

                     MQ消息作為主要信息傳遞的工具

 

         下面說下具體代碼實現

         華麗分割線


 

7、實戰代碼解析

       首先附上源碼地址 https://github.com/zygfengyuwuzu/springboot-rocketmq-example

       下面將針對關鍵代碼進行講解

       首先介紹一下代碼目錄

 

 

         了解了上面的代碼目錄下面說下代碼的執行流程


    首先看事物消息生產者的實例對象創建
package rocketmq_example.mqandmysqltraction.producer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * 生產者和消費者測試的時候記得注掉一中的一個以免觀察不出效果
 * 
 */
@Component
public class TransactionProducer {
 static Logger logger = LoggerFactory.getLogger(TransactionProducer.class);

 public DefaultMQProducer producer = null;
 
 @Autowired
 TransactionListener transactionListenerImp;

 @PostConstruct
 private void init() throws MQClientException {
  logger.info("MQ事物生產者初始化開始--------------------------------------------------");
  TransactionMQProducer transactionProducer = new TransactionMQProducer("mytestgroup");
  // Producer 組名, 多個 Producer 如果屬於一 個應用,發送同樣的消息,則應該將它們 歸為同一組
  //transactionProducer.setProducerGroup("mytestgroup");
  // Name Server 地址列表
  transactionProducer.setNamesrvAddr("10.10.6.71:9876;10.10.6.72:9876");
  // 超時時間  這里一定要大於數據庫事物執行的超時時間
  transactionProducer.setSendMsgTimeout(90000);
  //這個線程池作用就是  mqbroker端回調信息的本地處理線程池
  ExecutorService executorService = new ThreadPoolExecutor(1, 5, 100, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
     @Override
     public Thread newThread(Runnable r) {
      Thread thread = new Thread(r);
      thread.setName("client-transaction-msg-check-thread");
      return thread;
     }
    });
  transactionProducer.setExecutorService(executorService);
  transactionProducer.setTransactionListener(transactionListenerImp);
  producer = transactionProducer;
  producer.start();
  logger.info("MQ事物生產者初始化結束--------------------------------------------------");
 }
 public SendResult send(Message me) throws Exception {
  return producer.send(me);
 }
 /**
  * 發送普通消息
  * @param Topic
  * @param Tags
  * @param body
  * @return
  * @throws Exception
  */
 public SendResult send(String Topic, String Tags, String body) throws Exception {
  Message me = new Message();
  // 標示
  me.setTopic(Topic);
  // 標簽
  me.setTags(Tags);
  // 內容
  me.setBody(body.getBytes(RemotingHelper.DEFAULT_CHARSET));
  return producer.send(me);
 }
 /**
  * 發送普通消息
  * @param Topic
  * @param Tags
  * @param key
  * @param body
  * @return
  * @throws Exception
  */
 public SendResult send(String Topic, String Tags, String key, String body) throws Exception {
  try {
   Message me = new Message(Topic, Tags, key, 0, body.getBytes(RemotingHelper.DEFAULT_CHARSET), true);
   return producer.send(me);
  } catch (Exception e) {
   logger.error("發送MQ信息異常Topic{},Tags{},key{},body{}", Topic, Tags, key, body);
   throw e;
  }
 }
 @PreDestroy
 public void Destroy() {
  producer.shutdown();
 }
}

  上面的代碼我們接收到請求傳輸過來的數據之后,首先做了MQ消息對象的創建,創建成功之后直接發送MQ事物消息

  事物消息發送成功之后會調用上面設置的接口實現類的TransactionListenerImpl.executeLocalTransaction()這個方法。

  接口實現的方法代碼如下:

package rocketmq_example.mqandmysqltraction.producer;

import java.util.List;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import rocketmq_example.mqandmysqltraction.MyTableModel;
import rocketmq_example.mqandmysqltraction.MytableService;

/**
 * 把數據庫事物嵌套在mq事物當中不能顯示拋出異常
 * 
 * 
 * 
 * 
 * @author zyg
 *
 */
@Component
public class TransactionListenerImpl implements TransactionListener {

 static Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);

 @Autowired
 MytableService mytableService;

 /**
  * 一定要設置執行sql時間,盡量不要超時
  * 
  */
 @Override
 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  logger.info("開始執行本地數據庫事物  transactionid:{}", msg.getTransactionId());
  LocalTransactionState lts = LocalTransactionState.UNKNOW;
  @SuppressWarnings("unchecked")
  List<MyTableModel> mytablelist = (List<MyTableModel>) arg;
  try {
   long start=System.currentTimeMillis();
   //數據庫事物執行時間不要超過mq回查時間 默認15分鍾
   mytableService.execMytableinsert2(mytablelist, msg.getTransactionId());
   logger.info("執行數據庫事物耗時:{}",System.currentTimeMillis()-start);
   lts = LocalTransactionState.COMMIT_MESSAGE;
  } catch (Exception e) {
   logger.error("數據庫事務異常", e);
   lts = LocalTransactionState.ROLLBACK_MESSAGE;
  }
  logger.info("結束執行本地數據庫事物  transactionid:{} 返回:{}", msg.getTransactionId(),lts);
  return lts;
 }

 /**
  * 去數據庫查詢看看是否存在已經成功發送預提交數據而沒有commit成功的mq信息
  * 每分鍾1次默認15次
  * 
  * 這里可以做個計數 讓MQ重試5次/5分鍾就回滾減輕MQ回查的壓力
  * 
  */
 @Override
 public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  if (mytableService.existMyTableModelByMsgid(msg.getTransactionId())) {
   logger.info("查詢到已提交事物 transactionid:{}",msg.getTransactionId());
   return LocalTransactionState.COMMIT_MESSAGE;
  } else {
   logger.info("未查到已提交事物 transactionid:{}",msg.getTransactionId());
   return LocalTransactionState.UNKNOW;
  }

 }

}

     上面代碼有兩個方法,這里說下兩個方法的作用和執行時間

             executeLocalTransaction這個方法是發送完 事物消息 之后同步被調用到的方法,用來執行本地事物操作

             executeLocalTransaction方法有兩個參數,第一個是發送成功之后的message消息,在這個方法中包含事物ID其實就是msgid

             第二個參數是object類型的是從dataapi傳過來,

             我的代碼中沒做任何處理直接傳遞過來了然后直接轉化傳遞給了service層進行事物處理

             這個executeLocalTransaction方法里面為什么要直接返回commit或rollback,

             目的是盡量快的告訴MQ我的數據庫事務執行成功了,

             盡快將half消息轉為正常消息,已備消費者消費到做業務處理。

             這里完全可以直接返回unknow,等待broker回查來實現commit操作的。但是這樣做對回查消息broker造成一定的壓力。

      上面代碼的第二個方法是提供給broker回調執行的,進行檢查本地事務是否成功執行的操作,發起方是broker

             這里面我們接收到broker的回查請求之后直接去數據庫查詢是否存在broker提供的事務ID的數據

             如果存在返回commit標識,如果不存在返回unknow標識以等待下一次再來回查

      到此我們的一個事務操作就算完成了


    另外大家可以直接查看service層的實現代碼,就不一一解釋了
package rocketmq_example.mqandmysqltraction;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class MytableService {
 static Logger logger = LoggerFactory.getLogger(MytableService.class);

 @Autowired
 IMytableMapper mytable;

 @Autowired
 ObjectMapper objMapper;

 /**
  * 這里可以顯示提交事物 返回boolean 一條一條插入只是為了展現事物的特性 獲取所有異常 處理你的業務邏輯等等
  * 
  * @param mytablemodels
  * @return
  */
 @Transactional(rollbackFor = Exception.class, timeout = 60000)
 public List<Integer> execMytableinsert2(List<MyTableModel> mytablemodels, String msgid) {

  // logger.info("開始執行數據庫事物");
  List<Integer> result = new ArrayList<Integer>();
  for (MyTableModel myTableModel : mytablemodels) {
   // 插入數據庫
   myTableModel.setMsgid(msgid);
   mytable.insertmytable(myTableModel);
   result.add(myTableModel.getId());
  }
  // logger.info("結束執行數據庫事物");
  return result;
 }

 public boolean existMyTableModelById(Integer id) {
  MyTableModel myTableModel = mytable.selectMyTableModelById(id);
  if (myTableModel != null && null != myTableModel.getId()) {
   return true;
  }
  return false;
 }

 /**
  * 查詢是否存在已經發送過的msgid消息
  * 
  * @param msgid
  * @return
  */
 public boolean existMyTableModelByMsgid(String msgid) {
  int count = mytable.selectMyTableModelByMsgid(msgid);
  if (count > 0) {
   return true;
  }
  return false;
 }

 public void insetmsg(MyTableModel mytablemodel) {
  try {
   mytable.insertmsgrecord(mytablemodel);

  } catch (org.springframework.dao.DuplicateKeyException e) {
   logger.error("主鍵沖突異常被捕獲",e);
  }
 }
}

非常感謝你能看到這里!!!看到這里相信你已經對本篇博客的內容有所了解了!如果有什么問題或者想不通的地方歡迎評論區進行討論。

如果有不正確的地方懇請指正

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM