【原】ActiveMq實現分布式事務一致性


  前言:關於分布式事務話題一直是頗有爭議的話題,在本文中通過ActiveMq 實現分布式事務做一個簡單的demo;同時也讓自己能在實踐中可以獲取經驗和對分布式事務自己的一些思考。

     

      1.本地事務

                我們通常只需借助開發平台中特有數據訪問技術和框架(例如Spring、JDBC、ADO.NET),結合關系型數據庫自帶的事務管理機制來實現事務性的需求。例如A給B轉賬100元並發送100代金券,不管是服務器掛掉還是轉賬失敗拋出異常,我們最終都要保證這個流程要么都成功要么都失敗,否則會出現數據異常。

   2.分布式事務

                余額表和代金券表分布在不同的節點的數據庫,轉賬和發放代金券是不同的應用,它們之間通信可能通過rpc,httpclient,mq;假設這時候A服務給B轉賬成功,但是發放代金券失敗,我們應該如何處理呢?筆者在現在公司項目里就有很多這樣的問題,我們是和第三方經常有數據交互,那么調用第三方的接口進行划撥操作,有可能在第三方划撥成功但是消息丟失(網絡異常、服務器掛掉、某些人新加的不合理代碼導致異常回滾等等)

   3.使用消息隊列ActiveMq實現事務一致性

  •         以下 demo簡單模擬用戶注冊后發放代金券這一過程;流程首先是用戶注冊成功后推送用戶信息到Active mq,代金券應用中也配置好了Active Mq,但是它是充當消費者的角色,實現代金券消息監聽,當監聽到消息后會拉取Active Mq的消息發執行派發金券動作; 其中用戶注冊是一個應用,發放代金券是另外一個應用,它們之間是通過activemq實現消息收發。
  •        首先創建2個maven項目,分別叫account和voucher,在這里我用的是springmvc+jdbc作為項目骨架。
  •        在account項目中我新首先建了一個UserController.java作為注冊的控制層,並提供注冊的方法,如下代碼示例,其中注意的是增加了一張消息表,關於為什么需要消息表下面會詳細解答。
  •  1 package com.zdd.mvc;
     2 
     3 import org.springframework.beans.factory.annotation.Autowired;
     4 import org.springframework.jms.core.JmsTemplate;
     5 import org.springframework.jms.core.MessageCreator;
     6 import org.springframework.stereotype.Controller;
     7 import org.springframework.ui.ModelMap;
     8 import org.springframework.web.bind.annotation.RequestMapping;
     9 import org.springframework.web.bind.annotation.RequestMethod;
    10 import org.springframework.web.bind.annotation.ResponseBody;
    11 import utils.ActiveMQutil;
    12 import utils.JdbcUtil;
    13 import utils.Result;
    14 
    15 import javax.jms.JMSException;
    16 import javax.jms.Message;
    17 import javax.jms.Session;
    18 
    19 /**
    20  * Created by dada on 2017/8/25.
    21  */
    22 @Controller
    23 @RequestMapping("/register")
    24 public class UserAccountController  {
    25 
    26     @Autowired
    27     private JmsTemplate jmsTemplate;
    28 
    29 
    30     @RequestMapping(method = RequestMethod.GET)
    31     public String register() {
    32         return "register";
    33     }
    34 
    35 
    36     @RequestMapping(method = RequestMethod.POST,value = "/doReg")
    37     @ResponseBody
    38     public Result doReg(final String phone) {
    39         JdbcUtil jdbcUtil = null;
    40         try{
    41         jdbcUtil = new JdbcUtil();
    42         jdbcUtil.getConnection();
    43 
    44         jdbcUtil.setAutoCommit(false);
       //往賬戶表添加一條數據
    45 String sql = "insert into account(phone) values ('"+phone+"')"; 46 int row = jdbcUtil.insert(sql); 47 if(row == 1){ 48 //插入到消息記錄表 49 sql = "insert into message(phone,status) values ('"+phone+"',0)"; 50 int m_row = jdbcUtil.insert(sql); 51 if(m_row == 1){ 52 //成功后發送隊列 53 jmsTemplate.send("voucher_message", new MessageCreator() { 54 @Override 55 public Message createMessage(Session session) throws JMSException { 56 return session.createTextMessage(phone); 57 } 58 }); 59 } 60 } 61 jdbcUtil.Commit(); 62 63 }catch (RuntimeException e){ 64 e.printStackTrace(); 65 jdbcUtil.rollback();//出現異常事務回滾 66 }finally { 67 if(null != jdbcUtil){ 68 jdbcUtil.releaseConn(); 69 } 70 } 71 Result result = new Result(); 72 return result; 73 } 74 75 }

       消息表主要用處是:  

  •   假如我們消息投遞到消息中間件后,消費者那邊出現異常,雖然信息已經被消費者消費了,但由於代碼或宕機導致消費端數據事務沒有成功提交,如果沒有消息表,我們將會丟失這一條數據。有了消息表后我們可以查詢到有哪些是屬於未成功派發的數據,這時候可以通過輪詢或者是其他方式再次把這批未成功消費的數據重新派發出去。

  • 根據上述代碼及注釋,我們來分析下可能的情況:

    1. 操作數據庫成功,向MQ中投遞消息也成功,皆大歡喜。

    2. 操作數據庫失敗,不會向MQ中投遞消息了。

    3. 操作數據庫成功,但是向MQ中投遞消息時失敗,向外拋出了異常,剛剛執行的更新數據庫的操作將被回滾。

    4. 操作數據庫成功,投遞MQ消息成功,消費異常,數據未更新,通過掃描消息表再次把數據取出進行消費。

    從上面分析的幾種情況來看,貌似問題都不大的。那么我們來分析下消費者端面臨的問題:

    1. 消息出列后,消費者對應的業務操作要執行成功。如果業務執行失敗,消息不能失效或者丟失。需要保證消息與業務操作一致。

    2. 盡量避免消息重復消費,消費前先查詢一下是否消費成功,一定要有一個標識標明,如果重復消費,也不能因此影響業務結果,保證冪等性。

 

           借用其他人的時序圖:

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM