所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
在上一篇文章中,我們知道了DefaultCoordinator作為分布式事務的協調者承擔了Server端的大部分功能實現。
那么,本文將閱讀一下全局事務的begin請求,首先打開TCInboundHandler
public interface TCInboundHandler { // 處理全局事務的begin請求 GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext); // ... }
入口在這里,我們向下找到AbstractTCInboundHandler對它的實現
@Override public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) { GlobalBeginResponse response = new GlobalBeginResponse(); exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() { @Override public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException { try { // 內部begin方法 doGlobalBegin(request, response, rpcContext); } catch (StoreException e) { // ... } } }, request, response); return response; }
AbstractTCInboundHandler做了一層方法裝飾,主要的抽象實現交付給了doGlobalBegin,我們跟進它
doGlobalBegin
doGlobalBegin方法是一個抽象類,我們再向下找到DefaultCoordinator對它的實現
private Core core = CoreFactory.get(); @Override protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout())); }
委托給了Core來處理,我們看看CoreFactory的get方法會返回一個怎樣的實現
private static class SingletonHolder { private static Core INSTANCE = new DefaultCore(); } public static final Core get() { return SingletonHolder.INSTANCE; }
默認是一個單例實現,DefaultCore。
DefaultCore
既然如此,我們就看看DefaultCore這個默認實現是怎么實現全局事務的begin方法的
@Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { // 創建一個session GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); // begin這個session session.begin(); // 發送事件 eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), session.getBeginTime(), null, session.getStatus())); return session.getXid(); }
begin方法中最核心的就是創建一個GlobalSession以及session的begin。注意,這里addSessionLifecycleListener添加了一個監聽器
先跟進createGlobalSession,看看一個GlobalSession是怎么創建的
public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) { GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout); return session; }
沒有什么特別的就是簡單構造了一個GlobalSession的實例,再跟進GlobalSession的begin方法
@Override public void begin() throws TransactionException { // 狀態設置為begin this.status = GlobalStatus.Begin; this.beginTime = System.currentTimeMillis(); this.active = true; // 觸發監聽器 for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onBegin(this); } }
除了基本的狀態變化之類的數據修改,就是觸發上面我們提到過的監聽器。
找到AbstractSessionManager的onBegin方法,看看監聽到begin事件后怎么處理
@Override public void onBegin(GlobalSession globalSession) throws TransactionException { addGlobalSession(globalSession); }
顧名思義,就是添加到一個統一管理的位置,跟進addGlobalSession
@Override public void addGlobalSession(GlobalSession session) throws TransactionException { writeSession(LogOperation.GLOBAL_ADD, session); }
這里我們注意:DefaultSessionManager繼承了AbstractSessionManager並且實現了addGlobalSession方法。我們看看DefaultSessionManager做了什么擴展
protected Map<String, GlobalSession> sessionMap = new ConcurrentHashMap<>(); @Override public void addGlobalSession(GlobalSession session) throws TransactionException { super.addGlobalSession(session); sessionMap.put(session.getXid(), session); }
有着一個內存Map對象,保存了GlobalSession,KEY就是XID
再回到AbstractSessionManager的addGlobalSession方法,繼續跟進writeSession方法
protected TransactionStoreManager transactionStoreManager; private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException { if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) { // ... } }
TransactionStoreManager將負責存儲以及操作session,默認實現是FileBasedSessionManager,FileBasedSessionManager繼承了DefaultSessionManager。我們跟進writeSession
@Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { writeSessionLock.lock(); long curFileTrxNum; try { if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) { return false; } lastModifiedTime = System.currentTimeMillis(); curFileTrxNum = FILE_TRX_NUM.incrementAndGet(); if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) { return saveHistory(); } } catch (Exception exx) { return false; } finally { writeSessionLock.unlock(); } flushDisk(curFileTrxNum, currFileChannel); return true; }
writeDataFile將會把session寫入到文件當中,默認的文件地址在用戶目錄的sessionStore下,比如我的本機地址
在/Users/lay/sessionStore下有一個默認的root.data文件
總結
GlobalBeginRequest主要就是創建了一個GlobalSession然后進行持久化存儲,默認是采用File的方式存儲。當然你也可以選擇db的方式等。