所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
前面兩篇文章,主要了解了一下關於seata是怎么通過AOP給方法事務增強的,且關於TransactionalTemplate是如何執行事務的being -> commit || rollback的。本文將開始涉及關於數據源代理的部分。
在閱讀seata自動配置相關的GlobalTransactionScanner這個類的時候提了一下數據源代理的事情,數據源代理是seata實現分布式事務非常重要的點。並且,數據源代理類DataSourceProxy和DataSource是一種組合代理關系。
了解數據源代理,我們先從DataSourceProxy這個類開始
DataSourceProxy數據源代理
我們先看一下它的UML類圖

DataSourceProxy直接間接實現了DataSource,也就是可以直接被當作DataSource來使用。同時DataSourceProxy實現了Resource,將作為ResourceManager管理的資源對象。
AbstractDataSourceProxy
作為對DataSource接口直接實現的抽象類,我們看看它包含了哪些實現。
public abstract class AbstractDataSourceProxy implements DataSource { /** * 目標數據源 */ protected DataSource targetDataSource; /** * 構造方法傳入目標數據源 */ public AbstractDataSourceProxy(DataSource targetDataSource) { this.targetDataSource = targetDataSource; } /** * 獲取目標數據源 */ public DataSource getTargetDataSource() { return targetDataSource; } @Override public <T> T unwrap(Class<T> iface) throws SQLException { return targetDataSource.unwrap(iface); } // 省略部分 override方法 }
可以看到,AbstractDataSourceProxy定義了一個構造方法,要求傳入原始的數據源。
override的方法將直接調用原始數據源的方法。由此可見,AbstractDataSourceProxy並未實現事務代理相關的內容,需要由之類來實現。
DataSourceProxy
DataSourceProxy直接繼承自AbstractDataSourceProxy,我們直接打開它的構造方法看看。
public DataSourceProxy(DataSource targetDataSource) { this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID); }
跟進this
public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) { super(targetDataSource); // 初始化 init(targetDataSource, resourceGroupId); }
在構造DataSourceProxy的過程中進行了初始化操作,跟進init方法
private void init(DataSource dataSource, String resourceGroupId) { this.resourceGroupId = resourceGroupId; // 獲取原始dataSource的Connection try (Connection connection = dataSource.getConnection()) { // 從Connection中獲取URL地址 jdbcUrl = connection.getMetaData().getURL(); // 從Connection中獲取數據庫類型 dbType = JdbcUtils.getDbType(jdbcUrl, null); } catch (SQLException e) { throw new IllegalStateException("can not init dataSource", e); } // 注冊當前對象到ResourceManager DefaultResourceManager.get().registerResource(this); if (ENABLE_TABLE_META_CHECKER_ENABLE) { // 定時任務校驗表的元數據 tableMetaExcutor.scheduleAtFixedRate(() -> { try (Connection connection = dataSource.getConnection()) { TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()).refresh(connection, DataSourceProxy.this.getResourceId()); } catch (Exception e) { } }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS); } }
在init方法中主要是做了一件事情,將當前對象注冊到ResourceManager中。前面我們說過,DataSourceProxy這個對象實現了Resource,作為被ResourceManager直接管控的資源來使用。
這里作為Resource被注冊到ResourceManager中,我們跟進registerResource方法看看注冊過程
@Override public void registerResource(Resource resource) { // 根據branchType選擇了ResourceManager,調用其注冊方法 getResourceManager(resource.getBranchType()).registerResource(resource); }
每個Resource都屬於一種branchType,branch叫做分支事務,屬於全局事務當中的一個節點。
DataSourceProxy的branchType是什么類型呢?打開getBranchType方法看看
@Override public BranchType getBranchType() { return BranchType.AT; }
是AT類型,也就是自動事務。那么getResourceManager將會獲取到什么ResourceManager?
protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>(); private DefaultResourceManager() { initResourceManagers(); } protected void initResourceManagers() { // SPI機制加載所有的ResourceManager List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class); if (CollectionUtils.isNotEmpty(allResourceManagers)) { for (ResourceManager rm : allResourceManagers) { resourceManagers.put(rm.getBranchType(), rm); } } } public ResourceManager getResourceManager(BranchType branchType) { // 直接從集合中獲取 ResourceManager rm = resourceManagers.get(branchType); return rm; }
ResourceManager是采用了SPI機制加載實現類,每個ResourceManager對應一個branchType,通過branchType直接從hash集合中取出了。
那么branchType=AT對應的是哪個ResourceManager實現類呢?
是DataSourceManager
到這里,我們似乎看到了和GlobalTransaction相似的地方,GlobalTransaction把相關的操作委托給TransactionManager來做。而DataSourceProxy明顯把registerResource這件事委托給了ResourceManager來做。(后面的ConnectionProxy也是如此)
DataSourceManager.registerResource
我們跟進DataSourceManager的registerResource方法看看資源注冊
private Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>(); @Override public void registerResource(Resource resource) { DataSourceProxy dataSourceProxy = (DataSourceProxy)resource; // 先加入本地緩存 dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy); // 調用父級的注冊 super.registerResource(dataSourceProxy); }
DataSourceManager本地緩存了一份數據,而后調用父級注冊方法,跟進父級方法
@Override public void registerResource(Resource resource) { RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId()); }
父級方法比較簡單,就是直接將當前Resource的groupId和resourceId通過RPC發送給Server端,從而注冊當前Resource。
總結
到這里,一個以DataSource作為Resource的資源注冊過程就結束了,其實相對簡單,就是在構造DataSourceProxy方法的時候,發送RPC到Server端,增加一份數據而已。
在Seata的GlobalTransactionScanner的postProcessAfterInitialization中,原始配置的DataSource將會被代理為DataSourceProxy數據源代理對象。
而DataSourceProxy構造過程中會調用init初始化方法,進行Resource的注冊。
