一、seata自動配置


所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

 

正文

上一篇文章中,展示了springboot如何引入並使用seata來實現分布式事務的,基本使用以后接下來將開始進行源代碼的閱讀。畢竟閱讀源代碼總是比閱讀文檔令人有興趣一點,而且了解他人的編碼思路似乎也算是一個跨時空的交流?

作為源碼篇的開篇, 將會閱讀springboot引入seata進行自動配置的部分。

 

自動配置類SeataAutoConfiguration

seata的自動配置類命名非常的直接,就叫做:SeataAutoConfiguration,我們打開這個類

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
    
}

首先,@Configuration表明,SeataAutoConfiguration被定義為了spring的配置類。

@ConditionalOnProperty將配置類生效條件設置為seata.enabled=true,默認值是true,所以可以開關分布式事務功能。

@EnableConfigurationProperties將配置包轉成了一個SeataProperties的Bean對象來使用。

@ComponentScan掃描了一下properties包,加載了一大堆類似SeataProperties的Bean對象。

 

接下來閱讀SeataAutoConfiguration的內部代碼

@Autowired
private SeataProperties seataProperties;

@Bean
public SpringUtils springUtils() {
    return new SpringUtils();
}

@Bean
@DependsOn({"springUtils"})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner() {return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup());
}

SpringUtils是一個實現了ApplicationContextAware的工具包,可以便捷地從容器當中getBean操作。

自動配置的核心點落在了下面的一個Bean,GlobalTransactionScanner。

我們看到構造這個Bean非常的簡單,構造方法只需要一個applicationId和txServiceGroup。

applicationId: 就是spring.application.name=你定義的當前應用的名字,例如:userService

txServiceGroup: 就是以applicationId 加上 -seata-service-group命名的,例如:userService-seata-service-group。如果版本較低的話,那時候可能還不叫seata而是fescar,因此默認命名就是以fescar為后綴。

new了一個GlobalTransactionScanner對象,SeataAutoConfiguration這個自動配置類的作用就結束了。有點草率?是的,不過不影響。畢竟SeataAutoConfiguration只是做了一個啟動引導的作用。

 

GlobalTransactionScanner主體邏輯

既然核心點落在GlobalTransactionScanner這個類,我們繼續關注它。看這個名字其實就可以猜測到一點它的作用,掃描@GlobalTransactional這個注解,並對代理方法進行攔截增強事務的功能。

要了解這個類,不得不先閱讀一下它的UML圖

可以看到,GlobalTransactionScanner主要有4個點值得關注:

1)Disposable接口,表達了spring容器銷毀的時候會進行一些操作

2)InitializingBean接口,表達了初始化的時候會進行一些操作

3)AbstractAutoProxyCreator表示它會對spring容器中的Bean進行切面增強,也就是我們上面的攔截事務增強的猜測。

4)ApplicationContextAware表示可以拿到spring容器

這里我們稍微關注一下這4個的執行順序:

ApplicationContextAware -> InitializingBean -> AbstractAutoProxyCreator -> DisposableBean

 

我們重點關注一下InitializingBean和AbstractAutoProxyCreator的內容

InitializingBean

@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        return;
    }
    initClient();
}

初始化Seata的Client端的東西,Client端主要包括TransactionManager和ResourceManager。或許是為了簡化吧,並沒有把initClient這件事從GlobalTransactionScanner里面獨立出來一個類。

跟進initClient方法

private void initClient() {
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(
            "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
    }
    
    //init TM
    TMClient.init(applicationId, txServiceGroup);
   
    //init RM
    RMClient.init(applicationId, txServiceGroup);
  
    registerSpringShutdownHook();
}

initClient邏輯並不復雜,單純調用TMClient.init初始化TransactionManager的RPC客戶端,RMClient.init初始化ResourceManager的RPC客戶端。seata的RPC采用netty來實現,seata封裝簡化了一下使用。

TMClient比較簡單,當初初始化RPC組件

public static void init(String applicationId, String transactionServiceGroup) {
    TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
    tmRpcClient.init();
}

我們關注一下RMClient的init方法

public static void init(String applicationId, String transactionServiceGroup) {
    // 獲取單例對象
    RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
    // 設置ResourceManager的單例對象
    rmRpcClient.setResourceManager(DefaultResourceManager.get());
    // 添加監聽器,監聽Server端的消息推送
    rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
    // 初始化RPC
    rmRpcClient.init();
}

和TMClient想比,RMClient多出了一個監聽Server端消息並處理的機制。也就是說TM的職責更多的是主動與Server端通信,比如:全局事務的begin、commit、rollback等。

而RM除了主動操作本地資源外,還會因為全局事務的commit、rollback等的消息推送,從而對本地資源進行相關操作。

 

AbstractAutoProxyCreator

GlobalTransactionScanner初始化完了TM和RM以后,我們再關注一下AbstractAutoProxyCreator,自動代理。

自動代理,它代理啥東西呢?或者說它給spring中的Bean增強了什么功能?

GlobalTransactionScanner主要擴展了AbstractAutoProxyCreator的兩個方法

1)wrapIfNecessary:代理增強的前置判斷處理,表示是否該Bean需要增強,如果增強的話創建代理類

2)postProcessAfterInitialization:當一個spring的Bean已經初始化完畢的時候,后置處理一些東西

wrapIfNecessary前置處理

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            // 相同Bean排重
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }

            interceptor = null;
            // 判斷是否開啟TCC模式
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                // TCC實現的攔截器
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // 判斷是否存在@GlobalTransactional或者@GlobalLock注解
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                if (interceptor == null) {
                    // 非TCC的攔截器
                    interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);
                }
            }
            // 判斷當前Bean是否已經是spring的代理類了
            if (!AopUtils.isAopProxy(bean)) {
                // 如果還不是,那么走一輪spring的代理過程即可
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // 如果是一個spring的代理類,那么反射獲取代理類中已經存在的攔截器集合,然后添加到該集合當中
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }

            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {}
}

wrapIfNecessary方法較長我們分步驟看看

1)isTccAutoProxy判斷是否開啟tcc模式,開啟的話選擇了TccActionInterceptor攔截器,非tcc模式選擇GlobalTransactionalInterceptor攔截器,默認不開啟

2)existAnnotation判斷當前Bean是否有類或者接口的方法存在@GlobalTransactional或者@GlobalLock注解,如果沒有則直接返回

3)isAopProxy方法是判斷當前的Bean是否已經是spring的代理類了,無論是JDK動態代理還是Cglib類代理。如果是普通的Bean,走原有的生成代理邏輯即可,如果已經是代理類,那么要通過反射獲取代理對象內的攔截器集合也叫做Advisor,直接添加到該集合當中。

wrapIfNecessary的方法並不復雜,但是如果對代理不是很熟悉或許對細節點會有些困惑。

 

postProcessAfterInitialization數據源代理

wrapIfNecessary創建了代理類,最后看看后置處理又做了啥。跟進該方法

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    // 判斷是否數據源,且不是數據源代理
    if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
        // 創建靜態代理
        DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
        Class<?>[] interfaces = SpringProxyUtils.getAllInterfaces(bean);
        // 創建動態代理類
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
                if (null != m) {
                    // 如果靜態代理存在目標對象的代理方法,那么調用該代理方法,從而調用目標方法
                    return m.invoke(dataSourceProxy, args);
                } else {
                    // 如果靜態代理不存在目標對象的代理方法,那么直接調用目標對象的代理方法
                    boolean oldAccessible = method.isAccessible();
                    try {
                        method.setAccessible(true);
                        return method.invoke(bean, args);
                    } finally {
                        //recover the original accessible for security reason
                        method.setAccessible(oldAccessible);
                    }
                }
            }
        });
    }
    // 不需要處理數據源代理的,按照原有邏輯處理
    return super.postProcessAfterInitialization(bean, beanName);
}

這里有兩大塊邏輯:

1)是數據源,且需要進行數據源代理的,那么特立獨行地走if內的邏輯

2)不需要數據源代理的,那么走原有邏輯

我們關注一下數據源自代理的邏輯,數據源代理或許是seata非常重要的實現之一

首先,DataSource是一個接口,DataSourceProxy對DataSource是一種實現的關系。但是,請注意!!!spring中的數據源的Bean和DataSource是實現關系,可是該Bean和DataSourceProxy並不是繼承或者實現的關系,而是組合關系。

因此,DataSourceProxy作為數據源Bean的靜態代理而存在,而它實現了DataSource的接口,但是很有可能並未實現Bean的一些接口。

這樣一來,就很有必要創建一個動態代理類,來關聯一下DataSourceProxy和Bean之間的關系了。如果DataSourceProxy代理過的方法,那么調用代理方法,如果沒有就直接調用Bean中的方法。

后置處理的大體邏輯就是這樣,基本上就是對數據源進行自動代理處理。

 

總結

本文主要是自動配置的主體邏輯,自動配置圍繞着GlobalTransactionScanner這個Bean展開。核心邏輯主要是三塊:

1)初始化TransactionManager和ResourceManager的RPC客戶端

2) 對@GlobalTransactional和@GlobalLock注解的方法進行增強

3)數據源代理

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM