先推薦一下碼雲上的一個GVP(最有價值的開源項目) AgileBPM(下面簡稱ab),我下面講解的方案也是它的Bo支持多數據源操作事務管理器,友情鏈接:http://doc.agilebpm.cn/
目前是解決的是處理單系統內的多數據源問題,簡單來說就是在單系統中的一個線程內,保護多個數據源事務,這也是ab項目所需要的場景。
參考了碼雲上的開源的lcn分布式事務解決方案,覺得再拓展一下也是可以解決微服務間的分布式事務處理,利用redis放一個事務處理的共同空間,然后在共同空間內來統籌事務,不過它處理commit異常的問題也是用通用方式(commit失敗很多項目都是采取tcc的方式處理)。
ps:之前本人試過使用jta事務管理器,這個性能真看不下去。一會就卡。。所以就想着自己定義個管理器,自己來釋放資源。
1 用AbstractRoutingDataSource讓系統支持多數據源
動態數據源配置:
真正的數據源(druid數據源):
展示一下DynamicDataSource是繼承了AbstractRoutingDataSource的實現,這里不是重點。
2 實現支持這種路由數據源的事務管理器
先繼承AbstractPlatformTransactionManager(事務管理器的抽象類,我們很常用的DataSourceTransactionManager就是繼承它的)
里面需要實現幾個關鍵點就行(筆者只考慮了事務傳播性為PROPAGATION_REQUIRED的情況,這也是項目最常用的,其他我沒支持,畢竟是定制化的事務管理器)
package com.dstz.bus.service.impl; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import javax.sql.DataSource; import org.springframework.beans.factory.InitializingBean; import org.springframework.jdbc.datasource.ConnectionHolder; import org.springframework.jdbc.datasource.DataSourceUtils; import org.springframework.transaction.CannotCreateTransactionException; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionSystemException; import org.springframework.transaction.support.AbstractPlatformTransactionManager; import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.transaction.support.ResourceTransactionManager; import org.springframework.transaction.support.TransactionSynchronizationManager; import com.dstz.base.core.util.AppUtil; import com.dstz.base.core.util.ThreadMapUtil; import com.dstz.base.db.datasource.DataSourceUtil; import com.dstz.base.db.datasource.DbContextHolder; import com.dstz.base.db.datasource.DynamicDataSource; /** * <pre> * 描述:ab 結合sys多數據源操作 專門為bo db實例化做的事務管理器 * 它只保護系統數據源(包含dataSourceDefault),不會保護datasource * 其實可以做到,但是這個事務管理器目前只為bo多數據源的保護,所以我沒支持 * 作者:aschs * 郵箱:aschs@qq.com * 日期:2018年10月10日 * 版權:summer * </pre> */ public class AbDataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { private int i = 0; @Override public void afterPropertiesSet() throws Exception { logger.debug("ab的事務管理器已就緒"); } @Override public Object getResourceFactory() { return DataSourceUtil.getDataSourceByAlias(DataSourceUtil.GLOBAL_DATASOURCE); } /** * <pre> * 生成一個在整個事務處理都用到的資源 * 這里我放了在過程中的所有連接 Map<數據源別名,連接> * </pre> */ @Override protected Object doGetTransaction() { return new HashMap<String, Connection>(); } /** * 判斷是否已存在事務 */ @Override protected boolean isExistingTransaction(Object transaction) { return (boolean) ThreadMapUtil.getOrDefault("abTransactionManagerExist", false); } /** * <pre> * 必須實現的一個方法,設置線程內的事務為回滾狀態。 * 這里其實是為了預防傳播性設置為 讓線程內可以多次管理器操作的情況下,用來通知大家不要只做回滾,別commit了。 * 在該事務管理器只支持PROPAGATION_REQUIRED 的情況下(線程只有一個管理器操作),沒多大用,只是必須要實現這個 * 不然抽象類那里會有報錯代碼。 * </pre> */ @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) { ThreadMapUtil.put("abTransactionManagerRollbackOnly", true);//標記ab事務管理器在線程內已准備要回滾了 } /** * <pre> * 准備事務,獲取鏈接 * </pre> */ @Override protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { logger.info("分布式事務開始:"+i); Map<String, Connection> conMap = (Map<String, Connection>) transaction; Map<String, DataSource> dsMap = DataSourceUtil.getDataSources(); // 遍歷系統中的所有數據源,打開連接 for (Entry<String, DataSource> entry : dsMap.entrySet()) { Connection con = null; try { ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(entry.getValue()); if (conHolder == null) { con = entry.getValue().getConnection(); con.setAutoCommit(false); // 緩存鏈接 TransactionSynchronizationManager.bindResource(entry.getValue(), new ConnectionHolder(con)); } else { con = conHolder.getConnection(); } //系統數據源放進資源里 if(DbContextHolder.getDataSource().equals(entry.getKey())) { DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE); TransactionSynchronizationManager.bindResource(dynamicDataSource, new ConnectionHolder(con)); } conMap.put(entry.getKey(), con); logger.debug("數據源別名[" + entry.getKey() + "]打開連接成功"); } catch (Throwable ex) { doCleanupAfterCompletion(conMap); throw new CannotCreateTransactionException("數據源別名[" + entry.getKey() + "]打開連接錯誤", ex); } } ThreadMapUtil.put("abTransactionManagerExist", true);//標記ab事務管理器已經在線程內啟動了 } @Override protected void doCommit(DefaultTransactionStatus status) { Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction(); for (Entry<String, Connection> entry : conMap.entrySet()) { try { entry.getValue().commit(); logger.debug("數據源別名[" + entry.getKey() + "]提交事務成功"); } catch (SQLException ex) { doCleanupAfterCompletion(conMap); throw new TransactionSystemException("數據源別名[" + entry.getKey() + "]提交事務失敗", ex); } } logger.info("分布式事務提交:"+i); } /** * 回滾 */ @Override protected void doRollback(DefaultTransactionStatus status) throws TransactionException { Map<String, Connection> conMap = (Map<String, Connection>) status.getTransaction(); for (Entry<String, Connection> entry : conMap.entrySet()) { try { entry.getValue().rollback(); logger.debug("數據源別名[" + entry.getKey() + "]回滾事務成功"); } catch (SQLException ex) { doCleanupAfterCompletion(conMap); throw new TransactionSystemException("數據源別名[" + entry.getKey() + "]回滾事務失敗", ex); } } logger.info("分布式事務回滾:"+i); } /** * 回收鏈接 */ @Override protected void doCleanupAfterCompletion(Object transaction) { Map<String, Connection> conMap = (Map<String, Connection>) transaction; for (Entry<String, Connection> entry : conMap.entrySet()) { DataSource dataSource = DataSourceUtil.getDataSourceByAlias(entry.getKey()); TransactionSynchronizationManager.unbindResource(dataSource); DataSourceUtils.releaseConnection(entry.getValue(), dataSource); logger.debug("數據源別名[" + entry.getKey() + "]關閉鏈接成功"); } //最后把本地資源也釋放了 DynamicDataSource dynamicDataSource = (DynamicDataSource) AppUtil.getBean(DataSourceUtil.GLOBAL_DATASOURCE); TransactionSynchronizationManager.unbindResource(dynamicDataSource); ThreadMapUtil.remove("abTransactionManagerExist"); ThreadMapUtil.remove("abTransactionManagerRollbackOnly"); ThreadMapUtil.remove(); logger.info("分布式事務釋放:"+(i++)); } }
事務管理器的方法調用順序和時機大概說一下:
1 doGetTransaction方法:來初始化事務處理過程中的公共資源,后面調用的其他方法都是以它為媒介的。
2 doBegin方法:開始事務操作,主要是打開數據源的鏈接,記得要放到事務資源管理服務中TransactionSynchronizationManager,非常重要,因為這個過程中用到的jdbc操作是從這里面拿的。
3 doCommit(doRollback):如題,把獲取的鏈接提交或者回滾操作。
4 doCleanupAfterCompletion:回收鏈接資源
至此,事務管理器的邏輯已經結束了~
最后,實現務必實現isExistingTransaction,用來處理重復線程內多次觸發了事務切面的邏輯
這里筆者用簡單的線程變量來標記是否線程內已存在了事務管理,因為我只支持PROPAGATION_REQUIRED傳播性,所以沒考慮內部嵌入的其他情況,其實也是內部commit一下,資源肯定是最后統一釋放的。
3 使用自定義事務管理器
先提一下,這里筆者只保護會使用到多數據源的模塊,其實大部分系統邏輯還是用DataSourceTransactionManager就夠,不需要保護太多數據源(因為釋放和打開鏈接是有性能損耗的)。
可以看出,主要的邏輯系統還是使用傳統管理器,然后在特定地方聲明特殊管理器則可:
5 到這里,整個分布式事務管理已完成了,主要是利用了路由數據源AbstractRoutingDataSource和自定義事務管理器實現的~
6 AgileBPM的多數據源結合展示(可跳過)
這里展示一下,這個開源的流程系統的強大可配置性(讓人發指的靈活性-。-)的數據源管理功能。
數據源模板,在這里你可以使用定義不同的數據源實現類,只要在項目import就行
這里有一個內置的阿里的數據源,后面有需要你可以增加其他模板,例如BasicDataSOurce這個,最常用的數據源。
有了模板,就可以新建數據源了:
這里的是特殊默認數據源,系統本地數據源,用戶可以隨便添加。
然后,我就基於AgileBPM的強大數據源管理下,進行了分布式數據源測試。測試邏輯很簡單,就是在一個線程內,操作多個數據源,然后看一下會不會一起回滾。
這里展示了一下,AgileBPM中使用數據源的便捷性,根據配置的別名,直接拿來代碼開發則可,測試代碼比較隨意了,能保證一致性。
7 這樣的實現在壓測中的表現
本人用的是jmetter來壓測事務處理,它的表現跟傳統的DataSourceTransactionManager表現是一樣的!!!!(雖然過程遇到了線程變量的坑,但已修復)。
配置,400進程同時施壓:
這是壓測結果
這是日志輸出,我故意輸出了每一次獲取鏈接,提交,和釋放的事務處理過程
8 挫敗:原來行業內的問題主要卡在commit上。
由於對分布式事務產生極大興趣,所以專研了一下,這么簡單的實現為啥別人都覺得是打問題呢?原來是因為commit會出錯的情況,第一個鏈接commit成功后,第二個鏈接commit失敗,那么第一個鏈接已不能回滾了!!!!所以行業內大部分方案都在處理這種情況,雖然到了commit階段,數據庫已經對相關資源產生了寫鎖,數據也寫入磁盤,就等commit刷進去了,產生錯誤的概率是極少了。作為行業內的大難題,很多方案在處理這個問題。什么2pc原則。。等等,有空我整理一下。大部分主流項目解決方案還是tcc為主,畢竟這個最通用直接。
8.1頓悟!!
頓悟!!其實我以上這種實現方案就是2pc的實現方案,在jdbc都操作了sql沒問題后,再一並提交的方案就是2pc。但是2pc有這個commit提交存在的設計缺陷(這種時機是存在很少可能性的),所以別人就提出tcc和消息隊列的解決commit異常的更可靠的方案(但是,只要是串行邏輯就沒有百分百可靠的方案,只是降低了可能性罷了)。所以,ab項目關於分布式是采用了2pc的解決方案,順帶提一下jta事務也是類似的邏輯,不過他們的性能主要卡在消息通知上。例如所有鏈接操作sql都成功了,我需要通知AB鏈接去提交,我通知了A,A提交成功,然后我通知B,B沒收到消息,那么AB資源都會卡住不釋放,然后B會超時導致回滾了。所以,jta在消息通知上比較損耗性能……關於2pc的友情鏈接:
落寞的流月城(632266504) 14:13:42
https://cloud.tencent.com/developer/article/1355859
9關於AB項目的多數據bo場景方案
在經歷挫敗之后,理性分析了一下,其實當前這種方案已經滿足了ab的分布式事務處理的需求了。首先,其實commit失敗的場景是少之又少,筆者調整了邏輯,后面把重要的系統數據源放在最后提交,保證了系統數據源的強一致性,也就是說保證了流程數據的一致性。
原因分析,這里細想一個場景,我先把業務數據從1改成2,然后驅動流程流轉,假如我的bo是其他數據源A,A先提交成功,但是系統的本地數據源B提交失敗了,那么導致B操作的流程數據會回滾,但是A的數據已提交無法回滾。結果是,流程沒有流轉,但是業務數據已更新了為2了。這種場景在ab中,相當於,我操作了一下業務數據的保存,因為流程沒有變,只是保存了一下數據,對於流程系統本身來說,有時候還是好事,因為雖然流程流轉失敗了,但是業務數據不想再填寫一次。所以我說這種方案已經滿足ab項目的多數據源下的分布式場景的需求了。
當然,如果用戶還是執着於所有數據源的強一致性,在ab項目中可以在bo保存前,先備份一下bo數據,然后在doCommit時恢復備份數據則可以,ab里有很多時機插件,定義了一些時機插件列表,然后你多實現了插件則會運行,ab的插件代碼展示,如下:
ab作為面向技術人員的流程系統,里面內嵌提供了豐富的便捷開發的寫法和實現。