所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
在上一篇文章中,展示了springboot如何引入並使用seata來實現分布式事務的,基本使用以后接下來將開始進行源代碼的閱讀。畢竟閱讀源代碼總是比閱讀文檔令人有興趣一點,而且了解他人的編碼思路似乎也算是一個跨時空的交流?
作為源碼篇的開篇, 將會閱讀springboot引入seata進行自動配置的部分。
自動配置類SeataAutoConfiguration
seata的自動配置類命名非常的直接,就叫做:SeataAutoConfiguration,我們打開這個類
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties") @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true) @Configuration @EnableConfigurationProperties({SeataProperties.class}) public class SeataAutoConfiguration { }
首先,@Configuration表明,SeataAutoConfiguration被定義為了spring的配置類。
@ConditionalOnProperty將配置類生效條件設置為seata.enabled=true,默認值是true,所以可以開關分布式事務功能。
@EnableConfigurationProperties將配置包轉成了一個SeataProperties的Bean對象來使用。
@ComponentScan掃描了一下properties包,加載了一大堆類似SeataProperties的Bean對象。
接下來閱讀SeataAutoConfiguration的內部代碼
@Autowired private SeataProperties seataProperties; @Bean public SpringUtils springUtils() { return new SpringUtils(); } @Bean @DependsOn({"springUtils"}) @ConditionalOnMissingBean(GlobalTransactionScanner.class) public GlobalTransactionScanner globalTransactionScanner() {return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup()); }
SpringUtils是一個實現了ApplicationContextAware的工具包,可以便捷地從容器當中getBean操作。
自動配置的核心點落在了下面的一個Bean,GlobalTransactionScanner。
我們看到構造這個Bean非常的簡單,構造方法只需要一個applicationId和txServiceGroup。
applicationId: 就是spring.application.name=你定義的當前應用的名字,例如:userService
txServiceGroup: 就是以applicationId 加上 -seata-service-group命名的,例如:userService-seata-service-group。如果版本較低的話,那時候可能還不叫seata而是fescar,因此默認命名就是以fescar為后綴。
new了一個GlobalTransactionScanner對象,SeataAutoConfiguration這個自動配置類的作用就結束了。有點草率?是的,不過不影響。畢竟SeataAutoConfiguration只是做了一個啟動引導的作用。
GlobalTransactionScanner主體邏輯
既然核心點落在GlobalTransactionScanner這個類,我們繼續關注它。看這個名字其實就可以猜測到一點它的作用,掃描@GlobalTransactional這個注解,並對代理方法進行攔截增強事務的功能。
要了解這個類,不得不先閱讀一下它的UML圖
可以看到,GlobalTransactionScanner主要有4個點值得關注:
1)Disposable接口,表達了spring容器銷毀的時候會進行一些操作
2)InitializingBean接口,表達了初始化的時候會進行一些操作
3)AbstractAutoProxyCreator表示它會對spring容器中的Bean進行切面增強,也就是我們上面的攔截事務增強的猜測。
4)ApplicationContextAware表示可以拿到spring容器
這里我們稍微關注一下這4個的執行順序:
ApplicationContextAware -> InitializingBean -> AbstractAutoProxyCreator -> DisposableBean
我們重點關注一下InitializingBean和AbstractAutoProxyCreator的內容
InitializingBean
@Override public void afterPropertiesSet() { if (disableGlobalTransaction) { return; } initClient(); }
初始化Seata的Client端的東西,Client端主要包括TransactionManager和ResourceManager。或許是為了簡化吧,並沒有把initClient這件事從GlobalTransactionScanner里面獨立出來一個類。
跟進initClient方法
private void initClient() { if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException( "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup); } //init TM TMClient.init(applicationId, txServiceGroup); //init RM RMClient.init(applicationId, txServiceGroup); registerSpringShutdownHook(); }
initClient邏輯並不復雜,單純調用TMClient.init初始化TransactionManager的RPC客戶端,RMClient.init初始化ResourceManager的RPC客戶端。seata的RPC采用netty來實現,seata封裝簡化了一下使用。
TMClient比較簡單,當初初始化RPC組件
public static void init(String applicationId, String transactionServiceGroup) { TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup); tmRpcClient.init(); }
我們關注一下RMClient的init方法
public static void init(String applicationId, String transactionServiceGroup) { // 獲取單例對象 RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup); // 設置ResourceManager的單例對象 rmRpcClient.setResourceManager(DefaultResourceManager.get()); // 添加監聽器,監聽Server端的消息推送 rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get())); // 初始化RPC rmRpcClient.init(); }
和TMClient想比,RMClient多出了一個監聽Server端消息並處理的機制。也就是說TM的職責更多的是主動與Server端通信,比如:全局事務的begin、commit、rollback等。
而RM除了主動操作本地資源外,還會因為全局事務的commit、rollback等的消息推送,從而對本地資源進行相關操作。
AbstractAutoProxyCreator
GlobalTransactionScanner初始化完了TM和RM以后,我們再關注一下AbstractAutoProxyCreator,自動代理。
自動代理,它代理啥東西呢?或者說它給spring中的Bean增強了什么功能?
GlobalTransactionScanner主要擴展了AbstractAutoProxyCreator的兩個方法
1)wrapIfNecessary:代理增強的前置判斷處理,表示是否該Bean需要增強,如果增強的話創建代理類
2)postProcessAfterInitialization:當一個spring的Bean已經初始化完畢的時候,后置處理一些東西
wrapIfNecessary前置處理
@Override protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { try { synchronized (PROXYED_SET) { // 相同Bean排重 if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; // 判斷是否開啟TCC模式 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // TCC實現的攔截器 interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); } else { Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); // 判斷是否存在@GlobalTransactional或者@GlobalLock注解 if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } if (interceptor == null) { // 非TCC的攔截器 interceptor = new GlobalTransactionalInterceptor(failureHandlerHook); ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor); } } // 判斷當前Bean是否已經是spring的代理類了 if (!AopUtils.isAopProxy(bean)) { // 如果還不是,那么走一輪spring的代理過程即可 bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { // 如果是一個spring的代理類,那么反射獲取代理類中已經存在的攔截器集合,然后添加到該集合當中 AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); for (Advisor avr : advisor) { advised.addAdvisor(0, avr); } } PROXYED_SET.add(beanName); return bean; } } catch (Exception exx) {} }
wrapIfNecessary方法較長我們分步驟看看
1)isTccAutoProxy判斷是否開啟tcc模式,開啟的話選擇了TccActionInterceptor攔截器,非tcc模式選擇GlobalTransactionalInterceptor攔截器,默認不開啟
2)existAnnotation判斷當前Bean是否有類或者接口的方法存在@GlobalTransactional或者@GlobalLock注解,如果沒有則直接返回
3)isAopProxy方法是判斷當前的Bean是否已經是spring的代理類了,無論是JDK動態代理還是Cglib類代理。如果是普通的Bean,走原有的生成代理邏輯即可,如果已經是代理類,那么要通過反射獲取代理對象內的攔截器集合也叫做Advisor,直接添加到該集合當中。
wrapIfNecessary的方法並不復雜,但是如果對代理不是很熟悉或許對細節點會有些困惑。
postProcessAfterInitialization數據源代理
wrapIfNecessary創建了代理類,最后看看后置處理又做了啥。跟進該方法
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // 判斷是否數據源,且不是數據源代理 if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) { // 創建靜態代理 DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean); Class<?>[] interfaces = SpringProxyUtils.getAllInterfaces(bean); // 創建動態代理類 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes()); if (null != m) { // 如果靜態代理存在目標對象的代理方法,那么調用該代理方法,從而調用目標方法 return m.invoke(dataSourceProxy, args); } else { // 如果靜態代理不存在目標對象的代理方法,那么直接調用目標對象的代理方法 boolean oldAccessible = method.isAccessible(); try { method.setAccessible(true); return method.invoke(bean, args); } finally { //recover the original accessible for security reason method.setAccessible(oldAccessible); } } } }); } // 不需要處理數據源代理的,按照原有邏輯處理 return super.postProcessAfterInitialization(bean, beanName); }
這里有兩大塊邏輯:
1)是數據源,且需要進行數據源代理的,那么特立獨行地走if內的邏輯
2)不需要數據源代理的,那么走原有邏輯
我們關注一下數據源自代理的邏輯,數據源代理或許是seata非常重要的實現之一
首先,DataSource是一個接口,DataSourceProxy對DataSource是一種實現的關系。但是,請注意!!!spring中的數據源的Bean和DataSource是實現關系,可是該Bean和DataSourceProxy並不是繼承或者實現的關系,而是組合關系。
因此,DataSourceProxy作為數據源Bean的靜態代理而存在,而它實現了DataSource的接口,但是很有可能並未實現Bean的一些接口。
這樣一來,就很有必要創建一個動態代理類,來關聯一下DataSourceProxy和Bean之間的關系了。如果DataSourceProxy代理過的方法,那么調用代理方法,如果沒有就直接調用Bean中的方法。
后置處理的大體邏輯就是這樣,基本上就是對數據源進行自動代理處理。
總結
本文主要是自動配置的主體邏輯,自動配置圍繞着GlobalTransactionScanner這個Bean展開。核心邏輯主要是三塊:
1)初始化TransactionManager和ResourceManager的RPC客戶端
2) 對@GlobalTransactional和@GlobalLock注解的方法進行增強
3)數據源代理