曹工雜談:分布式事務解決方案之基於本地消息表實現最終一致性
前言
為什么寫這個?其實我這邊的業務場景,嚴格來說,不算是典型的分布式事務,需求是這樣說的:因為我這邊負責的一個服務消費者consumer,是用戶登錄的入口;正常情況下,登錄時候要走用戶中心,這是個單獨的服務;如果用戶中心掛了,我這邊自然是沒法登錄的。
現在的需求就是說,假設用戶中心掛了,也要可以正常登錄。因為我這個consumer其實也是緩存了用戶的數據的,在本地登錄也可以的,如果在我本地登錄的話,我就得后續等用戶中心恢復后,再把相關狀態同步過去。
基於這樣一個需求,我這邊的實現方案是:
1.配置文件里維護一個開關,表示是否開啟:故障轉移模式。暫不考慮動態修改開關(如果要做,簡單做就提供個接口來改;復雜做,就放到配置中心里,我們現在用的nacos,可以改了后推送到服務端)
2.如果開關是打開的,表示需要進行故障轉移,則登錄、退出登錄等各種需要訪問用戶中心的請求,都存儲到數據庫中;數據庫會有一張表,用來存放這類請求。大致如下:
CREATE TABLE `cached_http_req_to_resend` (
`http_req_id` bigint(20) NOT NULL COMMENT '主鍵',
`req_type` tinyint(4) NOT NULL COMMENT '請求類型,1:推送待處置結果給第三方系統',
`third_sys_feign_name` varchar(30) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '第三方系統的名稱,和feignClient的保持一致',
`http_req_body` varchar(4000) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '請求體',
`current_state` tinyint(4) DEFAULT NULL COMMENT '該請求當前狀態,1:成功;2:失敗;3:待處理;4:失敗次數過多,放棄嘗試',
`fail_count` tinyint(4) DEFAULT NULL COMMENT '截止目前,失敗次數;超過指定次數后,將跳過該請求',
`success_time` datetime DEFAULT NULL COMMENT '請求成功發送的時間',
`create_time` datetime DEFAULT NULL COMMENT '創建時間',
`related_entity_id` bigint(21) DEFAULT NULL COMMENT '相關的實體的id,比如在推送待處置警情時,這個id為處警id',
PRIMARY KEY (`http_req_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
3.單獨開一個schedule線程,定時去掃這個表,發現有需要處理的,就去重新發送請求就行了,成功了的,直接更新狀態為success。
這個模式,其實就算是分布式事務中的:本地消息表方案了。
本地消息表,有一個注意的點,就是要把保存消息的操作和業務相關操作,放到同一個事務中,這樣可以確保,業務成功了,消息肯定是落庫了的,很可靠。然后再開啟個定時任務,去掃描消息表即可。
我這邊不是發消息,而是發請求,道理是類似的。
下面開始基於代碼demo來講解。
代碼結構
這邊就是簡單的幾個module,基於spring cloud開發了一個服務提供者和一個服務消費者。服務提供者對外暴露的接口,通過api.jar的形式,提供給消費者,這種算是強耦合了,有優點,也有缺點,這里就不討論了。
消費者通過feign調用服務提供者。有人會問,不需要eureka這些東西嗎,其實是可以不需要的,我們直接在ribbon的配置中,把服務對應的:ip和端口寫死就完了。
我們這里就是,消費者訪問服務提供者,正常情況下直接訪問就行了;但我們這里,模擬的就是服務A訪問不了的情況,所以會直接把請求落庫,后續由定時線程去處理。
服務提供者-api
我們看看服務提供者api,里面僅有一個接口:
public interface FeignServiceA {
/**
*
* @return
*/
@RequestMapping("/login")
public Message<LoginRespVO> login(@RequestBody LoginReqVO loginReqVO);
}
服務提供者的邏輯
其中,邏輯如下:
@RestController
@Slf4j
public class DemoController extends BaseController implements FeignServiceA {
// 1
@Override
public Message<LoginRespVO> login(@RequestBody LoginReqVO loginReqVO) {
log.info("login is ok,param:{}", loginReqVO);
LoginRespVO vo = new LoginRespVO();
vo.setUserName(loginReqVO.getUserName());
vo.setAge(11);
vo.setToken(UUID.randomUUID().toString());
return successResponse(vo);
}
}
這里1處就是提供了一個接口,接口里返回一點點信息。測試一下:
服務消費者之正常請求服務提供者
pom.xml中依賴服務提供者的api
<dependency>
<groupId>com.example</groupId>
<artifactId>service-provider-A-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
feign client代碼
我們需要寫一個接口,繼承其feign api。
@FeignClient(value = "SERVICE-A")
public interface RpcServiceForServiceA extends FeignServiceA {
}
要調用的時候,怎么弄呢? 直接注入該接口,然后調用對應的方法就行了,這樣就可以了。
@Autowired
private RpcServiceForServiceA rpcServiceForServiceA;
Message<LoginRespVO> message = rpcServiceForServiceA.login(reqVO);
但是,我們好像沒有配置注冊中心之類的東西,這個我們可以繞過,因為最終發起調用的是,ribbon這個組件。
ribbon提供了幾個接口,其中一個,就是用來獲取服務對應的實例列表。
這里要說的,就是下面這個接口:
package com.netflix.loadbalancer;
import java.util.List;
/**
* Interface that defines the methods sed to obtain the List of Servers
* @author stonse
*
* @param <T>
*/
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* (configurable) by the Loadbalancer's Ping cycle
*
*/
public List<T> getUpdatedListOfServers();
}
這個接口,有多個實現,ribbon自帶了幾個實現,然后eureka 、nacos的客戶端,都自己進行了實現。
ribbon自帶的實現中,有一個叫做:
public class ConfigurationBasedServerList extends AbstractServerList<Server> {
private IClientConfig clientConfig;
...
@Override
public List<Server> getUpdatedListOfServers() {
// 1
String listOfServers = clientConfig.get("listOfServers");
return derive(listOfServers);
}
1處可以看到,它獲取服務對應的實例,就是通過去配置文件里獲取listOfServers
這個key中配置的那些。
總之,最終我們向下面這樣配置就行了:
SERVICE-A.ribbon.ReadTimeout=3000
SERVICE-A.ribbon.listOfServers=localhost:8082
SERVICE-A.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
這里的前綴,SERVICE-A
和之前下面這個地方一致就行了:
@FeignClient(value = "SERVICE-A")
public interface RpcServiceForServiceA extends FeignServiceA {
}
正常情況下,就說完了,直接調用就行,和httpclient調用沒啥本質差別。只不過ribbon提供了負載均衡、重試等各種功能。
設計表結構,在使用故障轉移模式時,保存請求
表結構我前面已經貼了,這里就展示下數據吧(可點擊放大查看):
保存請求的代碼很簡單:
@Override
public LoginRespVO login(LoginReqVO reqVO) {
boolean failOverModeOn = isFailOverModeOn();
/**
* 故障轉移沒有開啟,則正常調用服務
*/
if (!failOverModeOn) {
...
return ...;
}
/**
* 1 使用本地數據進行服務,並將請求保存到數據庫中
*/
iCachedHttpReqToResendService.saveLoginReqWhenFailOver(reqVO);
/**
* 返回一個 dummy 數據
*/
return new LoginRespVO();
}
上面的1處,就會保存請求到數據庫。
定時線程消費邏輯
概覽
定時線程這邊,我設計得比較復雜一點。因為實際場景中,上面的表中,會存儲多個第三方服務的請求;比如service-A,service-B。
所以,這里的策略是:
簡單來說,就是定時線程,拿到任務后,按照第三方服務的名字來進行group by操作,比如,要發送到service-A的請求放一起,按時間排好序;要發送給service-B的放一起,排好序。
然后找到service-A,service-B各自對應的處理器,然后把數據丟給這些處理器;處理器拿到后,就會放到阻塞隊列里;
然后此時worker線程就會被阻塞隊列給喚醒,喚醒后,就去開始處理這些請求,包括發起feign調用,並且更新結果到數據庫中。
定時線程入口
@Scheduled(cron = "0/30 * * * * ? ")
public void sendCachedFeignReq() {
Thread.currentThread().setName("SendCachedFeignReqTask");
log.info("start sendCachedFeignReq");
/**
* 1、獲取鎖
*/
boolean success = iCommonDistributedLockService.tryLock(DISTRIBUTED_LOCK_ENUM.SEND_CACHED_FEIGN_REQ_TO_REMOTE_SERVER.lockName, DISTRIBUTED_LOCK_ENUM.SEND_CACHED_FEIGN_REQ_TO_REMOTE_SERVER.expireDurationInSeconds);
/**
* 進行業務邏輯處理
*/
iCachedHttpReqToResendService.processCachedFeignReqForLoginLogout();
...
}
這里還加了個分布式鎖的操作,用數據庫實現的,還沒經過充分測試,可能會有點小問題,不過不是重點。
下面看看業務邏輯:
@Override
public void processCachedFeignReqForLoginLogout() {
// 1
String[] feignClients = {EFeignClient.SERVICE_A.getName()};
// 2
for (String feignClient : feignClients) {
/**
* 3 從數據庫獲取要發送到該服務的請求
*/
List<CachedHttpReqToResend> recordsFromDb = getRecordsFromDb(feignClient);
if (CollectionUtils.isEmpty(recordsFromDb)) {
continue;
}
/**
* 4 根據feign client,找到對應的處理器
*/
CachedHttpReqProcessor cachedHttpReqProcessor = cachedHttpReqProcessors.stream().filter(item -> item.support(feignClient)).findFirst().orElse(null);
if (cachedHttpReqProcessor == null) {
throw new RuntimeException();
}
/**
* 5 利用對應的處理器,處理該部分請求
*/
cachedHttpReqProcessor.process(recordsFromDb);
}
}
- 1,定義一個數組,數組中包括所有要處理的第三方系統
- 2,遍歷
- 3,根據該serviceName,比如,根據service-A,去數據庫查詢對應的請求(這里可能和前面的圖有點出入,以這里的代碼為准)
- 4,根據該service-A,找到對應的處理器
- 5,利用第四步找到的處理器,來處理第三步中查到的數據
怎么找到service-A對應的處理器
我們先看看處理器這個接口:
public interface CachedHttpReqProcessor {
/**
* 該處理器是否支持處理該service
* @param feignClientName
* @return
*/
boolean support(String feignClientName);
/**
* 具體的處理邏輯
* @param list
*/
void process(Collection<CachedHttpReqToResend> list);
/**
* worker線程的名字
* @return
*/
String getThreadName();
}
然后看看針對service-A的處理器,是怎么實現的:
@Service
public class CachedHttpReqProcessorForServiceA extends AbstractCachedHttpReqProcessor {
// 1
@Override
public boolean support(String feignClientName) {
return Objects.equals(EFeignClient.SERVICE_A.getName(), feignClientName);
}
@Override
public String getThreadName() {
return "CachedHttpReqProcessorForServiceA";
}
1處,判斷傳入的feign客戶端,是否等於EFeignClient.SERVICE_A
,如果是,說明找到了對應的處理器。
我們這里將這個service,注冊為了bean;在有多個serviceA,serviceB的時候,就會有多個CachedHttpReqProcessor處理器。
我們在之前的上層入口那里,就注入了一個集合:
@Autowired
private List<CachedHttpReqProcessor> cachedHttpReqProcessors;
然后在篩選對應的處理器時,就是通過遍歷這個集合,找到合適的處理器。
具體的,大家可以把代碼拉下來看看。
CachedHttpReqProcessor的處理邏輯
對於serviceA,serviceB,service C,由於處理邏輯很大部分是相同的,我們這里提取了一個抽象類。
@Slf4j
public abstract class AbstractCachedHttpReqProcessor implements CachedHttpReqProcessor {
private LinkedBlockingQueue<CachedHttpReqToResend> blockingQueue = new LinkedBlockingQueue<>(500);
private AtomicBoolean workerInited = new AtomicBoolean(false);
Thread workerThread;
@Override
public void process(Collection<CachedHttpReqToResend> list) {
if (CollectionUtils.isEmpty(list)) {
return;
}
/**
* 1 直到有任務要處理時(該方法被調用時),才去初始化線程
*/
if (workerInited.compareAndSet(false, true)) {
// 2
workerThread = new Thread(new InnerWorker());
workerThread.setDaemon(true);
workerThread.setName(getThreadName());
workerThread.start();
}
/**
* 放到阻塞隊列里
*/
blockingQueue.addAll(list);
}
我們這里1處,給每個處理器,定義了一個工作線程,且只在本方法被調用時,才去初始化該線程;為了防止並發,使用了AtomicBoolean,保證只會初始化一次。
2處,給線程設置了Runnable,它會負責實際的業務處理。
然后3處,直接把要處理的任務,丟到阻塞隊列即可。
Worker的處理邏輯
任務已經是到了阻塞隊列了,那么,誰去處理呢,就是worker了。如果大家忘了整體的設計,可以回去看看那張圖。
public abstract boolean doProcess(Integer reqType, CachedHttpReqToResend cachedHttpReqToResend);
/**
* 從隊列取數據;取到后,調用子類的方法去處理;
* 子類處理后,返回處理結果
* 根據結果,設置成功或者失敗的狀態
*/
public class InnerWorker implements Runnable {
@Override
public void run() {
while (true) {
// 1
boolean interrupted = Thread.currentThread().isInterrupted();
if (interrupted) {
log.info("interrupted ,break out");
break;
}
// 2
CachedHttpReqToResend cachedHttpReqToResend;
try {
cachedHttpReqToResend = blockingQueue.take();
} catch (InterruptedException e) {
log.info("interrupted,e:{}", e);
break;
}
// 3
Integer reqType = cachedHttpReqToResend.getReqType();
if (reqType == null) {
continue;
}
try {
/**
* 4 使用模板方法設計模式,交給子類去實現
*/
boolean success = doProcess(reqType, cachedHttpReqToResend);
// 5
if (!success) {
cachedHttpReqToResend.setFailCount(cachedHttpReqToResend.getFailCount() + 1);
} else {
cachedHttpReqToResend.setCurrentState(CachedHttpReqToResend.CURRENT_STATE_SUCCESS);
cachedHttpReqToResend.setSuccessTime(new Date());
}
// 6
boolean count = iCachedHttpReqToResendService.updateById(cachedHttpReqToResend);
if (count) {
log.debug("update sucess");
}
} catch (Throwable throwable) {
log.error("e:{}", throwable);
continue;
}
}
}
}
- 1,判斷是否被中斷了,這樣可以在程序關閉時,感知到;避免線程泄漏
- 2,從阻塞隊列中,獲取任務
- 3,判斷請求類型是否為null,這個是必須要的
- 4,使用模板方法設計模式,具體邏輯,具體怎么發請求,誰去發,交給子類實現
- 5、6,根據結果,更新這條數據的狀態。
子類中的具體邏輯
我們這里貼個全貌:
@Service
@Slf4j
public class CachedHttpReqProcessorForServiceA extends AbstractCachedHttpReqProcessor {
@Autowired
private FeignServiceA feignServiceA;
@Autowired
private ObjectMapper objectMapper;
@Override
public boolean support(String feignClientName) {
return Objects.equals(EFeignClient.SERVICE_A.getName(), feignClientName);
}
@Override
public String getThreadName() {
return "CachedHttpReqProcessorForServiceA";
}
/**
* 1 根據請求type字段,我們就知道是要發送哪一個請求
* @param reqType
* @param cachedHttpReqToResend
* @return
*/
@Override
public boolean doProcess(Integer reqType, CachedHttpReqToResend cachedHttpReqToResend) {
switch (reqType) {
// 2
case CachedHttpReqToResend.REQ_TYPE_LOGIN_TO_SERVICE_A: {
// 3
String httpReqBody = cachedHttpReqToResend.getHttpReqBody();
try {
// 4
LoginReqVO loginReqVO = objectMapper.readValue(httpReqBody, LoginReqVO.class);
/**
* 5 發起登錄
*/
Message<LoginRespVO> message = feignServiceA.login(loginReqVO);
boolean success = FeignMsgUtils.isSuccess(message);
return success;
} catch (Throwable e) {
log.error("e:{}", e);
return false;
}
}
}
return true;
}
}
- 1,這個類就是實現了父類中的抽象方法,這里體現的就是模板方法設計模式
- 2,根據請求type,判斷要訪問哪個接口
- 3,4,將請求體進行反序列化
- 5,發起請求,調用feign。
代碼如何使用
具體的代碼,我放在了:
https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/blockingqueue-consumer-producer
建表語句:
服務提供者A的訪問入口:
curl -i -X POST \
-H "Content-Type:application/json" \
-d \
'{
"userName": "zhangsan",
"password":"123"
}' \
'http://localhost:8082/login'
服務消費者的application.properties中:
failover.mode=true
這個為true時,就是故障轉移模式,訪問如下接口時,請求會落庫
http://localhost:8081/login.do
為false的話,就會直接進行feign調用。
代碼中的bug
其實這個代碼是有bug的,因為我們是定時線程,假設每隔30s執行,那假設我一開始取了10條出來,假設全部放到隊列了,阻塞隊列此時有10條,假設worker處理特別慢,30s內也沒執行完的話,定時線程會再次取出狀態沒更新的那個任務,又丟到隊列里。
任務就被重復消費了。
大家可以想想怎么處理這個問題,通過這個bug,我也發現,blockingqueue是一種比較徹底的解耦方式,但是,我們這里的業務,解耦了嗎,如果業務不是解耦的,用這個方式,其實是有點問題。
過兩天我再更新這部分的方案,生產者和消費者,這里還是需要通信的,才能避免任務重復消費的問題。
總結
要實現一個本地消息表最終一致性方案,有一定開發量,而且我這里,消費過程中,強行引入了多線程和生產者、消費者模式,增加了部分復雜度。
不過,代碼不就是要多折騰嗎?