四、DataSourceProxy注冊為Resource


所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

 

正文

前面兩篇文章,主要了解了一下關於seata是怎么通過AOP給方法事務增強的,且關於TransactionalTemplate是如何執行事務的being -> commit || rollback的。本文將開始涉及關於數據源代理的部分。

在閱讀seata自動配置相關的GlobalTransactionScanner這個類的時候提了一下數據源代理的事情,數據源代理是seata實現分布式事務非常重要的點。並且,數據源代理類DataSourceProxy和DataSource是一種組合代理關系。

了解數據源代理,我們先從DataSourceProxy這個類開始

 

DataSourceProxy數據源代理

我們先看一下它的UML類圖

DataSourceProxy直接間接實現了DataSource,也就是可以直接被當作DataSource來使用。同時DataSourceProxy實現了Resource,將作為ResourceManager管理的資源對象。

 

AbstractDataSourceProxy

作為對DataSource接口直接實現的抽象類,我們看看它包含了哪些實現。

public abstract class AbstractDataSourceProxy implements DataSource {

    /**
     * 目標數據源
     */
    protected DataSource targetDataSource;

    /**
     * 構造方法傳入目標數據源
     */
    public AbstractDataSourceProxy(DataSource targetDataSource) {
        this.targetDataSource = targetDataSource;
    }

    /**
     * 獲取目標數據源
     */
    public DataSource getTargetDataSource() {
        return targetDataSource;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return targetDataSource.unwrap(iface);
    }

    // 省略部分 override方法
}

可以看到,AbstractDataSourceProxy定義了一個構造方法,要求傳入原始的數據源。

override的方法將直接調用原始數據源的方法。由此可見,AbstractDataSourceProxy並未實現事務代理相關的內容,需要由之類來實現。

 

DataSourceProxy

DataSourceProxy直接繼承自AbstractDataSourceProxy,我們直接打開它的構造方法看看。

public DataSourceProxy(DataSource targetDataSource) {
    this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
}

跟進this

public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
    super(targetDataSource);
    // 初始化
    init(targetDataSource, resourceGroupId);
}

在構造DataSourceProxy的過程中進行了初始化操作,跟進init方法

private void init(DataSource dataSource, String resourceGroupId) {
    this.resourceGroupId = resourceGroupId;
    // 獲取原始dataSource的Connection
    try (Connection connection = dataSource.getConnection()) {
        // 從Connection中獲取URL地址
        jdbcUrl = connection.getMetaData().getURL();
        // 從Connection中獲取數據庫類型
        dbType = JdbcUtils.getDbType(jdbcUrl, null);
    } catch (SQLException e) {
        throw new IllegalStateException("can not init dataSource", e);
    }
    // 注冊當前對象到ResourceManager
    DefaultResourceManager.get().registerResource(this); if (ENABLE_TABLE_META_CHECKER_ENABLE) {
        // 定時任務校驗表的元數據
        tableMetaExcutor.scheduleAtFixedRate(() -> {
            try (Connection connection = dataSource.getConnection()) {
                TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()).refresh(connection, DataSourceProxy.this.getResourceId());
            } catch (Exception e) {
            }
        }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
    }
}

在init方法中主要是做了一件事情,將當前對象注冊到ResourceManager中。前面我們說過,DataSourceProxy這個對象實現了Resource,作為被ResourceManager直接管控的資源來使用。

這里作為Resource被注冊到ResourceManager中,我們跟進registerResource方法看看注冊過程

@Override
public void registerResource(Resource resource) {
    // 根據branchType選擇了ResourceManager,調用其注冊方法
    getResourceManager(resource.getBranchType()).registerResource(resource);
}

每個Resource都屬於一種branchType,branch叫做分支事務,屬於全局事務當中的一個節點。

DataSourceProxy的branchType是什么類型呢?打開getBranchType方法看看

@Override
public BranchType getBranchType() {
    return BranchType.AT;
}

是AT類型,也就是自動事務。那么getResourceManager將會獲取到什么ResourceManager?

protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();

private DefaultResourceManager() {
    initResourceManagers();
}

protected void initResourceManagers() {
    // SPI機制加載所有的ResourceManager
    List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
    if (CollectionUtils.isNotEmpty(allResourceManagers)) {
        for (ResourceManager rm : allResourceManagers) {
            resourceManagers.put(rm.getBranchType(), rm);
        }
    }
}   

public ResourceManager getResourceManager(BranchType branchType) {
    // 直接從集合中獲取
    ResourceManager rm = resourceManagers.get(branchType);
    return rm;
}

ResourceManager是采用了SPI機制加載實現類,每個ResourceManager對應一個branchType,通過branchType直接從hash集合中取出了。

那么branchType=AT對應的是哪個ResourceManager實現類呢?

是DataSourceManager

到這里,我們似乎看到了和GlobalTransaction相似的地方,GlobalTransaction把相關的操作委托給TransactionManager來做。而DataSourceProxy明顯把registerResource這件事委托給了ResourceManager來做。(后面的ConnectionProxy也是如此)

 

DataSourceManager.registerResource

我們跟進DataSourceManager的registerResource方法看看資源注冊

private Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();

@Override
public void registerResource(Resource resource) {
    DataSourceProxy dataSourceProxy = (DataSourceProxy)resource;
    // 先加入本地緩存
    dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
    // 調用父級的注冊
    super.registerResource(dataSourceProxy);
}

DataSourceManager本地緩存了一份數據,而后調用父級注冊方法,跟進父級方法

@Override
public void registerResource(Resource resource) {
    RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}

父級方法比較簡單,就是直接將當前Resource的groupId和resourceId通過RPC發送給Server端,從而注冊當前Resource。

 

總結

到這里,一個以DataSource作為Resource的資源注冊過程就結束了,其實相對簡單,就是在構造DataSourceProxy方法的時候,發送RPC到Server端,增加一份數據而已。

在Seata的GlobalTransactionScanner的postProcessAfterInitialization中,原始配置的DataSource將會被代理為DataSourceProxy數據源代理對象。

而DataSourceProxy構造過程中會調用init初始化方法,進行Resource的注冊。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM