單系統下的分布式數據庫事務方案(拓展spring的事務管理器)AgileBPM多數據的解決方案


先推薦一下碼雲上的一個GVP(最有價值的開源項目) AgileBPM(下面簡稱ab),我下面講解的方案也是它的Bo支持多數據源操作事務管理器,友情鏈接:http://doc.agilebpm.cn/

目前是解決的是處理單系統內的多數據源問題,簡單來說就是在單系統中的一個線程內,保護多個數據源事務,這也是ab項目所需要的場景。

參考了碼雲上的開源的lcn分布式事務解決方案,覺得再拓展一下也是可以解決微服務間的分布式事務處理,利用redis放一個事務處理的共同空間,然后在共同空間內來統籌事務,不過它處理commit異常的問題也是用通用方式(commit失敗很多項目都是采取tcc的方式處理)。

ps:之前本人試過使用jta事務管理器,這個性能真看不下去。一會就卡。。所以就想着自己定義個管理器,自己來釋放資源。

1 用AbstractRoutingDataSource讓系統支持多數據源

動態數據源配置:

 

真正的數據源(druid數據源):

展示一下DynamicDataSource是繼承了AbstractRoutingDataSource的實現,這里不是重點。

2 實現支持這種路由數據源的事務管理器

先繼承AbstractPlatformTransactionManager(事務管理器的抽象類,我們很常用的DataSourceTransactionManager就是繼承它的)

里面需要實現幾個關鍵點就行(筆者只考慮了事務傳播性為PROPAGATION_REQUIRED的情況,這也是項目最常用的,其他我沒支持,畢竟是定制化的事務管理器)

package com.dstz.bus.service.impl;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import javax.sql.DataSource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.ConnectionHolder;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.ResourceTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import com.dstz.base.core.util.AppUtil;
import com.dstz.base.core.util.ThreadMapUtil;
import com.dstz.base.db.datasource.DataSourceUtil;
import com.dstz.base.db.datasource.DbContextHolder;
import com.dstz.base.db.datasource.DynamicDataSource;

/**
 * <pre>
 * 描述:ab 結合sys多數據源操作 專門為bo db實例化做的事務管理器
 * 它只保護系統數據源(包含dataSourceDefault),不會保護datasource
 * 其實可以做到,但是這個事務管理器目前只為bo多數據源的保護,所以我沒支持
 * 作者:aschs
 * 郵箱:aschs@qq.com
 * 日期:2018年10月10日
 * 版權:summer
 * </pre>
 */
public class AbDataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
    private int i = 0;
    
    @Override
    public void afterPropertiesSet() throws Exception {
        logger.debug("ab的事務管理器已就緒");
    }

    @Override
    public Object getResourceFactory() {
        return DataSourceUtil.getDataSourceByAlias(DataSourceUtil.GLOBAL_DATASOURCE);
    }

    /**
     * <pre>
     * 生成一個在整個事務處理都用到的資源
     * 這里我放了在過程中的所有連接 Map<數據源別名,連接>
     * </pre>
     */
    @Override
    protected Object doGetTransaction() {
        return new HashMap<String, Connection>();
    }
    
    /**
     * 判斷是否已存在事務
     */
    @Override
    protected boolean isExistingTransaction(Object transaction) {
        return (boolean) ThreadMapUtil.getOrDefault("abTransactionManagerExist", false);
    }
    
    /**
     * <pre>
     * 必須實現的一個方法,設置線程內的事務為回滾狀態。
     * 這里其實是為了預防傳播性設置為 讓線程內可以多次管理器操作的情況下,用來通知大家不要只做回滾,別commit了。
     * 在該事務管理器只支持PROPAGATION_REQUIRED 的情況下(線程只有一個管理器操作),沒多大用,只是必須要實現這個
     * 不然抽象類那里會有報錯代碼。
     * </pre>
     */
    @Override
    protected void doSetRollbackOnly(DefaultTransactionStatus status) {
        ThreadMapUtil.put("abTransactionManagerRollbackOnly", true);//標記ab事務管理器在線程內已准備要回滾了
    }
    
    /**
     * <pre>
     * 准備事務,獲取鏈接
     * </pre>
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
        logger.info("分布式事務開始:"+i);
        
        Map<String, Connection> conMap = (Map<String, Connection>) transaction;
        Map<String, DataSource> dsMap = DataSourceUtil.getDataSources();
        // 遍歷系統中的所有數據源,打開連接
        for (Entry<String, DataSource> entry : dsMap.entrySet()) {
            Connection con = null;
            try {
                ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(entry.getValue());
                if (conHolder == null) {
                    con = entry.getValue().getConnection();
                    con.setAutoCommit(false);
                    // 緩存鏈接
                    TransactionSynchronizationManager.bindResource(entry.getValue(), new ConnectionHolder(con));
                } else {
                    con = conHolder.getConnection();
                }
                
                //系統數據源放進資源里
                if(DbContextHolder.getDataSource().equals(entry.getKey())) {
                    DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
                    TransactionSynchronizationManager.bindResource(dynamicDataSource, new ConnectionHolder(con));
                }
                
                conMap.put(entry.getKey(), con);
                logger.debug("數據源別名[" + entry.getKey() + "]打開連接成功");
            } catch (Throwable ex) {
                doCleanupAfterCompletion(conMap);
                throw new CannotCreateTransactionException("數據源別名[" + entry.getKey() + "]打開連接錯誤", ex);
            }
        }
        
        ThreadMapUtil.put("abTransactionManagerExist", true);//標記ab事務管理器已經在線程內啟動了
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
        for (Entry<String, Connection> entry : conMap.entrySet()) {
            try {
                entry.getValue().commit();
                logger.debug("數據源別名[" + entry.getKey() + "]提交事務成功");
            } catch (SQLException ex) {
                doCleanupAfterCompletion(conMap);
                throw new TransactionSystemException("數據源別名[" + entry.getKey() + "]提交事務失敗", ex);
            }
        }
        logger.info("分布式事務提交:"+i);
    }
    
    /**
     * 回滾
     */
    @Override
    protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
        Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction();
        for (Entry<String, Connection> entry : conMap.entrySet()) {
            try {
                entry.getValue().rollback();
                logger.debug("數據源別名[" + entry.getKey() + "]回滾事務成功");
            } catch (SQLException ex) {
                doCleanupAfterCompletion(conMap);
                throw new TransactionSystemException("數據源別名[" + entry.getKey() + "]回滾事務失敗", ex);
            }
        }
        logger.info("分布式事務回滾:"+i);
    }
    
    /**
     * 回收鏈接
     */
    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        Map<String, Connection> conMap = (Map<String, Connection>) transaction;
        for (Entry<String, Connection> entry : conMap.entrySet()) {
            DataSource dataSource = DataSourceUtil.getDataSourceByAlias(entry.getKey());
            TransactionSynchronizationManager.unbindResource(dataSource);
            DataSourceUtils.releaseConnection(entry.getValue(), dataSource);
            logger.debug("數據源別名[" + entry.getKey() + "]關閉鏈接成功");
        }
        
        //最后把本地資源也釋放了
        DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE);
        TransactionSynchronizationManager.unbindResource(dynamicDataSource);
        
        ThreadMapUtil.remove("abTransactionManagerExist");
        ThreadMapUtil.remove("abTransactionManagerRollbackOnly");
        ThreadMapUtil.remove();
        
        logger.info("分布式事務釋放:"+(i++));
    }
}

 

 事務管理器的方法調用順序和時機大概說一下:

1 doGetTransaction方法:來初始化事務處理過程中的公共資源,后面調用的其他方法都是以它為媒介的。

2 doBegin方法:開始事務操作,主要是打開數據源的鏈接,記得要放到事務資源管理服務中TransactionSynchronizationManager,非常重要,因為這個過程中用到的jdbc操作是從這里面拿的。

3 doCommit(doRollback):如題,把獲取的鏈接提交或者回滾操作。

4 doCleanupAfterCompletion:回收鏈接資源

至此,事務管理器的邏輯已經結束了~

最后,實現務必實現isExistingTransaction,用來處理重復線程內多次觸發了事務切面的邏輯

這里筆者用簡單的線程變量來標記是否線程內已存在了事務管理,因為我只支持PROPAGATION_REQUIRED傳播性,所以沒考慮內部嵌入的其他情況,其實也是內部commit一下,資源肯定是最后統一釋放的。

3 使用自定義事務管理器

先提一下,這里筆者只保護會使用到多數據源的模塊,其實大部分系統邏輯還是用DataSourceTransactionManager就夠,不需要保護太多數據源(因為釋放和打開鏈接是有性能損耗的)。

可以看出,主要的邏輯系統還是使用傳統管理器,然后在特定地方聲明特殊管理器則可:

5 到這里,整個分布式事務管理已完成了,主要是利用了路由數據源AbstractRoutingDataSource和自定義事務管理器實現的~

6 AgileBPM的多數據源結合展示(可跳過)

這里展示一下,這個開源的流程系統的強大可配置性(讓人發指的靈活性-。-)的數據源管理功能。

數據源模板,在這里你可以使用定義不同的數據源實現類,只要在項目import就行

這里有一個內置的阿里的數據源,后面有需要你可以增加其他模板,例如BasicDataSOurce這個,最常用的數據源。

有了模板,就可以新建數據源了:

這里的是特殊默認數據源,系統本地數據源,用戶可以隨便添加。

然后,我就基於AgileBPM的強大數據源管理下,進行了分布式數據源測試。測試邏輯很簡單,就是在一個線程內,操作多個數據源,然后看一下會不會一起回滾。

 

 這里展示了一下,AgileBPM中使用數據源的便捷性,根據配置的別名,直接拿來代碼開發則可,測試代碼比較隨意了,能保證一致性。

 7 這樣的實現在壓測中的表現

本人用的是jmetter來壓測事務處理,它的表現跟傳統的DataSourceTransactionManager表現是一樣的!!!!(雖然過程遇到了線程變量的坑,但已修復)。

配置,400進程同時施壓:

這是壓測結果

 這是日志輸出,我故意輸出了每一次獲取鏈接,提交,和釋放的事務處理過程

 8 挫敗:原來行業內的問題主要卡在commit上。

由於對分布式事務產生極大興趣,所以專研了一下,這么簡單的實現為啥別人都覺得是打問題呢?原來是因為commit會出錯的情況,第一個鏈接commit成功后,第二個鏈接commit失敗,那么第一個鏈接已不能回滾了!!!!所以行業內大部分方案都在處理這種情況,雖然到了commit階段,數據庫已經對相關資源產生了寫鎖,數據也寫入磁盤,就等commit刷進去了,產生錯誤的概率是極少了。作為行業內的大難題,很多方案在處理這個問題。什么2pc原則。。等等,有空我整理一下。大部分主流項目解決方案還是tcc為主,畢竟這個最通用直接。

8.1頓悟!!

頓悟!!其實我以上這種實現方案就是2pc的實現方案,在jdbc都操作了sql沒問題后,再一並提交的方案就是2pc。但是2pc有這個commit提交存在的設計缺陷(這種時機是存在很少可能性的),所以別人就提出tcc和消息隊列的解決commit異常的更可靠的方案(但是,只要是串行邏輯就沒有百分百可靠的方案,只是降低了可能性罷了)。所以,ab項目關於分布式是采用了2pc的解決方案,順帶提一下jta事務也是類似的邏輯,不過他們的性能主要卡在消息通知上。例如所有鏈接操作sql都成功了,我需要通知AB鏈接去提交,我通知了A,A提交成功,然后我通知B,B沒收到消息,那么AB資源都會卡住不釋放,然后B會超時導致回滾了。所以,jta在消息通知上比較損耗性能……關於2pc的友情鏈接:

落寞的流月城(632266504) 14:13:42
https://cloud.tencent.com/developer/article/1355859

 

 

9關於AB項目的多數據bo場景方案

在經歷挫敗之后,理性分析了一下,其實當前這種方案已經滿足了ab的分布式事務處理的需求了。首先,其實commit失敗的場景是少之又少,筆者調整了邏輯,后面把重要的系統數據源放在最后提交,保證了系統數據源的強一致性,也就是說保證了流程數據的一致性。

 

原因分析,這里細想一個場景,我先把業務數據從1改成2,然后驅動流程流轉,假如我的bo是其他數據源A,A先提交成功,但是系統的本地數據源B提交失敗了,那么導致B操作的流程數據會回滾,但是A的數據已提交無法回滾。結果是,流程沒有流轉,但是業務數據已更新了為2了。這種場景在ab中,相當於,我操作了一下業務數據的保存,因為流程沒有變,只是保存了一下數據,對於流程系統本身來說,有時候還是好事,因為雖然流程流轉失敗了,但是業務數據不想再填寫一次。所以我說這種方案已經滿足ab項目的多數據源下的分布式場景的需求了。

當然,如果用戶還是執着於所有數據源的強一致性,在ab項目中可以在bo保存前,先備份一下bo數據,然后在doCommit時恢復備份數據則可以,ab里有很多時機插件,定義了一些時機插件列表,然后你多實現了插件則會運行,ab的插件代碼展示,如下:

ab作為面向技術人員的流程系統,里面內嵌提供了豐富的便捷開發的寫法和實現。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM