ShardingSphere的分布式事務


如何理解分布式事務?

在傳統的關系型數據庫中,事務是一個標准組件,幾乎所有成熟的關系型數據庫都提供了對本地事務的原生支持。本地事務提供了 ACID 事務特性。基於本地事務,為了保證數據的一致性,我們先開啟一個事務后,才可以執行數據操作,最后提交或回滾就可以了。更進一步,借助於 Spring 等集成化框架,開發人員只需關注引起數據改變的業務即可。

但在分布式環境下,事情就會變得比較復雜。假設系統中存在多個獨立的數據庫,為了確保數據在這些獨立的數據庫中保持一致,我們需要把這些數據庫納入同一個事務中。這時本地事務就無能為力了,我們需要使用分布式事務。

業界關於如何實現分布式事務也有一些通用的實現機制,例如支持兩階段提交的 XA 協議以及以 Saga 為代表的柔性事務。針對不同的實現機制,也存在一些供應商和開發工具。因為這些開發工具在使用方式上和實現原理上都有較大的差異性,所以開發人員的一大訴求在於,希望能有一套統一的解決方案能夠屏蔽這些差異。同時,我們也希望這種解決方案能夠提供友好的系統集成性。

ShardingSphere 作為一款分布式數據庫中間件,勢必要考慮分布式事務的實現方案。而在設計上,ShardingSphere 從一開始就充分考慮到了開發人員的這些訴求,接下來讓我們一起來看一下。

ShardingSphere 中的分布式事務
在 ShardingSphere 中,除本地事務之外,還提供針對分布式事務的兩種實現方案,分別是 XA 事務和柔性事務。這點可以從事務類型枚舉值 TransactionType 中得到驗證:

image-20201118192610726

XA 事務
XA 事務提供基於兩階段提交協議的實現機制。所謂兩階段提交,顧名思義分成兩個階段,一個是准備階段,一個是執行階段。在准備階段中,協調者發起一個提議,分別詢問各參與者是否接受。在執行階段,協調者根據參與者的反饋,提交或終止事務。如果參與者全部同意則提交,只要有一個參與者不同意就終止。

image-20201118192720819

目前,業界在實現 XA 事務時也存在一些主流工具庫,包括 Atomikos、Narayana 和 Bitronix。ShardingSphere 對這三種工具庫都進行了集成,並默認使用 Atomikos 來完成兩階段提交。

BASE 事務
XA 事務是典型的強一致性事務,也就是完全遵循事務的 ACID 設計原則。與 XA 事務這種“剛性”不同,柔性事務則遵循 BASE 設計理論,追求的是最終一致性。這里的 BASE 來自基本可用(Basically Available)、軟狀態(Soft State)和最終一致性(Eventual Consistency)這三個概念。

關於如何實現基於 BASE 原則的柔性事務,業界也存在一些優秀的框架,例如阿里巴巴提供的 Seata。ShardingSphere 內部也集成了對 Seata 的支持。當然,我們也可以根據需要,集成其他分布式事務類開源框架,並基於微內核架構嵌入到 ShardingSphere 運行時環境中。

介紹完理論知識之后,接下來讓我們分別使用 XA 事務和 BASE 事務來實現分布式環境下的數據一致性。

使用 XA 事務
在 Spring 應用程序中添加對 XA 事務的支持相對簡單,無論是 Spring 框架,還是 ShardingSphere 自身,都為我們提供了低成本的開發機制。

開發環境准備
在今天的案例中,我們將演示如何在分庫環境下實現分布式事務,因此我們需要在 Spring Boot 中創建一個 .properties 文件,並包含分庫需要的所有配置項信息:

spring.shardingsphere.datasource.names=ds0,ds1

 

spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource

spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver

spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://localhost:3306/ds0

spring.shardingsphere.datasource.ds0.username=root

spring.shardingsphere.datasource.ds0.password=root

spring.shardingsphere.datasource.ds0.autoCommit: false

 

spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource

spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver

spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://localhost:3306/ds1

spring.shardingsphere.datasource.ds1.username=root

spring.shardingsphere.datasource.ds1.password=root

spring.shardingsphere.datasource.ds0.autoCommit: false

 

spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id

spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=ds$->{user_id % 2}

spring.shardingsphere.sharding.binding-tables=health_record,health_task

spring.shardingsphere.sharding.broadcast-tables=health_level

 

spring.shardingsphere.sharding.tables.health_record.actual-data-nodes=ds$->{0..1}.health_record

spring.shardingsphere.sharding.tables.health_record.key-generator.column=record_id

spring.shardingsphere.sharding.tables.health_record.key-generator.type=SNOWFLAKE

spring.shardingsphere.sharding.tables.health_record.key-generator.props.worker.id=33

spring.shardingsphere.sharding.tables.health_task.actual-data-nodes=ds$->{0..1}.health_task

spring.shardingsphere.sharding.tables.health_task.key-generator.column=task_id

spring.shardingsphere.sharding.tables.health_task.key-generator.type=SNOWFLAKE

spring.shardingsphere.sharding.tables.health_task.key-generator.props.worker.id=33

 

spring.shardingsphere.props.sql.show=true

實現 XA 事務

通過分庫配置,我們將獲取 SQL 執行的目標 DataSource。由於我們使用 Spring 框架而不是使用原生的 JDBC 進行事務管理,所以需要將 DataSource 與 Spring 中的事務管理器 PlatformTransactionManager 關聯起來。

另一方面,為了更好地集成 ShardingSphere 中的分布式事務支持,我們可以通過 Spring 框架提供的 JdbcTemplate 模板類來簡化 SQL 的執行過程。一種常見的做法是創建一個事務配置類來初始化所需的 PlatformTransactionManager 和 JdbcTemplate 對象:

@Configuration
@EnableTransactionManagement
public class TransactionConfiguration {
    
    @Bean
    public PlatformTransactionManager txManager(final DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
    
    @Bean
    public JdbcTemplate jdbcTemplate(final DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }    
}

一旦初始化了 JdbcTemplate,就可以在業務代碼中注入這個模板類來執行各種 SQL 操作,常見的做法是傳入一個 PreparedStatementCallback,並在這個回調中執行各種具體的 SQL:

@Autowired

JdbcTemplate jdbcTemplate;

 

jdbcTemplate.execute(SQL, (PreparedStatementCallback<Object>) preparedStatement -> {

    …

    return preparedStatement;

});

在上面的代碼中,我們通過 PreparedStatementCallback 回調獲取一個 PreparedStatement 對象。或者,我們可以使用 JdbcTemplate 另一種執行 SQL 的代碼風格,通過使用更基礎的 ConnectionCallback 回調接口:

jdbcTemplate.execute((ConnectionCallback<Object>) connection-> {

    …

    return connection;

});

為了在業務代碼中以最少的開發成本嵌入分布式事務機制,ShardingSphere 也專門提供了一個 @ShardingTransactionType 注解來配置所需要執行的事務類型:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface ShardingTransactionType {
    TransactionType value() default TransactionType.LOCAL;
}

我們知道,ShardingSphere 提供的事務類型有三種,分別是 LOCAL、XA 和 BASE,默認使用的是 LOCAL。所以如果需要用到分布式事務,需要在業務方法上顯式的添加這個注解:

@Transactional
@ShardingTransactionType(TransactionType.XA)
public void insert(){
    …
}

另一種設置 TransactionType 的方式是使用 TransactionTypeHolder 工具類。TransactionTypeHolder 類中通過 ThreadLocal 來保存 TransactionType:

public final class TransactionTypeHolder {
    
    private static final ThreadLocal<TransactionType> CONTEXT = new ThreadLocal<TransactionType>() {
        
        @Override
        protected TransactionType initialValue() {
            return TransactionType.LOCAL;
        }
    };
    
    /**
     * Get transaction type for current thread.
     *
     * @return transaction type
     */
    public static TransactionType get() {
        return CONTEXT.get();
    }
    
    /**
     * Set transaction type for current thread.
     *
     * @param transactionType transaction type
     */
    public static void set(final TransactionType transactionType) {
        CONTEXT.set(transactionType);
    }
    
    /**
     * Clear transaction type for current thread.
     */
    public static void clear() {
        CONTEXT.remove();
    }
}

可以看到,TransactionTypeHolder 中默認采用的是本地事務,我們可以通過 set 方法來改變初始設置:

TransactionTypeHolder.set(TransactionType.XA);

現在,使用 XA 開發分布式事務的整體結構的方法已經梳理清楚了,我們可以通過創建一個 insertHealthRecords 方法,在其中添加對 HealthRecord 和 HealthTask 的數據插入代碼:

private List<Long> insertHealthRecords() throws SQLException {
		List<Long> result = new ArrayList<>(10);       
		jdbcTemplate.execute((ConnectionCallback<Object>) connection-> {
			connection.setAutoCommit(false);
			
        	try {        		
        		for (Long i = 1L; i <= 10; i++) {
        			HealthRecord healthRecord = createHealthRecord(i);   		   		
    	    		insertHealthRecord(healthRecord, connection);
    	    		
    	    		HealthTask healthTask = createHealthTask(i, healthRecord);
    	        	insertHealthTask(healthTask, connection);
    	        	
    	        	result.add(healthRecord.getRecordId());    	   
    	        	
    	        	//手工拋出異常
    	        	//throw new SQLException("exception occur!");
                }        		
        		connection.commit();
        	} catch (final SQLException ex) {
        		connection.rollback();
                throw ex;
            }
	    	
			return connection;
	    });
		
		return result;
	}

可以看到,在執行插入操作之前,我們關閉了 Connection 的自動提交功能。在 SQL 執行完畢之后,手動通過 Connection commit 方法執行事務提交。一旦在 SQL 的執行過程中出現任何異常時,就調用 Connection 的 rollback 方法回滾事務。

這里有必要介紹執行數據插入的具體實現過程,我們以 insertHealthRecord 方法為例進行展開

    private void insertHealthRecord(HealthRecord healthRecord, Connection connection) throws SQLException {
    	try (PreparedStatement preparedStatement = connection.prepareStatement(sql_health_record_insert, Statement.RETURN_GENERATED_KEYS)) {
    		preparedStatement.setLong(1, healthRecord.getUserId());
            preparedStatement.setLong(2, healthRecord.getLevelId() % 5 );
            preparedStatement.setString(3, "Remark" + healthRecord.getUserId());
            preparedStatement.executeUpdate(); 
            
            try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
                if (resultSet.next()) {
                	healthRecord.setRecordId(resultSet.getLong(1));
                }
            }
    	}
    	
    }

首先通過 Connection 對象構建一個 PreparedStatement。請注意,由於我們需要通過 ShardingSphere 的主鍵自動生成機制,所以在創建 PreparedStatement 時需要進行特殊地設置:

connection.prepareStatement(sql_health_record_insert, Statement.RETURN_GENERATED_KEYS)

通過這種方式,在 PreparedStatement 完成 SQL 執行之后,我們就可以獲取自動生成的主鍵值:

try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {

     if (resultSet.next()) {

          healthRecord.setRecordId(resultSet.getLong(1));

     }

}

當獲取這個主鍵值之后,就將這個主鍵值設置回 HealthRecord,這是使用自動生成主鍵的常見做法。

最后,我們在事務方法的入口處,需要設置 TransactionType:

	@Override
	public void processWithXA() throws SQLException {
		TransactionTypeHolder.set(TransactionType.XA);
		 
		insertHealthRecords();
	}

現在讓我們執行這個 processWithXA 方法,看看數據是否已經按照分庫的配置寫入到目標數據庫表中。下面是 ds0 中的 health_record 表和 health_task 表:

image-20201118194754773

image-20201118194815158

認為異常的時候,再次執行 processWithXA 方法,基於 connection 提供的 rollback 方法,我們發現已經執行的部分 SQL 並沒有提交到任何一個數據庫中。

使用 BASE 事務

相較於 XA 事務,在業務代碼中集成 BASE 事務的過程就顯得相對復雜一點,因為我們需要借助外部框架來做到這一點。這里,我們將基於阿里巴巴提供的 Seata 框架來演示如何使用 BASE 事務。

開發環境准備

同樣,要想使用基於 Seata 的 BASE 事務,我們首先需要在 pom 文件中添加對 sharding-jdbc-core 和 sharding-transaction-base-seata-at 這兩個依賴:

<dependency>

    <groupId>org.apache.shardingsphere</groupId>

    <artifactId>sharding-jdbc-core</artifactId>

</dependency>

 

<dependency>

     <groupId>org.apache.shardingsphere</groupId>

     <artifactId>sharding-transaction-base-seata-at</artifactId>

</dependency>

因為用到了 Seata 框架,所以也需要引入 Seate 框架的相關組件:

       <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-rm-datasource</artifactId>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-tm</artifactId>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-codec-all</artifactId>
        </dependency>

然后,我們下載並啟動 Seata 服務器,這個過程需要設置 Seata 服務器 config 目錄下的 registry.conf,以便指定注冊中心,這里使用 ZooKeeper 來充當注冊中心。關於如何啟動 Seata 服務器的過程可以參考 Seata 的官方文檔。請注意,按照 Seata 的運行要求,我們需要在每一個分片數據庫實例中創建一張 undo_log 表。然后,我們還需要在代碼工程中 classpath 中增加一個 seata.conf 配置文件:

client {
    application.id = health-base
    transaction.service.group = health-base-group
}

當然,這里我們還是繼續沿用前面介紹的分庫配置。

實現 BASE 事務

基於 ShardingSphere 提供的分布式事務的抽象,我們從 XA 事務轉到 BASE 事務唯一要做的事情就是重新設置 TransactionType,也就是修改一行代碼:

	@Override
	public void processWithBASE() throws SQLException {
		TransactionTypeHolder.set(TransactionType.BASE);
		 
		insertHealthRecords();
	}

下面這樣也是可以的

@Transactional
	@ShardingTransactionType(TransactionType.XA)  
	private List<Long> insertHealthRecords2() throws SQLException {
        List<Long> result = new ArrayList<>(10);       
        
        jdbcTemplate.execute((ConnectionCallback<Object>) connection-> {        	
        	for (Long i = 1L; i <= 10; i++) {
        		HealthRecord healthRecord = createHealthRecord(i);   		   		
	    		insertHealthRecord(healthRecord, connection);
	    		
	    		HealthTask healthTask = createHealthTask(i, healthRecord);
	        	insertHealthTask(healthTask, connection);
	        	
	        	result.add(healthRecord.getRecordId());    	        	
            }
	    	
			return connection;
	    });

        return result;
    }	


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM