之前忙活過一個多方合作的項目,異構系統間維護數據一致性有時候是個頭疼但必須解決的問題,今天我從背景、問題、解決思路、技術方案等幾個方面淺談一下,拋磚引玉。
背景
異構系統
近兩年我司承接了某個持續性的會議項目,即每季度或月不定期舉行會議。本項目目前有三個主要供應方(面向用戶的A方,數據中間B方,會議數據同步C方【我司】)。 為了方便演示問題,以下流程和職責都做了裁剪。
簡化流程如下:
簡化職責如下:
-
A方職責: 用戶通過官網/小程序進行報名,A方調用B方的標准接口,不存儲數據 -
B方職責:作為ISP,提供標准查詢、新增、修改等相關接口,幾乎不提供定制。基於表單和表單數據,完成數據存儲與流轉。 -
C方職責:提供導入/更新/審核/注銷等入口,新數據會通知到B方,B方數據新增/更新也會通知到C方。
從圖例來看,B方/C方數據存儲方面是冗余的。但B方只存儲了核心數據,提供不了太多業務行為,C方具有業務需要的全套流程,但在此項目中作為后方支持及后續現場支持,三方形成了一種生態和諧。本篇博客主旨在討論多方異構系統之間如何保證數據的一致性。
產品/項目
從標准Sass系統來講,這樣的多方交互,不利於系統穩定性,有諸多不可控因素。但從項目角度,這是各方考慮/斗爭/談判/費用等綜合因素下友好協商的結果。 當然這是一個私有部署項目,所以會有很多堅持和妥協。
大領導提到一個說法:項目是要交付的,功能完美是產品考慮的。在功能不完善的情況下,如何去交付?
最后的兜底
哎,一言難盡。是通宵了幾次核對/修復數據的,這是最后的辦法了。為了苦逼不再重現,今年要對整個線動一動手術。(說好的.net 不996呢?)(拿着白菜價操着賣白粉的心)。
問題
請求無序
-
C方 需要所有子會報名前,主會必須報名。 -
B方 各會之間的報名數據是無序到達的。
循環更新
-
B方 任意報名數據更新或新增都會推送到C方,C方收到更新也會更新B方。這里有一些措施進行了攔截中斷,但仍會頻繁循環更新問題。這是目前現狀(為什么會出現?太趕工?)
排錯困難
-
無開發環境,需盲寫代碼,發到測試環境進行聯調測試。 -
調用鏈太長,日志過多,排錯時需要根據調用各服務接口來判斷走到了哪步,出現了一個問題。調用鏈能查到一些問題,但不容批量定位問題。單個查太難。
bug
-
高並發下,redis組件出現各種問題(timeout等) -
token問題 -
數據丟失 -
更新失效 -
數據重復 -
隊列積壓 -
接口請求時間超長 -
其他問題...
數據很大,也很小
大部分數據能對上,偶爾幾十個或斷斷續續產生新問題的數據需要及時人工修復。功能有缺陷,人工也是一種交付辦法,但不可持續,太他媽的累了。數據不一致,也是導致通宵核對/修復數據的一大原因。如果數據全一致,就不會那么辛苦了。
解決思路
管理層
-
明確項目是要繼續做的 -
目標產品化/更方便維護方向發展。一團隊養一項目。 -
有改進想法提出來,拉會推進 -
缺人,招人(遙遙無期...)
技術層
-
針對請求無序問題,引入延時隊列,先處理主會、子會延遲幾秒鍾在處理。 -
針對循環更新問題,記錄B方數據來源,非必要情況下,不回更B方。必須終止掉。【冤冤相報何時了】 -
針對排錯困難問題,引入mysql記錄新增報名的請求以及處理結果,可以更快查詢處理結果。 -
針對bug,測試根據各測試場景進行復測,按10/100/1000/3000/萬級規模壓測。提前發下問題。 -
推進客戶方一起做必要去重邏輯。
其他因素
無論是標准產品還是交付項目,做任何改動都要評估。
-
多溝通,大家都是站在一條線的。有利於事情解決的方案認同度會更高。 -
預估花多少時間,有多少資源。 -
能擠出來的空窗期有多久,客戶方/產品方對於需求的急迫性有多強。 -
基於場景測試,把缺陷優先級先列出來,根據空窗期先修復緊急缺陷。
把緊急且影響范圍廣的問題解決了,風險就小了很多了。80%的問題是由20%的因素造成的。 這也正符合程序優化中的時間/空間局部性。
“進程運行時,在一段時間里,程序的執行往往呈現高度的局部性, 包括時間局部性和空間局部性。
”
時間局部性是一旦一個指令被執行了, 則在不久的將來,它可能再被執行。
空間局部性是一旦一個指令一個存儲單元被訪問,那么它附近的單元也將很快被訪問.
技術方案
mysql實現延遲隊列
-
優先處理主會,子會延時處理
由於隱私問題,這里只列部分字段 
-
數據庫輪詢獲取未處理數據
這里如何提高消費速度,可以參考《計算機系統結構》中標量處理機的流水線的一些知識。 
-
首先要無相關,即按AccountId分組,分組內的數據是無沖突/相關的,可以分批進行。記錄各任務狀態,最后統一提交數據庫狀態,然后1s后繼續輪詢。這種類似靜態流水線。動態流水線較為復雜,這里暫不做實現。
do
{
var groupTemps = groupDatas.Skip((pageIndex - 1) * pageSize).Take(pageSize).ToList();
var currentRecords = new List<QidianNotifydelayData>();
foreach (var item in groupTemps)
{
currentRecords.AddRange(item.ToList());
}
var temp = taskFunc(currentRecords);
taskList.Add(temp);
pageIndex++;
}
while ((pageIndex - 1) * pageSize <= groupCount);
//等待全部執行
await Task.WhenAll(taskList.ToArray());
await _dbContext.CommitAsync();
Thread.Sleep(1);
-
如果1s輪詢覺得太浪費,后續可以根據請求發送標記位(下次輪詢時間),有數據時,可以快速輪詢,無數據時放寬時間。極端處理方式,當主會請求過來處理完成后,直接發起子會處理,但要考慮數據庫是否能承受的住這種並發壓力。
-
如果考慮請求會重復執行,可以在執行內加redis鎖。慎用for update,並發一大就over.
/// <summary>
/// 鎖定執行。
/// </summary>
/// <param name="key"></param>
/// <param name="func"></param>
/// <param name="timeSpan"></param>
/// <returns></returns>
public async Task<BizResult<T>> LockExcute<T>(string key, Func<Task<BizResult<T>>> func, int timeSpan)
{
var db = (this._cacheClient as RedisClient).Db;
var mutexKey = string.Format("mutex:", key);
if (await db.StringSetAsync(mutexKey, "1", TimeSpan.FromSeconds(timeSpan), When.NotExists))
{
try
{
var item = await func.Invoke();
return item;
}
catch (Exception ex)
{
_logger.LogError("LockExcute:Exception:" + ex.Message);
return BizResult.BusinessFailed<T>(-1, $"執行失敗,Message:{ex.Message}");
}
finally
{
await db.KeyDeleteAsync(mutexKey);
}
}
else
{
_logger.LogWarning($"LockExcute:Key:{key},正在處理中,請稍候");
return BizResult.BusinessFailed<T>(-1, "正在處理中,請稍候");
}
}
redis實現延遲隊列
-
由於業務中一個Account同時只能處理一個主會,如果在處理子會的時候,主會請求突然過來了,就會有問題,這里就需要加鎖主會。引入了Redis延遲隊列 -
基於Redis ZSet有序集合實現。 -
思路:當前時間戳和延時時間相加,也就是到期時間,存入Redis中,然后不斷輪詢,找到到期的,拿到再刪除即可。 -
目前實現缺點:不利於監控,未發起http請求處理業務,導致調用鏈有缺。
/// <summary>
/// 3.入隊列
/// </summary>
/// <param name="redisModel"></param>
/// <returns></returns>
public async Task EnqueueZset(DataToModel redisModel)
{
redisModel.UpdateTime = redisModel.UpdateTime.AddSeconds(5);// 最后更新時間 + 5秒
var redisDb = _redisConnectionService.GetRedisConnectionMultiplexer().GetDatabase(0);//默認DB0
if (redisDb != null)
{
IsoDateTimeConverter timeFormat = new IsoDateTimeConverter();
timeFormat.DateTimeFormat = "yyyy-MM-dd HH:mm:ss.fff";
await redisDb.SortedSetAddAsync(ZSet_Queue, JsonConvert.SerializeObject(redisModel, Formatting.Indented, timeFormat), redisModel.UpdateTime.ToTimeStamp());//得分 --放入redis
_logger.LogInformation($"數據排隊--入隊列!redisModel:{JsonConvert.SerializeObject(redisModel)}");
}
}
rabbmit實現延遲隊列
-
死信隊列過期--》重推信隊列?暫未實現。
數據更新方案
-
核心原則:先查詢對比,有變更再更新。從B方數據過來的,盡量不再更新回去。減小並發量,控制復雜度。
數據核對方案
-
待補充。未實現自動化。后期可以獲取雙方系統數據,匯總對比。 
部署/壓測/監控
Jmeter(來自於測試同學提供的腳本)
這里只做簡單截圖
-
配置預定義參數

-
必要情況下配置后置處理程序
-
配置好thread group,http request后,執行調用觀察接口
-
查詢請求執行是否成功 
-
查看聚合報告 
kubernetes
-
kubectl get nodes 獲取所有節點 -
kubectl get pod -A 查看所有服務,觀察status和age
-
kubectl logs [-f] [-p] POD [-c CONTAINER] 查看日志信息。
“-c, --container="": 容器名
”
-f, --follow[=false]: 指定是否持續輸出日志
--interactive[=true]: 如果為true,當需要時提示用戶進行輸入。默認為true
--limit-bytes=0: 輸出日志的最大字節數。默認無限制
-p, --previous[=false]: 如果為true,輸出pod中曾經運行過,但目前已終止的容器的日志
--since=0: 僅返回相對時間范圍,如5s、2m或3h,之內的日志。默認返回所有日志。只能同時使用since和since-time中的一種
--since-time="": 僅返回指定時間(RFC3339格式)之后的日志。默認返回所有日志。只能同時使用since和since-time中的一種
--tail=-1: 要顯示的最新的日志條數。默認為-1,顯示所有的日志
--timestamps[=false]: 在日志中包含時間戳
mysql監控(來自於運維同學的反饋)
這里只截圖簡單信息
-
通過雲監控查看mysql狀態[最大連接數/cpu/內存/慢查詢/索引建議/鎖]
調用鏈/日志
此處暫不截圖。

失控
-
一期方案 
-
二期方案
-
三期方案
當然那是進展順利的情況下,不順利的情況下就變成了這樣

某些時候也會聽到如下言論:
-
一定要保證xx的信譽。 -
今晚就不要睡覺了吧?大家多堅持一下。
就如現在的疫情封控一樣,做好了精准防控一片贊歌,失控了就好好居家、共渡難關。 網絡和現實都會告訴你什么就是人間。
總結
以上是關於定制化需求的一些解決方案,希望對未來類似產品或項目做個參考。本篇從問題着手,分析有利於解決/消除異構系統數據一致性辦法。當然數據一致性也依賴於自身系統的高可用,這里未做過多描述,以后再說。
到此結束,謝謝觀看!
原文地址:聊一聊異構系統間數據一致性 https://www.cnblogs.com/fancunwei/p/16125202.html,轉載請注明地址,謝謝
