從上一篇文章「分布式事務中間件Seata的設計原理」講了下 Seata AT 模式的一些設計原理,從中也知道了 AT 模式的三個角色(RM、TM、TC),接下來我會更新 Seata 源碼分析系列文章。今天就來分析 Seata AT 模式在啟動的時候都做了哪些操作。
客戶端啟動邏輯
TM 是負責整個全局事務的管理器,因此一個全局事務是由 TM 開啟的,TM 有個全局管理類 GlobalTransaction,結構如下:
io.seata.tm.api.GlobalTransaction
public interface GlobalTransaction {
void begin() throws TransactionException;
void begin(int timeout) throws TransactionException;
void begin(int timeout, String name) throws TransactionException;
void commit() throws TransactionException;
void rollback() throws TransactionException;
GlobalStatus getStatus() throws TransactionException;
// ...
}
可以通過 GlobalTransactionContext 創建一個 GlobalTransaction,然后用 GlobalTransaction 進行全局事務的開啟、提交、回滾等操作,因此我們直接用 API 方式使用 Seata AT 模式:
//init seata;
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
//trx
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
tx.begin(60000, "testBiz");
// 事務處理
// ...
tx.commit();
} catch (Exception exx) {
tx.rollback();
throw exx;
}
如果每次使用全局事務都這樣寫,難免會造成代碼冗余,我們的項目都是基於 Spring 容器,這時我們可以利用 Spring AOP 的特性,用模板模式把這些冗余代碼封裝模版里,參考 Mybatis-spring 也是做了這么一件事情,那么接下來我們來分析一下基於 Spring 的項目啟動 Seata 並注冊全局事務時都做了哪些工作。
我們開啟一個全局事務是在方法上加上 @GlobalTransactional
注解,Seata 的 Spring 模塊中,有個 GlobalTransactionScanner,它的繼承關系如下:
public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean, ApplicationContextAware, DisposableBean {
// ...
}
在基於 Spring 項目的啟動過程中,對該類會有如下初始化流程:
InitializingBean 的 afterPropertiesSet() 方法調用了 initClient() 方法:
io.seata.spring.annotation.GlobalTransactionScanner#initClient
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
對 TM 和 RM 做了初始化操作。
- TM 初始化
io.seata.tm.TMClient#init
public static void init(String applicationId, String transactionServiceGroup) {
// 獲取 TmRpcClient 實例
TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
// 初始化 TM Client
tmRpcClient.init();
}
調用 TmRpcClient.getInstance() 方法會獲取一個 TM 客戶端實例,在獲取過程中,會創建 Netty 客戶端配置文件對象,以及創建 messageExecutor 線程池,該線程池用於在處理各種與服務端的消息交互,在創建 TmRpcClient 實例時,創建 ClientBootstrap,用於管理 Netty 服務的啟停,以及 ClientChannelManager,它是專門用於管理 Netty 客戶端對象池,Seata 的 Netty 部分配合使用了對象吃,后面在分析網絡模塊會講到。
io.seata.core.rpc.netty.AbstractRpcRemotingClient#init
public void init() {
clientBootstrap.start();
// 定時嘗試連接服務端
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
super.init();
}
調用 TM 客戶端 init() 方法,最終會啟動 netty 客戶端(此時還未真正啟動,在對象池被調用時才會被真正啟動);開啟一個定時任務,定時重新發送 RegisterTMRequest(RM 客戶端會發送 RegisterRMRequest)請求嘗試連接服務端,具體邏輯是在 NettyClientChannelManager 中的 channels 中緩存了客戶端 channel,如果此時 channels 不存在獲取已過期,那么就會嘗試連接服務端以重新獲取 channel 並將其緩存到 channels 中;開啟一條單獨線程,用於處理異步請求發送,這里用得很巧妙,之后在分析網絡模塊在具體對其進行分析。
io.seata.core.rpc.netty.AbstractRpcRemoting#init
public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
if (entry.getValue().isTimeout()) {
futures.remove(entry.getKey());
entry.getValue().setResultMessage(null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}
在 AbstractRpcRemoting 的 init 方法中,又是開啟了一個定時任務,該定時任務主要是用於定時清除 futures 已過期的 futrue,futures 是保存發送請求需要返回結果的 future 對象,該對象有個超時時間,過了超時時間就會自動拋異常,因此需要定時清除已過期的 future 對象。
- RM 初始化
io.seata.rm.RMClient#init
public static void init(String applicationId, String transactionServiceGroup) {
RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
rmRpcClient.setResourceManager(DefaultResourceManager.get());
rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
rmRpcClient.init();
}
RmRpcClient.getInstance 處理邏輯與 TM 大致相同;ResourceManager 是 RM 資源管理器,負責分支事務的注冊、提交、上報、以及回滾操作,以及全局鎖的查詢操作,DefaultResourceManager 會持有當前所有的 RM 資源管理器,進行統一調用處理,而 get() 方法主要是加載當前的資源管理器,主要用了類似 SPI 的機制,進行靈活加載,如下圖,Seata 會掃描 META-INF/services/ 目錄下的配置類並進行動態加載。
ClientMessageListener 是 RM 消息處理監聽器,用於負責處理從 TC 發送過來的指令,並對分支進行分支提交、分支回滾,以及 undo log 文件刪除操作;最后 init 方法跟 TM 邏輯也大體一致;DefaultRMHandler 封裝了 RM 分支事務的一些具體操作邏輯。
接下來再看看 wrapIfNecessary 方法究竟做了哪些操作。
io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 判斷是否有開啟全局事務
if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// 判斷 bean 中是否有 GlobalTransactional 和 GlobalLock 注解
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
// 創建代理類
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]",
bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// 執行包裝目標對象到代理對象
Advisor[] advisor = super.buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
GlobalTransactionScanner 繼承了 AbstractAutoProxyCreator,用於對 Spring AOP 支持,從代碼中可看出,用GlobalTransactionalInterceptor 代替了被 GlobalTransactional 和 GlobalLock 注解的方法。
GlobalTransactionalInterceptor 實現了 MethodInterceptor:
io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
// 全局事務注解
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
// 全局鎖注解
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
以上是代理方法執行的邏輯邏輯,其中 handleGlobalTransaction() 方法里面調用了 TransactionalTemplate 模版:
io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public TransactionInfo getTransactionInfo() {
// ...
}
});
} catch (TransactionalExecutor.ExecutionException e) {
// ...
}
}
handleGlobalTransaction() 方法執行了就是 TransactionalTemplate 模版類的 execute 方法:
io.seata.tm.api.TransactionalTemplate#execute
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
以上是不是有一種似曾相識的感覺?沒錯,以上就是我們使用 API 時經常寫的冗余代碼,現在 Spring 通過代理模式,把這些冗余代碼都封裝帶模版里面了,它將那些冗余代碼統統封裝起來統一流程處理,並不需要你顯示寫出來了,有興趣的也可以去看看 Mybatis-spring 的源碼,也是寫得非常精彩。
服務端處理邏輯
服務端收到客戶端的連接,那當然是將其 channel 也緩存起來,前面也說到客戶端會發送 RegisterRMRequest/RegisterTMRequest 請求給服務端,服務端收到后會調用 ServerMessageListener 監聽器處理:
io.seata.core.rpc.ServerMessageListener
public interface ServerMessageListener {
// 處理各種事務,如分支注冊、分支提交、分支上報、分支回滾等等
void onTrxMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender);
// 處理 RM 客戶端的注冊連接
void onRegRmMessage(RpcMessage request, ChannelHandlerContext ctx,
ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);
// 處理 TM 客戶端的注冊連接
void onRegTmMessage(RpcMessage request, ChannelHandlerContext ctx,
ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler);
// 服務端與客戶端保持心跳
void onCheckMessage(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender)
}
ChannelManager 是服務端 channel 的管理器,服務端每次和客戶端通信,都需要從 ChannelManager 中獲取客戶端對應的 channel,它用於保存 TM 和 RM 客戶端 channel 的緩存結構如下:
/**
* resourceId -> applicationId -> ip -> port -> RpcContext
*/
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>
RM_CHANNELS = new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>();
/**
* ip+appname,port
*/
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS
= new ConcurrentHashMap<String, ConcurrentMap<Integer, RpcContext>>();
以上的 Map 結構有點復雜:
RM_CHANNELS:
- resourceId 指的是 RM client 的數據庫地址;
- applicationId 指的是 RM client 的服務 Id,比如 springboot 的配置 spring.application.name=account-service 中的 account-service 即是 applicationId;
- ip 指的是 RM client 服務地址;
- port 指的是 RM client 服務地址;
- RpcContext 保存了本次注冊請求的信息。
TM_CHANNELS:
- ip+appname:這里的注釋應該是寫錯了,應該是 appname+ip,即 TM_CHANNELS 的 Map 結構第一個 key 為 appname+ip;
- port:客戶端的端口號。
以下是 RM Client 注冊邏輯:
io.seata.core.rpc.ChannelManager#registerRMChannel
public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
throws IncompatibleVersionException {
Version.checkVersion(resourceManagerRequest.getVersion());
// 將 ResourceIds 數據庫連接連接信息放入一個set中
Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
RpcContext rpcContext;
// 從緩存中判斷是否有該channel信息
if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
// 根據請求注冊信息,構建 rpcContext
rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
resourceManagerRequest.getResourceIds(), channel);
// 將 rpcContext 放入緩存中
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
} else {
rpcContext = IDENTIFIED_CHANNELS.get(channel);
rpcContext.addResources(dbkeySet);
}
if (null == dbkeySet || dbkeySet.isEmpty()) { return; }
for (String resourceId : dbkeySet) {
String clientIp;
// 將請求信息存入 RM_CHANNELS 中,這里用了 java8 的 computeIfAbsent 方法操作
ConcurrentMap<Integer, RpcContext> portMap = RM_CHANNELS.computeIfAbsent(resourceId, resourceIdKey -> new ConcurrentHashMap<>())
.computeIfAbsent(resourceManagerRequest.getApplicationId(), applicationId -> new ConcurrentHashMap<>())
.computeIfAbsent(clientIp = getClientIpFromChannel(channel), clientIpKey -> new ConcurrentHashMap<>());
// 將當前 rpcContext 放入 portMap 中
rpcContext.holdInResourceManagerChannels(resourceId, portMap);
updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
}
}
從以上代碼邏輯能夠看出,注冊 RM client 主要是將注冊請求信息,放入 RM_CHANNELS 緩存中,同時還會從 IDENTIFIED_CHANNELS 中判斷本次請求的 channel 是否已驗證過,IDENTIFIED_CHANNELS 的結構如下:
private static final ConcurrentMap<Channel, RpcContext> IDENTIFIED_CHANNELS
= new ConcurrentHashMap<>();
IDENTIFIED_CHANNELS 包含了所有 TM 和 RM 已注冊的 channel。
以下是 TM 注冊邏輯:
io.seata.core.rpc.ChannelManager#registerTMChannel
public static void registerTMChannel(RegisterTMRequest request, Channel channel)
throws IncompatibleVersionException {
Version.checkVersion(request.getVersion());
// 根據請求注冊信息,構建 RpcContext
RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
request.getApplicationId(),
request.getTransactionServiceGroup(),
null, channel);
// 將 RpcContext 放入 IDENTIFIED_CHANNELS 緩存中
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
// account-service:127.0.0.1:63353
String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
+ getClientIpFromChannel(channel);
// 將請求信息存入 TM_CHANNELS 緩存中
TM_CHANNELS.putIfAbsent(clientIdentified, new ConcurrentHashMap<Integer, RpcContext>());
// 將上一步創建好的get出來,之后再將rpcContext放入這個map的value中
ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = TM_CHANNELS.get(clientIdentified);
rpcContext.holdInClientChannels(clientIdentifiedMap);
}
TM client 的注冊大體類似,把本次注冊的信息放入對應的緩存中保存,但比 RM client 的注冊邏輯簡單一些,主要是 RM client 會涉及分支事務資源的信息,需要注冊的信息也會比 TM client 多。
更多精彩文章請關注作者維護的公眾號「后端進階」,這是一個專注后端相關技術的公眾號。
關注公眾號並回復「后端」免費領取后端相關電子書籍。
歡迎分享,轉載請保留出處。