Preface
Shortly after a newly developed service went live, users reported that data updates were failing, yet everything worked in local testing. The obvious difference between the two environments was that local ran a single instance while production ran a cluster. With that in mind I started a second instance locally, and sure enough, most operations then failed: the transaction was never committed. The framework we had chosen is no longer maintained, so the only option was to debug through it. After two days of tracing I found the cause: the TM locates participants by module name, and the framework derives the module name from the project name, so every instance of a clustered service registers with the TM under the same module name. The TM therefore cannot identify the actual participant, and the transaction commit fails.
1. Version
2. Flow Analysis
Using the TC proxy control flow diagram below, let's analyze at the source level why distributed transactions fail in a cluster.
*(Figure: TC proxy control flow)*
As the figure shows, the commit is ultimately controlled by the TM, which notifies every participant so that each can act on the transaction. In a cluster, however, the actual participant most likely never receives the TM's notification, so the natural entry point is the TM's notification-sending logic.
2.1 Source Analysis of the TM's NotifyGroup
The TM notifies the TCs as follows:
```java
private void notifyTransaction(DTXContext dtxContext, int transactionState) throws TransactionException {
    List<TransactionUnit> transactionUnits = dtxContext.transactionUnits(); // @@1
    log.debug("group[{}]'s transaction units: {}", dtxContext.getGroupId(), transactionUnits);
    for (TransactionUnit transUnit : transactionUnits) {
        NotifyUnitParams notifyUnitParams = new NotifyUnitParams();
        notifyUnitParams.setGroupId(dtxContext.getGroupId());
        notifyUnitParams.setUnitId(transUnit.getUnitId());
        notifyUnitParams.setUnitType(transUnit.getUnitType());
        notifyUnitParams.setState(transactionState);
        txLogger.txTrace(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify {}'s unit: {}",
                transUnit.getModId(), transUnit.getUnitId());
        try {
            List<String> modChannelKeys = rpcClient.remoteKeys(transUnit.getModId()); // @@2
            if (modChannelKeys.isEmpty()) {
                // record exception
                throw new RpcException("offline mod.");
            }
            MessageDto respMsg =
                    rpcClient.request(modChannelKeys.get(0), MessageCreator.notifyUnit(notifyUnitParams)); // @@3
            if (!MessageUtils.statusOk(respMsg)) {
                // handle a failed commit/rollback response
                List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
                rpcExceptionHandler.handleNotifyUnitBusinessException(params, respMsg.loadBean(Throwable.class));
            }
        } catch (RpcException e) {
            // commit/rollback communication failed
            List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
            rpcExceptionHandler.handleNotifyUnitMessageException(params, e);
        } finally {
            txLogger.txTrace(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify unit over");
        }
    }
}
```
- @@1 fetches all participants of the transaction group
- @@2 looks up the connections registered under the participant's modId
- @@3 sends the notification to the first of those connections
From @@2 and @@3 a question arises: in a cluster, do identical modIds leave the TM unable to pick the correct TC? With that question in mind, let's look at the logic behind @@2:
```java
// NettyRpcClient
public List<String> remoteKeys(String moduleName) {
    return SocketManager.getInstance().remoteKeys(moduleName); // @@1
}

// SocketManager
public List<String> remoteKeys(String moduleName) {
    List<String> allKeys = new ArrayList<>();
    for (Channel channel : channels) {
        if (moduleName.equals(getModuleName(channel))) { // @@2
            allKeys.add(channel.remoteAddress().toString());
        }
    }
    return allKeys;
}

public String getModuleName(Channel channel) { // @@3
    String key = channel.remoteAddress().toString();
    return getModuleName(key);
}

public String getModuleName(String remoteKey) { // @@4
    AppInfo appInfo = appNames.get(remoteKey);
    return appInfo == null ? null : appInfo.getAppName();
}
```
- @@1 resolves TC addresses by moduleName
- @@2 iterates over every connection established with the TM and collects those whose moduleName matches
- @@3 resolves a channel to its moduleName
- @@4 resolves a remote address to its moduleName; appNames, which maps remote addresses to AppInfo, is a ConcurrentHashMap
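To see why this lookup breaks in a cluster, here is a standalone simulation of the TM-side logic. The addresses and service name are hypothetical, and a plain map stands in for appNames plus the channel list:

```java
import java.util.*;

// Two cluster instances register under the same appName, so the lookup
// matches both remote keys and get(0) may pick an instance that never
// joined the transaction group.
public class LookupCollisionDemo {

    // stand-in for SocketManager's appNames: remoteKey -> appName
    static final Map<String, String> appNames = new LinkedHashMap<>();

    static List<String> remoteKeys(String moduleName) {
        List<String> allKeys = new ArrayList<>();
        for (Map.Entry<String, String> entry : appNames.entrySet()) {
            if (moduleName.equals(entry.getValue())) {
                allKeys.add(entry.getKey());
            }
        }
        return allKeys;
    }

    public static void main(String[] args) {
        appNames.put("/192.168.1.10:52100", "order-service"); // instance A: joined the group
        appNames.put("/192.168.1.11:52101", "order-service"); // instance B: did not join

        List<String> keys = remoteKeys("order-service");
        System.out.println("matched: " + keys);
        // The TM notifies only keys.get(0) - whether that is the real
        // participant is effectively a coin flip in a two-node cluster.
        System.out.println("notified: " + keys.get(0));
    }
}
```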
From the above, transUnit.getModId() and AppInfo are the key things to examine. Tracing their call relationships leads to the places where each is initialized.
Initialization of AppInfo
```java
// InitClientService on the TM side
public void bindModuleName(String remoteKey, String appName, String labelName) throws RpcException {
    AppInfo appInfo = new AppInfo();
    appInfo.setAppName(appName);
    appInfo.setLabelName(labelName);
    appInfo.setCreateTime(new Date());
    if (containsLabelName(labelName)) {
        throw new RpcException("labelName:" + labelName + " has exist.");
    }
    appNames.put(remoteKey, appInfo);
}

// appName = applicationName
String appName = environment.getProperty("spring.application.name");
this.applicationName = StringUtils.hasText(appName) ? appName : "application";

// labelName = modIdProvider.getModId()
public static String modId(ConfigurableEnvironment environment, ServerProperties serverProperties) {
    String applicationName = environment.getProperty("spring.application.name");
    applicationName = StringUtils.hasText(applicationName) ? applicationName : "application";
    return applicationName + ":" + serverPort(serverProperties);
}
```
Initialization of TransactionUnit
```java
// JoinGroupExecuteService
transactionManager.join(dtxContext, joinGroupParams.getUnitId(), joinGroupParams.getUnitType(),
        rpcClient.getAppName(transactionCmd.getRemoteKey()), joinGroupParams.getTransactionState());

// NettyRpcClient
public String getAppName(String remoteKey) {
    return SocketManager.getInstance().getModuleName(remoteKey);
}

// SocketManager
public String getModuleName(String remoteKey) {
    AppInfo appInfo = appNames.get(remoteKey);
    return appInfo == null ? null : appInfo.getAppName();
}

// SimpleTransactionManager
public void join(DTXContext dtxContext, String unitId, String unitType, String modId, int userState) throws TransactionException {
    // a manual rollback sets the transaction state to 0 (rollback)
    if (userState == 0) {
        dtxContext.resetTransactionState(0);
    }
    TransactionUnit transactionUnit = new TransactionUnit();
    transactionUnit.setModId(modId);
    transactionUnit.setUnitId(unitId);
    transactionUnit.setUnitType(unitType);
    dtxContext.join(transactionUnit);
}
```
The source above shows that a TransactionUnit's modId ultimately receives the appName, and the appName comes from environment.getProperty("spring.application.name"). It follows that in a cluster, every instance of the same service registers with an identical modId.
3. Conclusion
Because application names are identical across a cluster, the TM cannot locate the exact participant when it sends out notifications, and the transaction commit fails. Given that, one option is to **change the application name**, assigning a different service name to every instance started; that is tedious and impractical, so we discard it. The second option is to modify the source and change the lookup key: make the TransactionUnit's modId the labelName, which is composed of service name + IP + port. This requires overriding the relevant methods.
4. Solution
There are two ways to modify the source:

- download the source, modify it, and build your own package;
- create classes with the same package path and file names inside your own project, relying on class-loading precedence so that your copies shadow the framework's.

This article takes the second approach; the first couples you to a forked build and makes framework version upgrades harder.
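For illustration, assuming a Maven-style project with a shared module (the module name here is hypothetical), the overriding class from section 4.2 below would live at:

```
common-module/
└── src/main/java/
    └── com/codingapi/txlcn/txmsg/netty/bean/
        └── SocketManager.java   // same package and class name as the framework's
```

With standard Spring Boot packaging, classes under BOOT-INF/classes are loaded before those in the dependency jars under BOOT-INF/lib, so this copy takes effect without touching the framework artifact.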
4.1 Overriding the modId Method
Place this class in the project's common module:
```java
// Spring imports shown for completeness; also import ModIdProvider from the
// tx-lcn package used by your framework version.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.StringUtils;

import java.util.Objects;

@Configuration
public class LcnConfig {

    @Bean
    @ConditionalOnMissingBean
    @Primary
    public ModIdProvider modIdProvider(ConfigurableEnvironment environment,
                                       @Autowired(required = false) ServerProperties serverProperties) {
        return () -> modId(environment, serverProperties);
    }

    private String modId(ConfigurableEnvironment environment, ServerProperties serverProperties) {
        String applicationName = environment.getProperty("spring.application.name");
        applicationName = StringUtils.hasText(applicationName) ? applicationName : "application";
        return applicationName + ":" + serverPort(serverProperties);
    }

    /**
     * If the address resolved here is not accurate, read it from the
     * configuration file instead. Format: IP:PORT
     *
     * @param serverProperties serverProperties
     * @return IP:PORT string
     */
    private String serverPort(ServerProperties serverProperties) {
        // guard against an unset server.port / server.address before dereferencing
        if (Objects.isNull(serverProperties) || Objects.isNull(serverProperties.getPort())
                || Objects.isNull(serverProperties.getAddress())) {
            return "127.0.0.1:8080";
        }
        return serverProperties.getAddress().getHostAddress() + ":" + serverProperties.getPort();
    }
}
```
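With this bean in place, each instance reports a unique modId of the form service:IP:port. For example, given these hypothetical settings:

```properties
spring.application.name=order-service
server.address=192.168.1.10
server.port=8080
# resulting modId: order-service:192.168.1.10:8080
```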
4.2 Overriding the SocketManager Class
Note: create a file with the same package path and class name in the project's common module. Functionally, the only change from the framework's original class is that getModuleName(String remoteKey) now returns the labelName instead of the appName, so every lookup keyed on module name resolves to a unique service:IP:port identity.
```java
/*
 * Copyright 2017-2019 CodingApi .
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.codingapi.txlcn.txmsg.netty.bean;

import com.codingapi.txlcn.txmsg.RpcConfig;
import com.codingapi.txlcn.txmsg.dto.AppInfo;
import com.codingapi.txlcn.txmsg.dto.MessageDto;
import com.codingapi.txlcn.txmsg.dto.RpcCmd;
import com.codingapi.txlcn.txmsg.dto.RpcResponseState;
import com.codingapi.txlcn.txmsg.exception.RpcException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;

/**
 * Created by lorne on 2017/6/30.
 */
@Slf4j
public class SocketManager {

    private Map<String, AppInfo> appNames;

    private ScheduledExecutorService executorService;

    private ChannelGroup channels;

    private static SocketManager manager = null;

    private long attrDelayTime = 1000 * 60;

    private SocketManager() {
        channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        appNames = new ConcurrentHashMap<>();
        executorService = Executors.newSingleThreadScheduledExecutor();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executorService.shutdown();
            try {
                executorService.awaitTermination(10, TimeUnit.MINUTES);
            } catch (InterruptedException ignored) {
            }
        }));
    }

    public static SocketManager getInstance() {
        if (manager == null) {
            synchronized (SocketManager.class) {
                if (manager == null) {
                    manager = new SocketManager();
                }
            }
        }
        return manager;
    }

    public void addChannel(Channel channel) {
        channels.add(channel);
    }

    public void removeChannel(Channel channel) {
        channels.remove(channel);
        String key = channel.remoteAddress().toString();
        // no expiry time configured: remove immediately
        if (attrDelayTime < 0) {
            appNames.remove(key);
            return;
        }
        // expiry time configured: remove once it elapses
        try {
            executorService.schedule(() -> {
                appNames.remove(key);
            }, attrDelayTime, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ignored) {
            // caused down server.
        }
    }

    private Channel getChannel(String key) throws RpcException {
        for (Channel channel : channels) {
            String val = channel.remoteAddress().toString();
            if (key.equals(val)) {
                return channel;
            }
        }
        throw new RpcException("channel not online.");
    }

    public RpcResponseState send(String key, RpcCmd cmd) throws RpcException {
        Channel channel = getChannel(key);
        ChannelFuture future = channel.writeAndFlush(cmd).syncUninterruptibly();
        return future.isSuccess() ? RpcResponseState.success : RpcResponseState.fail;
    }

    public MessageDto request(String key, RpcCmd cmd, long timeout) throws RpcException {
        NettyRpcCmd nettyRpcCmd = (NettyRpcCmd) cmd;
        log.debug("get channel, key:{}", key);
        Channel channel = getChannel(key);
        channel.writeAndFlush(nettyRpcCmd);
        log.debug("await response");
        if (timeout < 1) {
            nettyRpcCmd.await();
        } else {
            nettyRpcCmd.await(timeout);
        }
        MessageDto res = cmd.loadResult();
        log.debug("response is: {}", res);
        nettyRpcCmd.loadRpcContent().clear();
        return res;
    }

    public MessageDto request(String key, RpcCmd cmd) throws RpcException {
        return request(key, cmd, -1);
    }

    public List<String> loadAllRemoteKey() {
        List<String> allKeys = new ArrayList<>();
        for (Channel channel : channels) {
            allKeys.add(channel.remoteAddress().toString());
        }
        return allKeys;
    }

    public ChannelGroup getChannels() {
        return channels;
    }

    public int currentSize() {
        return channels.size();
    }

    public boolean noConnect(SocketAddress socketAddress) {
        for (Channel channel : channels) {
            if (channel.remoteAddress().toString().equals(socketAddress.toString())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Get the remote keys registered for a module
     *
     * @param moduleName module name
     * @return remoteKeys
     */
    public List<String> remoteKeys(String moduleName) {
        List<String> allKeys = new ArrayList<>();
        for (Channel channel : channels) {
            if (moduleName.equals(getModuleName(channel))) {
                allKeys.add(channel.remoteAddress().toString());
            }
        }
        return allKeys;
    }

    /**
     * Bind connection data
     *
     * @param remoteKey remote key
     * @param appName   module name
     * @param labelName TC label name
     */
    public void bindModuleName(String remoteKey, String appName, String labelName) throws RpcException {
        AppInfo appInfo = new AppInfo();
        appInfo.setAppName(appName);
        appInfo.setLabelName(labelName);
        appInfo.setCreateTime(new Date());
        if (containsLabelName(labelName)) {
            throw new RpcException("labelName:" + labelName + " has exist.");
        }
        appNames.put(remoteKey, appInfo);
    }

    public boolean containsLabelName(String moduleName) {
        Set<String> keys = appNames.keySet();
        for (String key : keys) {
            AppInfo appInfo = appNames.get(key);
            if (moduleName.equals(appInfo.getLabelName())) {
                return true;
            }
        }
        return false;
    }

    public void setRpcConfig(RpcConfig rpcConfig) {
        attrDelayTime = rpcConfig.getAttrDelayTime();
    }

    /**
     * Get the module name of a channel
     *
     * @param channel channel
     * @return module name
     */
    public String getModuleName(Channel channel) {
        String key = channel.remoteAddress().toString();
        return getModuleName(key);
    }

    /**
     * Get the module name of a remote key
     *
     * @param remoteKey unique remote key
     * @return module name
     */
    public String getModuleName(String remoteKey) {
        AppInfo appInfo = appNames.get(remoteKey);
        // the key change: return the unique labelName (service:IP:port)
        // instead of the appName shared by every instance in the cluster
        return appInfo == null ? null : appInfo.getLabelName();
    }

    public List<AppInfo> appInfos() {
        return new ArrayList<>(appNames.values());
    }
}
```
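As a quick sanity check of the override (the remote keys and names below are hypothetical), binding two instances of the same application should now yield two distinguishable module names:

```java
import com.codingapi.txlcn.txmsg.exception.RpcException;
import com.codingapi.txlcn.txmsg.netty.bean.SocketManager;

public class SocketManagerOverrideCheck {

    public static void main(String[] args) throws RpcException {
        SocketManager manager = SocketManager.getInstance();

        // same appName, distinct labelNames - as two cluster instances would register
        manager.bindModuleName("/192.168.1.10:52100", "order-service", "order-service:192.168.1.10:8080");
        manager.bindModuleName("/192.168.1.11:52101", "order-service", "order-service:192.168.1.11:8080");

        // before the change both lookups returned "order-service";
        // now each returns the instance's unique labelName
        System.out.println(manager.getModuleName("/192.168.1.10:52100")); // order-service:192.168.1.10:8080
        System.out.println(manager.getModuleName("/192.168.1.11:52101")); // order-service:192.168.1.11:8080
    }
}
```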
5. Summary
My writing ability is limited, so if any explanation in this article is wrong, please point it out, thank you! As a next step I plan to analyze the tx-lcn framework itself to understand its internals and deepen my understanding of distributed transactions.