三、全局事務begin請求GlobalBeginRequest


所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

 

正文

上一篇文章中,我們知道了DefaultCoordinator作為分布式事務的協調者承擔了Server端的大部分功能實現。

那么,本文將閱讀一下全局事務的begin請求,首先打開TCInboundHandler

public interface TCInboundHandler {
    // 處理全局事務的begin請求
    GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);

    // ...
}

入口在這里,我們向下找到AbstractTCInboundHandler對它的實現

@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    GlobalBeginResponse response = new GlobalBeginResponse();
    exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
        @Override
        public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
            try {
                // 內部begin方法
                doGlobalBegin(request, response, rpcContext);
            } catch (StoreException e) {
                // ...
            }
        }
    }, request, response);
    return response;
}

AbstractTCInboundHandler做了一層方法裝飾,主要的抽象實現交付給了doGlobalBegin,我們跟進它

 

doGlobalBegin

doGlobalBegin方法是一個抽象類,我們再向下找到DefaultCoordinator對它的實現

private Core core = CoreFactory.get();

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout()));
}

委托給了Core來處理,我們看看CoreFactory的get方法會返回一個怎樣的實現

private static class SingletonHolder {
    private static Core INSTANCE = new DefaultCore();
}

public static final Core get() {
    return SingletonHolder.INSTANCE;
}

默認是一個單例實現,DefaultCore。

 

DefaultCore

既然如此,我們就看看DefaultCore這個默認實現是怎么實現全局事務的begin方法的

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
    // 創建一個session
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // begin這個session
    session.begin();

    // 發送事件
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));

    return session.getXid();
}

begin方法中最核心的就是創建一個GlobalSession以及session的begin。注意,這里addSessionLifecycleListener添加了一個監聽器

 

先跟進createGlobalSession,看看一個GlobalSession是怎么創建的

public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
    GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout);
    return session;
}

沒有什么特別的就是簡單構造了一個GlobalSession的實例,再跟進GlobalSession的begin方法

@Override
public void begin() throws TransactionException {
    // 狀態設置為begin
    this.status = GlobalStatus.Begin;
    this.beginTime = System.currentTimeMillis();
    this.active = true;
    // 觸發監聽器
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onBegin(this);
    }
}

除了基本的狀態變化之類的數據修改,就是觸發上面我們提到過的監聽器。

找到AbstractSessionManager的onBegin方法,看看監聽到begin事件后怎么處理

@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {
    addGlobalSession(globalSession);
}

顧名思義,就是添加到一個統一管理的位置,跟進addGlobalSession

@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    writeSession(LogOperation.GLOBAL_ADD, session);
}

這里我們注意:DefaultSessionManager繼承了AbstractSessionManager並且實現了addGlobalSession方法。我們看看DefaultSessionManager做了什么擴展

protected Map<String, GlobalSession> sessionMap = new ConcurrentHashMap<>();

@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    super.addGlobalSession(session);
    sessionMap.put(session.getXid(), session);
}

有着一個內存Map對象,保存了GlobalSession,KEY就是XID

 

再回到AbstractSessionManager的addGlobalSession方法,繼續跟進writeSession方法

protected TransactionStoreManager transactionStoreManager;

private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
    if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
        // ...
    }
}

TransactionStoreManager將負責存儲以及操作session,默認實現是FileBasedSessionManager,FileBasedSessionManager繼承了DefaultSessionManager。我們跟進writeSession

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    writeSessionLock.lock();
    long curFileTrxNum;
    try {
        if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
            return false;
        }
        lastModifiedTime = System.currentTimeMillis();
        curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
        if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
            return saveHistory();
        }
    } catch (Exception exx) {
        return false;
    } finally {
        writeSessionLock.unlock();
    }
    flushDisk(curFileTrxNum, currFileChannel);
    return true;
}

writeDataFile將會把session寫入到文件當中,默認的文件地址在用戶目錄的sessionStore下,比如我的本機地址

在/Users/lay/sessionStore下有一個默認的root.data文件

 

總結

GlobalBeginRequest主要就是創建了一個GlobalSession然后進行持久化存儲,默認是采用File的方式存儲。當然你也可以選擇db的方式等。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM