Distributed Transactions Failing in a TX-LCN Cluster: Cause and Solution


Preface

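A newly developed service recently went live, and users reported that data updates were failing, yet everything worked in local testing. The difference between the two environments is that the local one runs a single instance while production runs a cluster. With that in mind I started a second instance of the service locally, and most operations then failed because the transaction was never committed. The framework is no longer maintained, so the only option was to step through it in debug mode. After two days I found that the problem comes from the TM looking up participants by module name: the framework takes the module name from the project (application) name, so every instance of a clustered service registers with the TM under the same module name, the TM cannot identify the specific participant, and the transaction commit fails.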

1. Version

[Image WMErKU.png: framework version information]

2. Flow analysis

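Using the flow chart of TC proxy control below as a guide, this section analyzes at the source level why distributed transactions fail in a cluster.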

[Image WMEsrF.png: flow chart of TC proxy control]

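As the figure shows, committing the transaction is ultimately controlled by the TM, which notifies the participants so that they can respond. In a cluster, however, the instance that actually participated most likely never receives the TM's message, so the natural place to start is the step where the TM sends its notification.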

2.1 Source analysis of the TM's NotifyGroup

How the TM notifies the TC

private void notifyTransaction(DTXContext dtxContext, int transactionState) throws TransactionException {
        List<TransactionUnit> transactionUnits = dtxContext.transactionUnits();//@@1
        log.debug("group[{}]'s transaction units: {}", dtxContext.getGroupId(), transactionUnits);
        for (TransactionUnit transUnit : transactionUnits) {
            NotifyUnitParams notifyUnitParams = new NotifyUnitParams();
            notifyUnitParams.setGroupId(dtxContext.getGroupId());
            notifyUnitParams.setUnitId(transUnit.getUnitId());
            notifyUnitParams.setUnitType(transUnit.getUnitType());
            notifyUnitParams.setState(transactionState);
            txLogger.txTrace(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify {}'s unit: {}",
                    transUnit.getModId(), transUnit.getUnitId());
            try {
                List<String> modChannelKeys = rpcClient.remoteKeys(transUnit.getModId());//@@2
                if (modChannelKeys.isEmpty()) {
                    // record exception
                    throw new RpcException("offline mod.");
                }
                MessageDto respMsg =
                        rpcClient.request(modChannelKeys.get(0), MessageCreator.notifyUnit(notifyUnitParams));//@@3
                if (!MessageUtils.statusOk(respMsg)) {
                    // handle a commit/rollback that the unit reported as failed
                    List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
                    rpcExceptionHandler.handleNotifyUnitBusinessException(params, respMsg.loadBean(Throwable.class));
                }
            } catch (RpcException e) {
                // communication failure while sending the commit/rollback
                List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
                rpcExceptionHandler.handleNotifyUnitMessageException(params, e);
            } finally {
                txLogger.txTrace(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify unit over");
            }
        }
    }
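  • @@1 Get all participants of the transaction group
  • @@2 Get the connections for the participant's identifier
  • @@3 Send the message to the first connection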

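@@2 and @@3 raise a question: in a cluster, does an identical modId prevent the TM from finding the exact TC? With that question in mind, look at the logic behind @@2: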

 
//NettyRpcClient
public List<String> remoteKeys(String moduleName) {
        return SocketManager.getInstance().remoteKeys(moduleName);// @@1
    }
//SocketManager
public List<String> remoteKeys(String moduleName) {
        List<String> allKeys = new ArrayList<>();
        for (Channel channel : channels) {
            if (moduleName.equals(getModuleName(channel))) {//@@2
                allKeys.add(channel.remoteAddress().toString());
            }
        }
        return allKeys;
    }
//SocketManager
public String getModuleName(Channel channel) {//@@3
        String key = channel.remoteAddress().toString();
        return getModuleName(key);
    }
//SocketManager
public String getModuleName(String remoteKey) {//@@4
        AppInfo appInfo = appNames.get(remoteKey);
        return appInfo == null ? null : appInfo.getAppName();
    }
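  • @@1 Get the TC addresses for the moduleName
  • @@2 Iterate over all connections established with the TM and keep those whose module name matches moduleName
  • @@3 Get the module name of a channel
  • @@4 Get the module name for a remote address; the AppInfo entries are kept in appNames, a ConcurrentHashMap keyed by remote address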

From this it follows that transUnit.getModId() and AppInfo are the focus of the investigation. Tracing their callers leads to the places where each one is initialized.
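
To make the failure mode concrete, here is a minimal, self-contained sketch of the lookup in plain Java. It is not TX-LCN code: the class `ClusterLookupDemo`, its methods, the service name, and the addresses are all hypothetical. Only the "match by name, then take the first hit" logic mirrors `remoteKeys` and `modChannelKeys.get(0)` shown above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the TM's connection registry; not TX-LCN code.
class ClusterLookupDemo {

    // remote address -> application name, as registered by each TC
    private final Map<String, String> appNames = new LinkedHashMap<>();

    public void bind(String remoteKey, String appName) {
        appNames.put(remoteKey, appName);
    }

    // Same idea as SocketManager.remoteKeys: collect every connection whose name matches.
    public List<String> remoteKeys(String moduleName) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, String> entry : appNames.entrySet()) {
            if (moduleName.equals(entry.getValue())) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    // Same idea as notifyTransaction: always notify the first matching connection.
    public String pickNotifyTarget(String moduleName) {
        List<String> keys = remoteKeys(moduleName);
        if (keys.isEmpty()) {
            throw new IllegalStateException("offline mod.");
        }
        return keys.get(0);
    }

    public static void main(String[] args) {
        ClusterLookupDemo tm = new ClusterLookupDemo();
        tm.bind("/10.0.0.1:50001", "order-service"); // instance A
        tm.bind("/10.0.0.2:50002", "order-service"); // instance B, same application name

        // Suppose instance B joined the transaction group. Only "order-service"
        // was recorded as the modId, so A and B cannot be told apart.
        System.out.println("matches:  " + tm.remoteKeys("order-service"));
        System.out.println("notified: " + tm.pickNotifyTarget("order-service"));
    }
}
```

In this sketch the first registered instance is always the one notified, because a `LinkedHashMap` keeps insertion order. In the real TM the order depends on how the channel collection iterates, which would explain why the failure shows up most of the time rather than every time.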

Initialization of AppInfo

//InitClientService on the TM side
public void bindModuleName(String remoteKey, String appName,String labelName) throws RpcException{
        AppInfo appInfo = new AppInfo();
        appInfo.setAppName(appName);
        appInfo.setLabelName(labelName);
        appInfo.setCreateTime(new Date());
        if(containsLabelName(labelName)){
            throw new RpcException("labelName:"+labelName+" has exist.");
        }
        appNames.put(remoteKey, appInfo);
    }
// appName=applicationName;
 String appName = environment.getProperty("spring.application.name");
 this.applicationName = StringUtils.hasText(appName) ? appName : "application";
// labelName=modIdProvider.getModId
 public static String modId(ConfigurableEnvironment environment, ServerProperties serverProperties) {
        String applicationName = environment.getProperty("spring.application.name");
        applicationName = StringUtils.hasText(applicationName) ? applicationName : "application";
        return applicationName + ":" + serverPort(serverProperties);
    }

Initialization of TransactionUnit

//JoinGroupExecuteService 
transactionManager.join(dtxContext, joinGroupParams.getUnitId(), joinGroupParams.getUnitType(),
                    rpcClient.getAppName(transactionCmd.getRemoteKey()), joinGroupParams.getTransactionState());
//NettyRpcClient
public String getAppName(String remoteKey) {
        return SocketManager.getInstance().getModuleName(remoteKey);
    }
//SocketManager
public String getModuleName(String remoteKey) {
        AppInfo appInfo = appNames.get(remoteKey);
        return appInfo == null ? null : appInfo.getAppName();
    }
//SimpleTransactionManager
public void join(DTXContext dtxContext, String unitId, String unitType, String modId, int userState) throws TransactionException {
        // on a manual rollback, set the state to rollback (0)
        if (userState == 0) {
            dtxContext.resetTransactionState(0);
        }
        TransactionUnit transactionUnit = new TransactionUnit();
        transactionUnit.setModId(modId);
        transactionUnit.setUnitId(unitId);
        transactionUnit.setUnitType(unitType);
        dtxContext.join(transactionUnit);
    }

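The source above shows that the ModId of a TransactionUnit is ultimately the AppName, and the AppName comes from environment.getProperty("spring.application.name"). In a cluster, every instance of the same service has the same spring.application.name.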

3. Conclusion

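Because the application name is identical across a cluster, the TM cannot find the exact participant when it sends the notification, and the transaction commit fails. There are two ways out. The first is to **change the application name**, giving each instance its own name every time it is started; this is tedious and impractical, so it is discarded. The second is to modify the source and change the lookup logic so that the ModId of a TransactionUnit is the labelName, where labelName is service name + IP + port; this requires overriding the relevant methods.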
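
The effect of the second option can be sketched in plain Java. This is a hedged illustration, not TX-LCN code: `LabelLookupDemo`, its nested `Info` class (a stand-in for `AppInfo`), and the addresses are hypothetical. The only point it makes is that once the name recorded for a unit is the per-instance labelName, the lookup resolves to a single connection.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the TM's registry after the change; not TX-LCN code.
class LabelLookupDemo {

    // Minimal stand-in for AppInfo: shared application name plus per-instance label.
    static final class Info {
        final String appName;
        final String labelName;

        Info(String appName, String labelName) {
            this.appName = appName;
            this.labelName = labelName;
        }
    }

    private final Map<String, Info> appNames = new LinkedHashMap<>();

    public void bind(String remoteKey, String appName, String labelName) {
        appNames.put(remoteKey, new Info(appName, labelName));
    }

    // After the change, the "module name" of a connection is its labelName.
    public String getModuleName(String remoteKey) {
        Info info = appNames.get(remoteKey);
        return info == null ? null : info.labelName;
    }

    public List<String> remoteKeys(String moduleName) {
        List<String> keys = new ArrayList<>();
        for (String remoteKey : appNames.keySet()) {
            if (moduleName.equals(getModuleName(remoteKey))) {
                keys.add(remoteKey);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        LabelLookupDemo tm = new LabelLookupDemo();
        tm.bind("/10.0.0.1:50001", "order-service", "order-service:10.0.0.1:8080");
        tm.bind("/10.0.0.2:50002", "order-service", "order-service:10.0.0.2:8080");

        // Instance B joins: its labelName is recorded as the modId of the unit.
        String modId = tm.getModuleName("/10.0.0.2:50002");

        // At notify time the lookup resolves to exactly one connection.
        System.out.println(modId + " -> " + tm.remoteKeys(modId));
    }
}
```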

4. Solution

There are two ways to modify the source:

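  • Download the source, modify it directly, then build and use the resulting package
  • Create a class with the same package path and file name inside the project, and rely on the project's own compiled classes taking precedence so that it replaces the framework's class.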

This solution uses the second approach; the first couples the project tightly to a patched build and makes framework upgrades harder.

4.1 Override the modId method

Put this file in the project's common module.

@Configuration
public class LcnConfig {
    @Bean
    @ConditionalOnMissingBean
    @Primary
    public ModIdProvider modIdProvider(ConfigurableEnvironment environment,
                                       @Autowired(required = false) ServerProperties serverProperties) {
        return () -> modId(environment, serverProperties);
    }

    private String modId(ConfigurableEnvironment environment, ServerProperties serverProperties) {

        String applicationName = environment.getProperty("spring.application.name");
        applicationName = StringUtils.hasText(applicationName) ? applicationName : "application";
        return applicationName + ":" + serverPort(serverProperties);
    }

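    /**
     * Builds the "IP:PORT" part of the modId.
     * If the value resolved here is inaccurate, read the address from the configuration file instead.
     *
     * @param serverProperties serverProperties (may be null)
     * @return "IP:PORT", or "127.0.0.1:8080" when the address or port is not configured
     */
    private String serverPort(ServerProperties serverProperties) {
        // getAddress() and getPort() return null when server.address / server.port are not set,
        // so check both before dereferencing them. Every node must configure both,
        // otherwise all nodes fall back to the same value and the label is no longer unique.
        if (Objects.isNull(serverProperties)
                || Objects.isNull(serverProperties.getPort())
                || Objects.isNull(serverProperties.getAddress())) {
            return "127.0.0.1:8080";
        }
        return serverProperties.getAddress().getHostAddress() + ":" + serverProperties.getPort();
    }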
}
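
The label only helps if it really is unique per instance. The following sketch shows the shape of the label and what happens when the address or port is missing. It is a plain-Java illustration under stated assumptions: `ModIdSketch` and its `modId(String, String, Integer)` helper are hypothetical and take the host as a string instead of using Spring's `ServerProperties`.

```java
// Hypothetical helper that mirrors the label format applicationName:IP:PORT.
class ModIdSketch {

    static final String FALLBACK_ADDRESS = "127.0.0.1:8080";

    public static String modId(String applicationName, String host, Integer port) {
        // Same default as the configuration class: "application" when no name is set.
        String name = (applicationName == null || applicationName.trim().isEmpty())
                ? "application"
                : applicationName;
        // Fall back when either part of the address is unknown.
        String address = (host == null || port == null)
                ? FALLBACK_ADDRESS
                : host + ":" + port;
        return name + ":" + address;
    }

    public static void main(String[] args) {
        // Two nodes with explicit addresses get distinct labels.
        System.out.println(modId("order-service", "10.0.0.1", 8080));
        System.out.println(modId("order-service", "10.0.0.2", 8080));

        // Two nodes without a configured address collapse to the same label.
        System.out.println(modId("order-service", null, 8080));
        System.out.println(modId("order-service", null, 8081));
    }
}
```

If two nodes end up with the same label, the second one cannot register: `bindModuleName` in `SocketManager` rejects a labelName that already exists. Setting the address and port explicitly on every node avoids this.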

4.2 Override the SocketManager class

Note: create a file with the same package path and class name in the project's common package.

/*
 * Copyright 2017-2019 CodingApi .
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.codingapi.txlcn.txmsg.netty.bean;


import com.codingapi.txlcn.txmsg.RpcConfig;
import com.codingapi.txlcn.txmsg.dto.AppInfo;
import com.codingapi.txlcn.txmsg.dto.MessageDto;
import com.codingapi.txlcn.txmsg.dto.RpcCmd;
import com.codingapi.txlcn.txmsg.dto.RpcResponseState;
import com.codingapi.txlcn.txmsg.exception.RpcException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;

/**
 * Created by lorne on 2017/6/30.
 */
@Slf4j
public class SocketManager {

    private Map<String, AppInfo> appNames;

    private ScheduledExecutorService executorService;

    private ChannelGroup channels;

    // volatile is required for the double-checked locking in getInstance() to be safe
    private static volatile SocketManager manager = null;

    private long attrDelayTime = 1000 * 60;

    private SocketManager() {
        channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        appNames = new ConcurrentHashMap<>();
        executorService = Executors.newSingleThreadScheduledExecutor();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executorService.shutdown();
            try {
                executorService.awaitTermination(10, TimeUnit.MINUTES);
            } catch (InterruptedException ignored) {
            }
        }));
    }


    public static SocketManager getInstance() {
        if (manager == null) {
            synchronized (SocketManager.class) {
                if (manager == null) {
                    manager = new SocketManager();
                }
            }
        }
        return manager;
    }


    public void addChannel(Channel channel) {
        channels.add(channel);
    }

    public void removeChannel(Channel channel) {
        channels.remove(channel);
        String key = channel.remoteAddress().toString();

        // no expiry delay configured: remove immediately
        if (attrDelayTime < 0) {
            appNames.remove(key);
            return;
        }

        // expiry delay configured: remove once it has elapsed
        try {
            executorService.schedule(() -> {
                appNames.remove(key);
            }, attrDelayTime, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ignored) {
            // caused down server.
        }
    }


    private Channel getChannel(String key) throws RpcException {
        for (Channel channel : channels) {
            String val = channel.remoteAddress().toString();
            if (key.equals(val)) {
                return channel;
            }
        }
        throw new RpcException("channel not online.");
    }


    public RpcResponseState send(String key, RpcCmd cmd) throws RpcException {
        Channel channel = getChannel(key);
        ChannelFuture future = channel.writeAndFlush(cmd).syncUninterruptibly();
        return future.isSuccess() ? RpcResponseState.success : RpcResponseState.fail;
    }

    public MessageDto request(String key, RpcCmd cmd, long timeout) throws RpcException {
        NettyRpcCmd nettyRpcCmd = (NettyRpcCmd) cmd;
        log.debug("get channel, key:{}", key);
        Channel channel = getChannel(key);
        channel.writeAndFlush(nettyRpcCmd);
        log.debug("await response");
        if (timeout < 1) {
            nettyRpcCmd.await();
        } else {
            nettyRpcCmd.await(timeout);
        }
        MessageDto res = cmd.loadResult();
        log.debug("response is: {}", res);
        nettyRpcCmd.loadRpcContent().clear();
        return res;
    }

    public MessageDto request(String key, RpcCmd cmd) throws RpcException {
        return request(key, cmd, -1);
    }


    public List<String> loadAllRemoteKey() {
        List<String> allKeys = new ArrayList<>();
        for (Channel channel : channels) {
            allKeys.add(channel.remoteAddress().toString());
        }
        return allKeys;
    }

    public ChannelGroup getChannels() {
        return channels;
    }

    public int currentSize() {
        return channels.size();
    }


    public boolean noConnect(SocketAddress socketAddress) {
        for (Channel channel : channels) {
            if (channel.remoteAddress().toString().equals(socketAddress.toString())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Get the remote keys of a module
     *
     * @param moduleName module name
     * @return remoteKeys
     */
    public List<String> remoteKeys(String moduleName) {
        List<String> allKeys = new ArrayList<>();
        for (Channel channel : channels) {
            if (moduleName.equals(getModuleName(channel))) {
                allKeys.add(channel.remoteAddress().toString());
            }
        }
        return allKeys;
    }


    /**
     * Bind the connection data
     *
     * @param remoteKey remote identifier
     * @param appName   module name
     * @param labelName TC label name
     */
    public void bindModuleName(String remoteKey, String appName, String labelName) throws RpcException {
        AppInfo appInfo = new AppInfo();
        appInfo.setAppName(appName);
        appInfo.setLabelName(labelName);
        appInfo.setCreateTime(new Date());
        if (containsLabelName(labelName)) {
            throw new RpcException("labelName:" + labelName + " has exist.");
        }
        appNames.put(remoteKey, appInfo);
    }

   public boolean containsLabelName(String moduleName) {
        Set<String> keys = appNames.keySet();
        for (String key : keys) {
            AppInfo appInfo = appNames.get(key);
            if (moduleName.equals(appInfo.getLabelName())) {
                return true;
            }
        }
        return false;
    }

    public void setRpcConfig(RpcConfig rpcConfig) {
        attrDelayTime = rpcConfig.getAttrDelayTime();
    }

    /**
     * Get the module name
     *
     * @param channel channel information
     * @return module name
     */
    public String getModuleName(Channel channel) {
        String key = channel.remoteAddress().toString();
        return getModuleName(key);
    }

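    /**
     * Get the module name
     *
     * @param remoteKey unique remote identifier
     * @return module name (after this change: the per-instance labelName)
     */
    public String getModuleName(String remoteKey) {
        AppInfo appInfo = appNames.get(remoteKey);
        // Changed from getAppName(): the labelName is unique per instance in a cluster.
        return appInfo == null ? null : appInfo.getLabelName();
    }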

    public List<AppInfo> appInfos() {
        return new ArrayList<>(appNames.values());
    }
}

5. Summary

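My writing ability is limited; if any explanation in this article is wrong, please point it out. Thank you! The next step is to analyze the tx-lcn framework and understand its internals, in order to deepen my understanding of distributed transactions.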

