java事務(三)——自己實現分布式事務


    在上一篇《java事務(二)——本地事務》中已經提到了事務的類型,並對本地事務做了說明。而分布式事務是跨越多個數據源來對數據來進行訪問和更新,在JAVA中是使用JTA(Java Transaction API)來實現分布式的事務管理的。但是在本篇中並不會說明如何使用JTA,而是在不依賴其他框架以及jar包的情況下自己來實現分布式事務,作為對分布式事務的一個理解。

 

    假設現在有兩個數據庫,可以是在一台機器上也可以是在不同機器上,現在要向其中一個數據庫更新用戶賬戶信息,另外一個數據庫新增用戶的消費信息。首先說明一下,分布式事務也是事務,在事務特性的那篇博客中就已經說明了事務的四個特性:原子性、一致性、隔離性和持久性,那么分布式事務也必然是符合這四個特性的,這就要求同時對兩個數據庫進行數據訪問和更新的時候是作為一個單獨的工作單元來進行處理,並且同時成功或者失敗后進行回滾。但是在說明本地事務的時候已經提到了,本地事務是基於連接的,現在有兩個數據庫,分別保存數據,那么為了實現這個事務,必然會有兩個數據庫連接,這似乎是與事務基於連接的說法相悖。現在舉個例子:之前回老家去了一趟醫院,后來在辦理出院手續的時候是這樣的,辦理出院時需要護士站的主任醫生填寫出院單,然后攜帶結賬單到收費處繳納費用並去葯房取葯,然后回護士站蓋章,出院手續辦理完畢。如果把不同地點的窗口看成是不同的連接,那么實現辦理出院手續這個事務就必須保證在每個業務窗口上的事務都是成功的,最后出院手續才算真正完成。在最終蓋章的時候,需要查看每個窗口給出的單子是否是已辦理的,只有綜合起來所有的單子才能判定出院手續是否成功。這主要就是為了說明分布式事務實現的關鍵其實是管理每個連接上的事務,用一個東西來判定每個連接上的事務執行情況,綜合起來作為分布式事務執行成功與否的依據。這大概就是事務管理器要做的事情。雖然這個例子並不太恰當,很有挑毛病的地方,但是在不太鑽牛角尖的情況下,還是可以用來說明要表達的東西的。

實現例子

   我打開了兩台虛擬機,分別命令為node1、node2,每台虛擬機上都安裝了MySQL數據庫,在向node1上的數據庫更新用戶賬戶信息,向node2上的數據庫新增用戶消費信息。

 在node1上創建賬戶表,建表語句如下:

CREATE TABLE ACCOUNTS
(
    ID INT NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
    CUSTOMER_NO    VARCHAR(25) NOT NULL COMMENT '客戶號',
    CUSTOMER_NAME  VARCHAR(25) NOT NULL COMMENT '客戶名稱',
    CARD_ID         VARCHAR(18) NOT NULL COMMENT '身份證號',
    BANK_ID         VARCHAR(25) NOT NULL COMMENT '開戶行ID',
    BALANCE      DECIMAL  NOT NULL COMMENT '賬戶余額',
    CURRENCY     VARCHAR(10) NOT NULL COMMENT '幣種',
    PRIMARY KEY (ID)
)
COMMENT  = '賬戶表' ;

  然后向表中插入一條記錄,如下圖:

 在node2上創建用戶消費歷史表,建表語句如下:

CREATE TABLE USER_PURCHASE_HIS
(
    ID INT NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
    CUSTOMER_NO VARCHAR(25) NOT NULL COMMENT '客戶號',
    SERIAL_NO   VARCHAR(32) NOT NULL COMMENT '交易流水號',
    AMOUNT         DECIMAL     NOT NULL COMMENT '交易金額',
    CURRENCY    VARCHAR(10) NOT NULL COMMENT '幣種',
    REMARK         VARCHAR(100) NOT NULL COMMENT '備注',
    PRIMARY KEY (ID)
)
COMMENT = '用戶消費歷史表';

 下面實現一個簡陋的例子,代碼如下:

 1、創建DBUtil類,用來獲取和關閉連接

package person.lb.example1;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class DBUtil {

    static {
        try {
            //加載驅動類
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } 
    }
    
    //獲取node1上的數據庫連接
    public static Connection getNode1Connection() {
        Connection conn = null;
        try {
            conn = (Connection) DriverManager.getConnection(
                    "jdbc:mysql://192.168.0.108:3306/TEST", 
                    "root", 
                    "root");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }
    
    //獲取node2上的數據庫連接
    public static Connection getNode2Connection() {
        Connection conn = null;
        try {
            conn = (Connection) DriverManager.getConnection(
                    "jdbc:mysql://192.168.0.109:3306/TEST", 
                    "root", 
                    "root");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }
    
    //關閉連接
    public static void close(ResultSet rs, Statement st, Connection conn) {
        try {
            if(rs != null) {
                rs.close();
            }
            if(st != null) {
                st.close();
            }
            if(conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

2、創建XADemo類,用來測試事務

package person.lb.example1;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class XADemo {

    public static void main(String[] args) {

        //獲取連接
        Connection node1Conn = DBUtil.getNode1Connection();
        Connection node2Conn = DBUtil.getNode2Connection();
        try {
            //設置連接為非自動提交
            node1Conn.setAutoCommit(false);
            node2Conn.setAutoCommit(false);
            //更新賬戶信息
            updateAccountInfo(node1Conn);
            //增加用戶消費信息
            addUserPurchaseInfo(node2Conn);
            //提交
            node1Conn.commit();
            node2Conn.commit();
        } catch (SQLException e) {
            e.printStackTrace();
            //回滾
            try {
                node1Conn.rollback();
                node2Conn.rollback();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        } finally {
            //關閉連接
            DBUtil.close(null, null, node1Conn);
            DBUtil.close(null, null, node2Conn);
        }
    }
    
    /**
     * 更新賬戶信息
     * @param conn
     * @throws SQLException 
     */
    private static void updateAccountInfo(Connection conn) throws SQLException {
        Statement st = conn.createStatement();
        st.execute("UPDATE ACCOUNTS SET BALANCE = CAST('9900.00' AS DECIMAL) WHERE CUSTOMER_NO = '88888888' ");
    }
    
    /**
     * 增加用戶消費信息
     * @param conn
     * @throws SQLException 
     */
    private static void addUserPurchaseInfo(Connection conn) throws SQLException {
        Statement st = conn.createStatement();
        st.execute("INSERT INTO USER_PURCHASE_HIS(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "
                + " VALUES ('88888888', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 100, 'CNY', '買衣服')");
    }

}

  這是一個沒有發生任何異常的例子,執行結果是nod1上ACCOUNTS 表中的BALANCE字段的值成功更新為9900,而node2上USER_PURCHASE_HIS表中新增了一條記錄,兩個連接上的事務都成功完成,事務目標實現。如果反向測試一下,更改Insert語句,把其中某一個要插入的值改為NULL,由於字段都是非空限制,所以會發生異常,這個連接上的事務會失敗,那么跟它關聯的node1上的事務也必須回滾,不對數據庫進行任何更改。經測試,結果與預期目標一致。說明這個例子是符合事務特性的。

 

  但是這個例子不管是從代碼的可讀性和可維護性上來說都是比較差的。在使用spring開發項目的時候,配置了事務管理器以后,在我們的業務邏輯中幾乎是察覺不到事務控制的,而且也看不到事務控制的代碼。那么究竟spring中是怎么實現的事務控制呢,這篇博客中不會詳細說明,但是要提到兩個東西,事務管理器和資源管理器,現在自己來實現一個簡單的事務管理器和資源管理器來對事務進行控制。

代碼示例如下:

1、創建AbstractDataSource 類

package person.lb.datasource;

import java.sql.Connection;
import java.sql.SQLException;

public abstract class AbstractDataSource {

    //獲取連接
    public abstract Connection getConnection() throws SQLException ;
    //關閉連接
    public abstract void close() throws SQLException;
        
}

2、創建Node1DataSource 類,用來連接node1上的數據庫

package person.lb.datasource;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Node1DataSource extends AbstractDataSource {

    //使用ThreadLocal類保存當前線程使用的Connection
    protected static final ThreadLocal<Connection> threadSession = new ThreadLocal<Connection>();
    
    static {
        try {
            //加載驅動類
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } 
    }
    
    private final static Node1DataSource node1DataSource = new Node1DataSource();
    
    private Node1DataSource() {}
    
    public static Node1DataSource getInstance() {
        return node1DataSource;
    }
    
    /**
     * 獲取連接
     */
    @Override
    public Connection getConnection() throws SQLException {
        Connection conn = null;
        if(threadSession.get() == null) {
            conn = (Connection) DriverManager.getConnection(
                        "jdbc:mysql://192.168.0.108:3306/TEST", 
                        "root", 
                        "root");
            threadSession.set(conn);
        } else {
            conn = threadSession.get();
        }
        return conn;
    }

    /**
     * 關閉並移除連接
     */
    @Override
    public void close() throws SQLException {
        Connection conn = threadSession.get();
        if(conn != null) {
            conn.close();
            threadSession.remove();
        }
    }

}

3、創建Node2DataSource類,用來連接node2機器上的數據庫

package person.lb.datasource;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;


public class Node2DataSource extends AbstractDataSource {

    //使用ThreadLocal類保存當前線程使用的Connection
    protected static final ThreadLocal<Connection> threadSession = new ThreadLocal<Connection>();
        
    static {
        try {
            //加載驅動類
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } 
    }
    
    private static final Node2DataSource node2DataSource  = new Node2DataSource();
    
    private Node2DataSource() {};
    
    public static Node2DataSource getInstance() {
        return node2DataSource;
    }
    
    /**
     * 獲取連接
     */
    @Override
    public Connection getConnection() throws SQLException {
        Connection conn = null;
        if(threadSession.get() == null) {
            conn = (Connection) DriverManager.getConnection(
                        "jdbc:mysql://192.168.0.109:3306/TEST", 
                        "root", 
                        "root");
            threadSession.set(conn);
        } else {
            conn = threadSession.get();
        }
        return conn;
    }

    /**
     * 關閉並移除連接
     */
    @Override
    public void close() throws SQLException {
        Connection conn = threadSession.get();
        if(conn != null) {
            conn.close();
            threadSession.remove();
        }
    }
}

4、創建Node1Dao類,在node1的數據庫中更新賬戶信息

package person.lb.dao;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import person.lb.datasource.Node1DataSource;

public class Node1Dao {

    private Node1DataSource dataSource = Node1DataSource.getInstance();
    
    /**
     * 更新賬戶信息
     * @throws SQLException
     */
    public void updateAccountInfo() throws SQLException {
        Connection conn = dataSource.getConnection();
        Statement st = conn.createStatement();
        st.execute("UPDATE ACCOUNTS SET BALANCE = CAST('9900.00' AS DECIMAL) WHERE CUSTOMER_NO = '88888888' ");
    }
}

5、創建Node2Dao,在node2機器上增加用戶消費信息

package person.lb.dao;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import person.lb.datasource.Node2DataSource;

public class Node2Dao {

    private Node2DataSource dataSource = Node2DataSource.getInstance();
    
    /**
     * 增加用戶消費信息
     * @throws SQLException
     */
    public void addUserPurchaseInfo() throws SQLException {
        Connection conn = dataSource.getConnection();
        Statement st = conn.createStatement();
        st.execute("INSERT INTO USER_PURCHASE_HIS(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "
                + " VALUES ('88888888', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', null, 'CNY', '買衣服')");
    }
}

6、創建NodeService類,把兩個操作作為一個事務來執行

package person.lb.service;

import java.sql.SQLException;

import person.lb.dao.Node1Dao;
import person.lb.dao.Node2Dao;
import person.lb.transaction.TransactionManager;

public class NodeService {

    
    public void execute() {
        //啟動事務
        TransactionManager.begin();
        
        Node1Dao node1Dao = new Node1Dao();
        Node2Dao node2Dao = new Node2Dao();
        try {
            node1Dao.updateAccountInfo();
            node2Dao.addUserPurchaseInfo();
            //提交事務
            TransactionManager.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

7、最后是測試類TestTx

package person.lb.test;

import person.lb.service.NodeService;

public class TestTx {

    public static void main(String[] args) {
        NodeService nodeService = new NodeService();
        nodeService.execute();
    }
}

經測試,與第一個例子效果一致,但是從代碼上來說要比第一個例子的可讀性和可維護性高。不過這個例子並不能說明分布式事務中的事務管理器和資源管理器的真正原理,也不是一個可使用的代碼,畢竟存在缺陷,而且dao層需要拋出異常才能實現事務的回滾。我想,作為一個理解分布式事務的作用的例子是夠了。

 

最后是這篇博客中的源碼:TransactionDemo.rar

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM