多表同步 ES 的問題


原始需求

對跨業務域數據提供聯查搜索能力

  • 比如:對退款單提供根據退款單、退款狀態、發貨狀態的聯查,其中退款狀態和發貨狀態是跨業務域。
  • 比如:對訂單提供根據訂單號、訂單狀態、退款狀態的聯查,其中訂單狀態和退款狀態是跨業務域。

為什么要上溯需求層面 ?要優化現有方案,容易局限在現有方案的框架里。上溯到需求層面,能夠跳出現有方案框架,在更大的范圍內搜索解決方案,亦可對現有方案的部分設計與實現的前提和約束有更為清晰的認識。

目標

將多源數據存儲 (S1,S2,...,Sn) 的數據同步到具備聯查能力的目標數據存儲 T,且必須滿足:

  1. T 中的不變數據與 Si 中對應的不變數據應當在指定計算關系下保持一致;
  2. 對於每一次同步,T 中的可變數據與 Si 中對應的可變數據在最新狀態下保持一致。

在現實場景下,源數據存儲 Si 通常是數據庫 DB , 而 T 則有不同選擇。 比如,有贊選擇了 ES 。選擇不同的存儲,對解決方案的影響也是非常大的。

注意:需求和目標是存在差異的。目標是原始需求在當前環境約束下所能轉化的第二層需求。如果跨業務域數據都存儲在一張源數據存儲中,也就是 源數據存儲-目標數據存儲 是 1:1 的關系,那么目標就是單數據存儲的同步了,解決方案也會簡單得多。

但是,在規范的設計理念的指引下,不同業務域的數據通常存儲在不同的數據表里,因此才會有多表同步的需求。

制定目標時,需要考慮當前環境約束的影響。如果當前環境約束過強,則應適當考慮能否改造環境約束,使得設計方案有更靈活的空間可供選擇。環境約束改變了,同樣需求的目標也會發生變化。

設計關鍵

數據狀態變更的每一次最新狀態同步必須保持強一致性。

這里的難點在於可變狀態的同步。因為不可變狀態只需要原樣同步即可,且在任何時刻進行均可。

可變狀態的同步怎么弄呢 ? 假設狀態是不可逆的,且數值是遞增的,比如訂單狀態的數值只能從 99 -> 100 ,不可能從 100 -> 99, 這樣,通過數值的比較(而不是時間戳),就可以過濾掉那些過時的狀態。解決方案也會相對簡單。

現實場景中,狀態的逆轉往往不可避免,狀態數值的遞增關系也不一定滿足,因此,通用的方案不能以狀態數字的遞增關系為前提(否則就會削弱方案的通用性),而要以狀態變更的原始時間戳為准。

ES 的前提

ES 存儲的數據是一個大的 JSON 串,沒有字段級別的版本控制,只有針對整個 JSON 串的一個版本控制。 在多表情形下,同步 ES 需要考慮全局版本控制問題。

ES 數據存儲: https://elasticsearch.cn/article/6178

ES 版本樂觀控制:https://es.xiaoleilu.com/030_Data/40_Version_control.html

多表同步的現有方案: https://tech.youzan.com/you-zan-ding-dan-tong-bu-de-tan-suo-yu-shi-jian-2/


同步ES的基本邏輯

情形一

假設有一個 索引 E 含有 (refund_id, order_no, rp_status, version)

有一個 DB 表 r (refund_id, order_no, rp_status, version)

同步方案:

只要按照 version 字段,同步到 ES 即可。對於 T1 M1, T2 M2 ,如果 T1.version < T2.verison ,將 T2 時刻的記錄同步寫入。 此時,使用非順序隊列即可。

結論:

理想的情形下, 一個 ES 表完全僅對應一個 DB , DB 含有 version 字段,可以作為 ES 表的 version 版本控制,使用非順序隊列。 做搜索索引,應盡可能符合這種情形。


情形二

現在假設有 t (order_no, shop_name) ,要把 shop_name 同步到 E 中。 shop_name 是不變的。

這時,只要在同步處理消息的時候, 關聯 t 表,將 shop_name 讀取,寫入到 E 即可。 只是增加了一次 DB 訪問,仍然可以使用非順序隊列。

結論:

將 DB1 , DB2 同步到 E 中, 以 DB1 為主, 獲取 DB2 的不變字段,依然可以使用非順序隊列。DB1 的 version 字段作為版本控制。

需要注意的是,對於分庫分表來說, 這里的 version 必須是全局遞增的 version ,而不是某個分表的 version 。因為某個分表的 version 是不能滿足遞增特性的。


情形三

現在假設有 t (order_no, order_status) , 需要把 order_status 同步到索引 E 中。這時候,如果用 version 是不夠的。

存在這樣的情形:

假設退款單 R001 (T1 M1)→ (T2,M2) 發生了 rp_status = 1 → 3 ; 訂單狀態 order_status (T1', M1') → (T2', M2') 發生了 order_status = 1 → 5。

現在 T2 時刻的退款單消息 M2 先於 M1 抵達,實時獲取了 T1' 的訂單狀態 1 ;得到了 T2R = (R001, E001, rp_status=3, order_status=1, version =3)

然后 T1 時刻的退款單消息 M1 抵達,實時獲取了 T2' 的訂單狀態 5, 得到了 T1R =(R001, E001, rp_status=1, order_status=5, version=1)

由於 T2R.version > T1R.verison, 因此 T2R 寫入。 然而,此時,order_status 的同步是錯誤的。

結論:

當 DB1 (主),DB2 (輔) 均要同步到索引 E 時,如果 DB1 和 DB2 所需同步的字段都存在變化,那么,使用 DB1 的 version 字段控制版本號是不可行的。這將導致 DB2 的字段同步變更存在錯誤。

此時,就同步 ES 來說,應當盡量避免這種情形。在設計方案的時候:

  1. 避免將兩個表的可變狀態同時同步到一個索引里。

  2. 在業務層就能做到把 DB2 的可變字段冗余到 DB1 里。不過,這樣會增加 DB1 設計和業務更新的復雜度,且事先也不會想到這種冗余方法。


現有解決方案

要解決多表同步的問題,有兩種現有方案:

使用順序隊列

上述的問題引發的原因是, DB1 的后更新的消息先抵達先處理。 使用順序隊列,使得 DB1 的先更新的消息始終先處理,這樣,就不會導致 后更新的消息獲取到過時的 DB2 的字段狀態了。 使用順序隊列的原理是,設置 DB1 的主鍵 ID 作為順序隊列的排序 key 。 順序隊列的優勢,是讓業務方處理容易了;但順序隊列的並發吞吐量取決於隊列分區數,且容易因為一條消息處理出錯而阻塞后續的處理。


使用非順序隊列

使用非順序隊列,需要中間存儲,自定義的全局版本號 G 和 一個全局的 存儲 S

理想情況下,無論誰先到達,都應該寫入 最新的數據。那么,這個全局存儲 S 和 全局版本號 應該具備什么特征呢 ?

  1. 全局存儲 S,應當含有同步 ES 所需的所有字段;

  2. 每次通過全局存儲 S 和 全局版本號 G 的處理 GlobalHandler, 總能拿到所有字段的最新值 FVnew;

  3. 每次通過全局存儲 S 和 全局版本號 G 的處理 GlobalHandler, 總能拿到遞增的全局版本號 G;

  4. 以 G 為 ES 的版本號控制,總能將遞增的 G 和 最新值 FVnew 寫入。

實現思路:

  1. 第一點相對容易,梳理一下所需要同步的字段即可。舉上為例:<refund_id, order_no, rp_status, order_status, G>

  2. 第二點:全局存儲 S,具備字段級別的過濾能力,能夠根據時間戳過濾掉狀態過時的字段值;也就是說,對於要同步的字段 Fi (i= 1,2,..., N), 如果 Fi.timestamp_t1 > Fi.timestamp_t2 ,則 Fi_value_timestamp_t2 被丟棄。每次寫入 S 之后,再取出 S 的最新值。 這正是 HBaseFilter 的功能。

    比如說, M2 先抵達,T2 rp_status = 3 被同步到 S ,然后取出最新的值寫入 E; 接着, M1 抵達,由於 rp_status = 1 的時間戳 T1 < T2 ,因此, rp_status = 1 會被 S 過濾掉,不起作用,然后取出最新的 S 寫入 ES ; 接着 M3 抵達,將 order_status = 5 寫入 S,再取出 S 的最新值寫入 E。

  3. 第三點: 全局版本號 G 的計算,保證消息寫入的遞增性。 同一個表 比如 DB1 的時間戳是有先后的,比如 T2 > T1 , 但是 不同表的時間戳是沒有先后的,比如 DB1 T2 與 T3 是無法確定誰大雖小的。

假設 Gf 是消息所帶時間戳 T 的函數,Gf = G(T,Ginit) ,Ginit 是全局版本號 G 的初始值,那么 Gf 應當滿足什么條件呢 ?

首先 G1 = Gf(T1, Ginit) , G2 = Gf(T2,G1) ,此時 應始終滿足: G1 > Ginit , G2 > G1。

即:Gm = Gf(T1, Ginit) > Ginit, Gf(T2, Gm) > Gm 。

也就是說,對於任意的 t 及 Gm, 都有 Gf(t, Gm) > Gm。一個簡單的實現是,Gf(t, Gm) = a * t + Gm + b, a 和 b 為常數。

考慮 t = 0 , 則 b > 0 。 取 b = 1. Gf(t, Gm) = a * t + Gm + 1



方案對比:

  1. 順序隊列方案更簡單,只需要一個任務,所有同步邏輯都在這個任務里,且流程更符合自然思維;但順序隊列方案容易阻塞,吞吐量有瓶頸。適合中小型業務量。

  2. 非順序隊列方案,吞吐量更優,不會因為某個消息消費阻塞;不過方案也更復雜一點,需要多個任務,額外全局存儲,且同步邏輯較為分散,不容易直接理解。適合大型業務量。

【未完待續】


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM