分布式事務(一)兩階段提交及JTA


原創文章,同步發自作者個人博客 http://www.jasongj.com/big_data/two_phase_commit/

分布式事務

分布式事務簡介

分布式事務是指會涉及到操作多個數據庫(或者提供事務語義的系統,如JMS)的事務。其實就是將對同一數據庫事務的概念擴大到了對多個數據庫的事務。目的是為了保證分布式系統中事務操作的原子性。分布式事務處理的關鍵是必須有一種方法可以知道事務在任何地方所做的所有動作,提交或回滾事務的決定必須產生統一的結果(全部提交或全部回滾)。

分布式事務實現機制

如同作者在《SQL優化(六) MVCC PostgreSQL實現事務和多版本並發控制的精華》一文中所講,事務包含原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability)。

PostgreSQL針對ACID的實現技術如下表所示。

| ACID | 實現技術 |
|---------------------------|
| 原子性(Atomicity) | MVCC |
| 一致性(Consistency) | 約束(主鍵、外鍵等) |
| 隔離性 | MVCC |
| 持久性 | WAL |

分布式事務的實現技術如下表所示。(以PostgreSQL作為事務參與方為例)

| 分布式ACID | 實現技術 |
|---------------------------|
| 原子性(Atomicity) | MVCC + 兩階段提交 |
| 一致性(Consistency) | 約束(主鍵、外鍵等) |
| 隔離性 | MVCC |
| 持久性 | WAL |

從上表可以看到,一致性、隔離性和持久性靠的是各分布式事務參與方自己原有的機制,而兩階段提交主要保證了分布式事務的原子性。

兩階段提交

分布式事務如何保證原子性

在分布式系統中,各個節點(或者事務參與方)之間在物理上相互獨立,通過網絡進行協調。每個獨立的節點(或組件)由於存在事務機制,可以保證其數據操作的ACID特性。但是,各節點之間由於相互獨立,無法確切地知道其經節點中的事務執行情況,所以多節點之間很難保證ACID,尤其是原子性。

如果要實現分布式系統的原子性,則須保證所有節點的數據寫操作,要不全部都執行(生效),要么全部都不執行(生效)。但是,一個節點在執行本地事務的時候無法知道其它機器的本地事務的執行結果,所以它就不知道本次事務到底應該commit還是 roolback。常規的解決辦法是引入一個“協調者”的組件來統一調度所有分布式節點的執行。

XA規范

XA是由X/Open組織提出的分布式事務的規范。XA規范主要定義了(全局)事務管理器(Transaction Manager)和(局部)資源管理器(Resource Manager)之間的接口。XA接口是雙向的系統接口,在事務管理器(Transaction Manager)以及一個或多個資源管理器(Resource Manager)之間形成通信橋梁。XA引入的事務管理器充當上文所述全局事務中的“協調者”角色。事務管理器控制着全局事務,管理事務生命周期,並協調資源。資源管理器負責控制和管理實際資源(如數據庫或JMS隊列)。目前,Oracle、Informix、DB2、Sybase和PostgreSQL等各主流數據庫都提供了對XA的支持。

XA規范中,事務管理器主要通過以下的接口對資源管理器進行管理

  • xa_open,xa_close:建立和關閉與資源管理器的連接。
  • xa_start,xa_end:開始和結束一個本地事務。
  • xa_prepare,xa_commit,xa_rollback:預提交、提交和回滾一個本地事務。
  • xa_recover:回滾一個已進行預提交的事務。

兩階段提交原理

二階段提交的算法思路可以概括為:協調者詢問參與者是否准備好了提交,並根據所有參與者的反饋情況決定向所有參與者發送commit或者rollback指令(協調者向所有參與者發送相同的指令)。

所謂的兩個階段是指

  • 准備階段 又稱投票階段。在這一階段,協調者詢問所有參與者是否准備好提交,參與者如果已經准備好提交則回復Prepared,否則回復Non-Prepared
  • 提交階段 又稱執行階段。協調者如果在上一階段收到所有參與者回復的Prepared,則在此階段向所有參與者發送commit指令,所有參與者立即執行commit操作;否則協調者向所有參與者發送rollback指令,參與者立即執行rollback操作。

兩階段提交中,協調者和參與方的交互過程如下圖所示。
Two-phase commit

兩階段提交前提條件

  • 網絡通信是可信的。雖然網絡並不可靠,但兩階段提交的主要目標並不是解決諸如拜占庭問題的網絡問題。同時兩階段提交的主要網絡通信危險期(In-doubt Time)在事務提交階段,而該階段非常短。
  • 所有crash的節點最終都會恢復,不會一直處於crash狀態。
  • 每個分布式事務參與方都有WAL日志,並且該日志存於穩定的存儲上。
  • 各節點上的本地事務狀態即使碰到機器crash都可從WAL日志上恢復。

兩階段提交容錯方式

兩階段提交中的異常主要分為如下三種情況

  1. 協調者正常,參與方crash
  2. 協調者crash,參與者正常
  3. 協調者和參與方都crash

對於第一種情況,若參與方在准備階段crash,則協調者收不到Prepared回復,協調方不會發送commit命令,事務不會真正提交。若參與方在提交階段提交,當它恢復后可以通過從其它參與方或者協調方獲取事務是否應該提交,並作出相應的響應。

第二種情況,可以通過選出新的協調者解決。

第三種情況,是兩階段提交無法完美解決的情況。尤其是當協調者發送出commit命令后,唯一收到commit命令的參與者也crash,此時其它參與方不能從協調者和已經crash的參與者那兒了解事務提交狀態。但如同上一節兩階段提交前提條件所述,兩階段提交的前提條件之一是所有crash的節點最終都會恢復,所以當收到commit的參與方恢復后,其它節點可從它那里獲取事務狀態並作出相應操作。

JTA

JTA介紹

作為java平台上事務規范JTA(Java Transaction API)也定義了對XA事務的支持,實際上,JTA是基於XA架構上建模的。在JTA 中,事務管理器抽象為javax.transaction.TransactionManager接口,並通過底層事務服務(即Java Transaction Service)實現。像很多其他的Java規范一樣,JTA僅僅定義了接口,具體的實現則是由供應商(如J2EE廠商)負責提供,目前JTA的實現主要有以下幾種:

  • J2EE容器所提供的JTA實現(如JBoss)。
  • 獨立的JTA實現:如JOTM(Java Open Transaction Manager),Atomikos。這些實現可以應用在那些不使用J2EE應用服務器的環境里用以提供分布事事務保證。

PostgreSQL兩階段提交接口

  • PREPARE TRANSACTION transaction_id PREPARE TRANSACTION 為當前事務的兩階段提交做准備。 在命令之后,事務就不再和當前會話關聯了;它的狀態完全保存在磁盤上, 它提交成功有非常高的可能性,即使是在請求提交之前數據庫發生了崩潰也如此。這條命令必須在一個用BEGIN顯式開始的事務塊里面使用。
  • COMMIT PREPARED transaction_id 提交已進入准備階段的ID為transaction_id的事務
  • ROLLBACK PREPARED transaction_id 回滾已進入准備階段的ID為transaction_id的事務

典型的使用方式如下

postgres=> BEGIN;
BEGIN
postgres=> CREATE TABLE demo(a TEXT, b INTEGER);    
CREATE TABLE
postgres=> PREPARE TRANSACTION 'the first prepared transaction';
PREPARE TRANSACTION
postgres=> SELECT * FROM pg_prepared_xacts;
 transaction |              gid               |           prepared            | owner | database 
-------------+--------------------------------+-------------------------------+-------+----------
       23970 | the first prepared transaction | 2016-08-01 20:44:55.816267+08 | casp  | postgres
(1 row)

從上面代碼可看出,使用PREPARE TRANSACTION transaction_id語句后,PostgreSQL會在pg_catalog.pg_prepared_xact表中將該事務的transaction_id記於gid字段中,並將該事務的本地事務ID,即23970,存於transaction字段中,同時會記下該事務的創建時間及創建用戶和數據庫名。

繼續執行如下命令

postgres=> \q
SELECT * FROM pg_prepared_xacts;
 transaction |              gid               |           prepared            | owner | database 
-------------+--------------------------------+-------------------------------+-------+----------
       23970 | the first prepared transaction | 2016-08-01 20:44:55.816267+08 | casp  | cqdb
(1 row)

cqdb=> ROLLBACK PREPARED 'the first prepared transaction';            
ROLLBACK PREPARED
cqdb=> SELECT * FROM pg_prepared_xacts;
 transaction | gid | prepared | owner | database 
-------------+-----+----------+-------+----------
(0 rows)

即使退出當前session,pg_catalog.pg_prepared_xact表中關於已經進入准備階段的事務信息依然存在,這與上文所述准備階段后各節點會將事務信息存於磁盤中持久化相符。注:如果不使用PREPARED TRANSACTION 'transaction_id',則已BEGIN但還未COMMIT或ROLLBACK的事務會在session退出時自動ROLLBACK。

在ROLLBACK已進入准備階段的事務時,必須指定其transaction_id

PostgreSQL兩階段提交注意事項

  • PREPARE TRANSACTION transaction_id命令后,事務狀態完全保存在磁盤上。
  • PREPARE TRANSACTION transaction_id命令后,事務就不再和當前會話關聯,因此當前session可繼續執行其它事務。
  • COMMIT PREPAREDROLLBACK PREPARED可在任何會話中執行,而並不要求在提交准備的會話中執行。
  • 不允許對那些執行了涉及臨時表或者是創建了帶WITH HOLD游標的事務進行PREPARE。 這些特性和當前會話綁定得實在是太緊密了,因此在一個准備好的事務里沒什么可用的。
  • 如果事務用SET修改了運行時參數,這些效果在PREPARE TRANSACTION之后保留,並且不會被任何以后的COMMIT PREPAREDROLLBACK PREPARED所影響,因為SET的生效范圍是當前session。
  • 從性能的角度來看,把一個事務長時間停在准備好的狀態是不明智的,因為它會影響VACUUM回收存儲的能力。
  • 已准備好的事務會繼續持有它們獲得的鎖,直到該事務被commit或者rollback。所以如果已進入准備階段的事務一直不被處理,其它事務可能會因為獲取不到鎖而被block或者失敗。
  • 默認情況下,PostgreSQL並不開啟兩階段提交,可以通過在postgresql.conf文件中設置max_prepared_transactions配置項開啟PostgreSQL的兩階段提交。

JTA實現PostgreSQL兩階段提交

本文使用Atomikos提供的JTA實現,利用PostgreSQL提供的兩階段提交特性,實現了分布式事務。本文中的分布式事務使用了2個不同機器上的PostgreSQL實例。

本例所示代碼可從作者Github獲取。

package com.jasongj.jta.resource;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
import javax.transaction.NotSupportedException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.WebApplicationException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/jta")
public class JTAResource {
  private static final Logger LOGGER = LoggerFactory.getLogger(JTAResource.class);

  @GET
  public String test(@PathParam(value = "commit") boolean isCommit)
      throws NamingException, SQLException, NotSupportedException, SystemException {
    UserTransaction userTransaction = null;
    try {
      Context context = new InitialContext();
      userTransaction = (UserTransaction) context.lookup("java:comp/UserTransaction");
      userTransaction.setTransactionTimeout(600);
      
      userTransaction.begin();
      
      DataSource dataSource1 = (DataSource) context.lookup("java:comp/env/jdbc/1");
      Connection xaConnection1 = dataSource1.getConnection();
      
      DataSource dataSource2 = (DataSource) context.lookup("java:comp/env/jdbc/2");
      Connection xaConnection2 = dataSource2.getConnection();
      LOGGER.info("Connection autocommit : {}", xaConnection1.getAutoCommit());

      Statement st1 = xaConnection1.createStatement();
      Statement st2 = xaConnection2.createStatement();
      LOGGER.info("Connection autocommit after created statement: {}", xaConnection1.getAutoCommit());
      

      st1.execute("update casp.test set qtime=current_timestamp, value = 1");
      st2.execute("update casp.test set qtime=current_timestamp, value = 2");
      LOGGER.info("Autocommit after execution : ", xaConnection1.getAutoCommit());

      userTransaction.commit();
      LOGGER.info("Autocommit after commit: ",  xaConnection1.getAutoCommit());
      return "commit";

    } catch (Exception ex) {
      if (userTransaction != null) {
        userTransaction.rollback();
      }
      LOGGER.info(ex.toString());
      throw new WebApplicationException("failed", ex);
    }
  }
}

從上示代碼中可以看到,雖然使用了Atomikos的JTA實現,但因為使用了面向接口編程特性,所以只出現了JTA相關的接口,而未顯式使用Atomikos相關類。具體的Atomikos使用是在WebContent/META-INFO/context.xml中配置。

<Context>
  <Transaction factory="com.atomikos.icatch.jta.UserTransactionFactory" />
    <Resource name="jdbc/1"
    auth="Container"
    type="com.atomikos.jdbc.AtomikosDataSourceBean"
    factory="com.jasongj.jta.util.EnhancedTomcatAtomikosBeanFactory"
    uniqueResourceName="DataSource_Resource1"
    minPoolSize="2"
    maxPoolSize="8"
    testQuery="SELECT 1"
    xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
    xaProperties.databaseName="postgres"
    xaProperties.serverName="192.168.0.1"
    xaProperties.portNumber="5432"
    xaProperties.user="casp"
    xaProperties.password=""/>

    <Resource name="jdbc/2"
    auth="Container"
    type="com.atomikos.jdbc.AtomikosDataSourceBean"
    factory="com.jasongj.jta.util.EnhancedTomcatAtomikosBeanFactory"
    uniqueResourceName="DataSource_Resource2"
    minPoolSize="2"
    maxPoolSize="8"
    testQuery="SELECT 1"
    xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
    xaProperties.databaseName="postgres"
    xaProperties.serverName="192.168.0.2"
    xaProperties.portNumber="5432"
    xaProperties.user="casp"
    xaProperties.password=""/>  
</Context>


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM