基於 EMR OLAP 的開源實時數倉解決方案之 ClickHouse 事務實現


簡介:阿里雲 EMR OLAP 與 Flink 團隊深度合作,支持了 Flink 到 ClickHouse 的 Exactly-Once寫入來保證整個實時數倉數據的准確性。本文介紹了基於 EMR OLAP 的開源實時數倉解決方案。

作者簡介:阿里雲 EMR-OLAP 團隊;主要負責開源大數據 OLAP 引擎的研發,例如 ClickHouse,Starrocks,Trino 等。通過 EMR 產品向阿里雲用戶提供一站式的大數據 OLAP 解決方案。

內容框架
  • 背景
  • 機制梳理
  • 技術方案
  • 測試結果
  • 未來規划

一、背景

Flink 和 ClickHouse 分別是實時流式計算和 OLAP 領域的翹楚,很多互聯網、廣告、游戲等客戶都將兩者聯合使用於構建用戶畫像、實時 BI 報表、應用監控指標查詢、監控等業務,形成了實時數倉解決方案(如圖-1)。這些業務對數據的准確性要求都十分嚴格,所以實時數倉整個鏈路需要保證端到端的 Exactly-Once。

通常來說 Flink 的上游是可以重復讀取或者消費的 pull-based 持久化存儲(例如Kafka),要實現 Source 端的 Exactly-Once 只需要回溯 Source 端的讀取進度即可。Sink 端的 Exactly-Once 則比較復雜,因為 Sink 是 push-based 的,需要依賴目標輸出系統的事務保證,但社區 ClickHouse 對事務並不支持。

所以針對此情況,阿里雲 EMR ClickHouse 與 Flink 團隊一起深度研發,支持了 Flink 到 ClickHouse 的 Exactly-Once寫入來保證整個實時數倉數據的准確性。本文將分別介紹下現有機制以及實現方案。

image.png

圖-1 實時數倉架構

二  機制梳理

ClickHouse 寫入機制

ClickHouse 是一個 MPP 架構的列式 OLAP 系統(如圖-2),各個節點是對等的,通過 Zookeeper 協同數據,可以通過並發對各個節點寫本地表的方式進行大批量的數據導入。

ClickHouse 的 data part 是數據存儲的最小單元,ClickHouse 接收到的數據 Block 在寫入時,會按照 partition 粒度進行拆分,形成一個或多個 data part。data part 在寫入磁盤后,會通過后台merge線程不斷的合並,將小塊的 data part 合並成大塊的 data part,以此降低存儲和讀取的開銷。

在向本地表寫入數據時,ClickHouse 首先會寫入一個臨時的 data part,這個臨時 data part 的數據對客戶端不可見,之后會直接進行 rename 操作,使這個臨時 data part 成為正式 data part,此時數據對客戶端可見。幾乎所有的臨時 data part 都會快速地成功被 rename 成正式 data part,沒有被 rename 成功的臨時 data part 最終將被 ClickHouse 清理策略從磁盤上刪除。

通過上述分析,可以看出 ClickHouse 的數據寫入有一個從臨時 data part 轉為正式 data part 的機制,加以修改可以符合兩階段提交協議,這是實現分布式系統中事務提交一致性的重要協議。

image.png

圖-2 Flink 作業寫入 ClickHouse

注:多個 Flink Task 可以寫入同一個 shard 或 replica

Flink 寫機制

Flink 作為一個分布式處理引擎,提供了基於事務的 Sink 機制,該機制可以保障寫入的 Exactly-Once,相應的數據接收方需要提供遵守 XA 規范的 JDBC 。由於完整的 XA 規范相當復雜,因此,我們先對 Flink 的處理機制進行梳理,結合 ClickHouse 的實際情況,確定需要實現的接口范圍。

為了實現分布式寫入時的事務提交統一,Flink 借助了 checkpoint 機制。該機制能夠周期性地將各個 Operator 中的狀態生成快照並進行持久化存儲。在 checkpoint 機制中,有一個 Coordinator 角色,用來協調所有 Operator 的行為。從 Operator 的角度來看,一次 checkpoint 有三個階段,初始化-->生成快照-->完成/廢棄 checkpoint。從Coordinator的角度來看,需要定時觸發 checkpoint,以及在所有 Operator 完成快照后,觸發 complete 通知。(參考附錄1)

接下來介紹 Flink 中的 Operator 是如何借助事務和 checkpoint 機制來保障 Exactly-Once,Operator 的完整執行需要經過 initial、writeData、snapshot、commit 和 close 階段。

initial 階段:

  • 從快照中取出上次任務執行時持久化的 xid 記錄。快照中主要存儲兩種 xid,一組是未完成 snapshot 階段的 xid,一組是已經完成了 snapshot 的 xid。
  • 接下來對上次未完成 snapshot 的 xid 進行 rollback 操作;對上次已經完成了 snapshot 但 commit 未成功的 xid 進行 commit 重試操作。
  • 若上述操作失敗,則任務初始化失敗,任務中止,進入 close 階段;若上述操作成功,則繼續。
  • 創建一個新的唯一的 xid,作為本次事務ID,將其記錄到快照中。
  • 使用新生成的 xid,調用 JDBC 提供的 start() 接口。

writeData 階段:

  • 事務開啟后,進入寫數據的階段,Operator 的大部分時間都會處於這個階段。在與 ClickHouse 的交互中,此階段為調用 JDBC 提供的 preparedStatement 的 addBatch() 和 executeBatch() 接口,每次寫數據時都會在報文中攜帶當前 xid。
  • 在寫數據階段,首先將數據寫到 Operator 內存中,向 ClickHouse 提交內存中的批量數據有三種觸發方式:內存中的數據條數達到batchsize的閾值;后台定時線程每隔一段時間觸發自動flush;在 snapshot 階段調用end() 和 prepare() 接口之前會調用flush清空緩存。

snapshot 階段:

  • 當前事務會調用 end() 和 prepare() 接口,等待 commit,並更新快照中的狀態。
  • 接下來,會開啟一個新的事務,作為本 Task 的下一次 xid,將新事務記錄到快照中,並調用 JDBC 提供的start() 接口開啟新事務。
  • 將快照持久化存儲。

complete階段:

在所有 Operator 的 snapshot 階段全部正常完成后,Coordinator 會通知所有 Operator 對已經成功的checkpoint 進行 complete 操作,在與 ClickHouse 的交互中,此階段為 Operator 調用 JDBC 提供的 commit() 接口對事務進行提交。

close 階段:

  • 若當前事務尚未進行到 snapshot 階段,則對當前事務進行 rollback 操作。
  • 關閉所有資源。

從上述流程可以總結出,Flink 通過 checkpoint 和事務機制,將上游數據按 checkpoint 周期分割成批,保障每一批數據在全部寫入完成后,再由 Coordinator 通知所有 Operator 共同完成 commit 操作。當有 Operator 寫入失敗時,將會退回到上次成功的 checkpoint 的狀態,並根據快照記錄的 xid 對這一批 checkpoint 的所有 xid 進行 rollback 操作。在有 commit 操作失敗時,將會重試 commit 操作,仍然失敗將會交由人工介入處理。

三、技術方案

整體方案

根據 Flink 和 ClickHouse 的寫入機制,可以描繪出一個Flink 到 ClickHouse 的事務寫入的時序圖(如圖-3)。由於寫的是 ClickHouse 的本地表,並且事務的統一提交由 Coordinator 保障,因此 ClickHouse 無需實現 XA 規范中標准的分布式事務,只需實現兩階段提交協議中的少數關鍵接口,其他接口在 JDBC 側進行缺省即可。

image.png

圖-3 Flink 到 ClickHouse 事務寫入的時序圖

ClickHouse-Server

狀態機

為了實現 ClickHouse 的事務,我們首先定義一下所要實現的事務允許的幾種操作:

  • Begin:開啟一個事務。
  • Write Data:在一個事務內寫數據。
  • Commit:提交一個事務。
  • Rollback:回滾一個未提交的事務。
    事務狀態:
  • Unknown:事務未開啟,此時執行任何操作都是非法的。
  • Initialized:事務已開啟,此時允許所有操作。
  • Committing:事務正在被提交,不再允許 Begin/Write Data 兩種操作。
  • Committed:事務已經被提交,不再允許任何操作。
  • Aborting:事務正在被回滾,不再允許任何操作。
  • Aborted:事務已經被回滾,不再允許任何操作。

完整的狀態機如下圖-4所示:

image.png

圖-4 ClickHouse Server支持事務的狀態機

圖中所有操作均是冪等的。其中,Committing 到 Committed 和 Aborting 到 Aborted 是不需要執行任何操作的,在開始執行 Commit 或 Rollback 時,事務的狀態即轉成 Committing 或 Aborting;在執行完 Commit 或 Rollback 之后,事務的狀態會被設置成 Committed 或 Aborted。

事務處理

Client 通過 HTTP Restful API 訪問 ClickHouse Server,Client 與 ClickHouse Server 間一次完整事務的交互過程如圖-5所示:

image.png

圖-5 Clickhouse事務處理的時序圖

正常流程:

  • Client 向 ClickHouse 集群任意一個 ClickHouse Server 發送 Begin Transaction 請求,並攜帶由 Client 生成的全局唯一的 Transaction ID。ClickHouse Server 收到 Begin Transaction 請求時,會向 Zookeeper 注冊該Transaction ID(包括創建 Transaction ID 及子 Znode 節點),並初始化該 Transaction 的狀態為 Initialized。
  • Client 接收到 Begin Transaction 成功響應時,可以開始寫入數據。當 ClickHouse Server 收到來自 Client 發送的數據時,會生成臨時 data part,但不會將其轉為正式 data part,ClickHouse Server 會將寫入的臨時 data part 信息,以 JSON 的形式,記錄到 Zookeeper 上該 Transaction 的信息中。
  • Client 完成數據的寫入后,會向 ClickHouse Server 發送 Commit Transaction 請求。ClickHouse Server 在收到 Commit Transaction 請求后,根據 ZooKeeper 上對應的Transaction的 data part 信息,將 ClickHouse Server 本地臨時 data part 數據轉為正式的 data part 數據,並更新Transaction 狀態為Committed。Rollback 的過程與 Commit 類似。

異常處理:

  • 如果創建 Transaction ID 過程中發現 Zookeeper 中已經存在相同 Transaction ID,根據 Zookeeper 中記錄的 Transaction 狀態進行處理:如果狀態是 Unknown 則繼續進行處理;如果狀態是 Initialized則直接返回;否則會拋異常。
  • 目前實現的事務還不支持分布式事務,只支持單機事務,所以 Client 只能往記錄該 Transaction ID 的 ClickHouse Server 節點寫數據,如果 ClickHouse Server 接收到到非該節點事務的數據,ClickHouse Server 會直接返回錯誤信息。
  • 與寫入數據不同,如果 Commit 階段 Client 向未記錄該 Transaction ID 的 ClickHouse Server 發送了 Commit Transaction 請求,ClickHouse Server 不會返回錯誤信息,而是返回記錄該 Transaction ID 的 ClickHouse Server 地址給 Client,讓 Client 端重定向到正確的 ClickHouse Server。Rollback 的過程與 Commit 類似。

ClickHouse-JDBC

根據 XA 規范,完整的分布式事務機制需要實現大量的標准接口(參考附錄2)。在本設計中,實際上只需要實現少量關鍵接口,因此,采用了基於組合的適配器模式,向 Flink 提供基於標准 XA 接口的 XAResource 實現,同時對 ClickHouse Server 屏蔽了不需要支持的接口。

對於 XADataSource 的實現,采用了基於繼承的適配器模式,並針對 Exactly-Once 的特性,修改了部分默認配置,如發送失敗的重試次數等參數。

另外,在生產環境中,通常不會通過分布式表,而是通過 SLB 進行數據寫入時的負載均衡。在 Exactly-Once 場景中,Flink 側的 Task 需要保持針對某一 ClickHouse Server 節點的連接,因此不能使用 SLB 的方式進行負載均衡。針對這一問題,我們借鑒了 BalanceClickHouseDataSource 的思路,通過在 URL 中配置多個IP,並在 properties 配置中將 write_mode 設置為 Random ,可以使 XADataSource 在保障 Exactly-Once 的同時,具有負載均衡的能力。

Flink-Connector-ClickHouse

Flink 作為一個流式數據處理引擎,支持向多種數據接收端寫入的能力,每種接收端都需要實現特定的Connector。針對 Exactly-Once,ClickHouse Connector 增加了對於 XADataSource 的選項配置,根據客戶端的配置提供 Exactly-Once 功能。

四、測試結果

ClickHouse 事務性能測試

  • 寫入 ClickHouse 單批次數據量和總批次相同,Client端並發寫線程不同性能比較。由圖-6可以看出,無論 ClickHouse 是否開啟事務, ClickHouse 的吞吐量都與 Client 端並發寫的線程數成正比。開啟事務時,ClickHouse 中臨時 data part 不會立刻被轉為正式 data part,所以在事務完成前大量臨時 data part 不會參與 ClickHouse merge 過程,降低磁盤IO對寫性能的影響,所以開啟事務寫性能較未開啟事務寫性能更好;但事務內包含的批次變多,臨時 data part 在磁盤上的增多導致了合並時 CPU 的壓力增大,從而影響了寫入的性能,開啟事務的寫性能也會降低。

image.png

圖-6 ClickHouse寫入性能壓測(一)
  • 寫入 ClickHouse 總批次 和 Client 端並發寫線程相同,單批次寫入 ClickHouse 數據量不同性能比較。
    由圖-7可以看出,無論ClickHouse 是否開啟事務, ClickHouse 的吞吐量都與單批次數據量大小成正比。開啟事務時,每批次數據越小,ClickHouse 的吞吐量受事務是否開啟的影響就越大,這是因為每批次寫入的時間在事務處理的占比較小,事務會對此產生一定的影響,因此,一次事務包含的批次數量越多,越能夠減少事務對寫入性能的影響;當事務包含批次的增大,事務處理時間在寫入中的占比逐漸降低,ClickHouse merge 產生的影響越來越大,從而影響了寫入的性能,開啟事務較不開啟事務寫性能更好。

image.png

圖-7 ClickHouse寫入性能壓測(二)
  • 總體來說,開啟事務對寫入性能幾乎沒有影響,這個結論是符合我們預期的。

Flink 寫入 ClickHouse 性能比較

  • 對於相同數據量和不同 checkpoint 周期,Flink 寫入 ClickHouse 總耗時如圖-8所示。可以看出,checkpoint 周期對於不開啟 Exactly-Once 的任務耗時沒有影響。對於開啟 Exactly-Once 的任務,在5s 到60s的范圍內,耗時呈現一個先降低后增長的趨勢。原因是在 checkpoint 周期較短時,開啟 Exactly-Once 的 Operator 與 Clickhouse 之間有關事務的交互過於頻繁;在 checkpoint 周期較長時,開啟 Exactly-Once 的 Operator 需要等待 checkpoint 周期結束才能提交最后一次事務,使數據可見。在本測試中,checkpoint周期數據僅作為一個參考,生產環境中,需要根據機器規格和數據寫入速度進行調整。
  • 總體來說,Flink寫入Clickhouse時開啟 Exactly-Once 特性,性能會稍有影響,這個結論是符合我們預期的。

    image.png

五、未來規划

該版本 EMR ClickHouse 實現的事務還不是很完善,只支持單機事務,不支持分布式事務。分布式系統一般都是通過 Meta Server 來做統一元數據管理來支持分布式事務機制。當前我們也正在規划設計 ClickHouse MetaServer 來支持分布式事務,同時可以移除 ClickHouse 對 ZooKeeper 的依賴。

原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM