Canal和Otter討論二(原理與實踐)


上次留下的問題

問題一: 跨公網部署Otter

參考架構圖

解析

​ a. 數據涉及網絡傳輸,S/E/T/L幾個階段會分散在2個或者更多Node節點上,多個Node之間通過zookeeper進行協同工作 (一般是Select和Extract在一個機房的Node,Transform/Load落在另一個機房的Node,通過zookeeper watcher ()機制觸發客戶端node執行回調事件)

​ 上圖中:

​ A-->B同步走的是黃色的流程,S/E在A機房Node,T/L在B機房Node

​ B-->A同步走的是紅色流程,S/E在B機房Node,T/L在A機房Node

​ b. node節點可以有failover / loadBalancer. (每個機房的Node節點,都可以是集群,一台或者多台機器)

​ c. Canal是嵌入在Node中的,一般選擇在S/E節點上創建Canal,即在離源數據庫機房近的一端Node創建Canal,同一節Node可以創建多個Canal實例,但是針對同一個同步,同一時間只會有一個工作節點。

​ d. 要保證兩端Node網絡是互通的,Manager和各個資源也要是互通的。

配置

阿里雲 1核/2G

騰訊雲 1核/2G

百度雲 2核/4G

分配:

阿里雲 Node Mysql A

騰訊雲 Node Mysql B

百度雲 Manager Otter Mysql Zookeeper

環境: jdk1.8 docker1.19 Mysql5.7(Docker版本) Zookeeper3.14 Otter1.14

部署結構:

graph TB subgraph Otter相關 Manager-->NodeB Manager-->NodeA NodeA-->嵌入式canalA NodeB-->嵌入式canalB Manager-->ManagerMySQL end subgraph 機房A MasterA-->SlaveA1 MasterA-->SlaveA2 MasterA-->SlaveAn 嵌入式canalA-->MasterA end subgraph 機房B MasterB-->SlaveB1 MasterB-->SlaveB2 MasterB-->SlaveBn 嵌入式canalB-->MasterB end

問題二: Otter內部工作模塊構件

SETL階段

解析

為了更好的支持系統的擴展性和靈活性,將整個同步流程抽象為Select/Extract/Transform/Load,這么4個階段.

Select階段: 為解決數據來源的差異性,比如接入canal獲取增量數據,也可以接入其他系統獲取其他數據等。

Extract階段: 組裝數據,針對多種數據來源,mysql,oracle,store,file等,進行數據組裝和過濾。

Transform階段: 數據提取轉換過程,把數據轉換成目標數據源要求的類型

Load階段: 數據載入,具體把數據載入到目標端

自定義拓展
  1. 數據處理自定義,比如Extract , Transform的數據處理. 目前Select/Load不支持數據自定義處理
  2. 組件功能性擴展,比如支持oracle日志獲取,支持hbase數據輸出等.

自由之門:

​ EventProcessor : 自定義數據處理,可以改變一條變更數據的任意內容

​ FileResolver : 解決數據和文件的關聯關系(主要解決異地文件同步路徑問題)

組件拓展目前這塊擴展性機制不夠,設計時只預留了接口,但新增一個功能實現,需要通過硬編碼的方式去進行,下載otter的源碼,增加功能支持,修改spring配置,同時修改web頁面,方便使用。

調度模型

解析
  1. otter通過select模塊串行獲取canal的批數據,注意是串行獲取,每批次獲取到的數據,就會有一個全局標識,otter里稱之為processId.
  2. select模塊獲取到數據后,將其傳遞給后續的ETL模型. 這里E和T模塊會是一個並行處理
  3. 將數據最后傳遞到Load時,會根據每批數據對應的processId,按照順序進行串行加載。 ( 比如有一個processId=2的數據先到了Load模塊,但會阻塞等processId=1的數據Load完成后才會被執行)

簡單一點說,Select/Load模塊會是一個串行機制來保證binlog處理的順序性,Extract/Transform會是一個並行,加速傳輸效率.

並行度

類似於tcp滑動窗口大小,比如整個滑動窗口設置了並行度為5時,只有等第一個processId Load完成后,第6個Select才會去獲取數據。

並行度為三:

sequenceDiagram participant Select participant Extract participant Transform participant Load opt Select串行 Select-->Select:從源數據庫get數據 end Select->>Extract:批次一 par E/T並行處理 Extract->>Transform:組裝后的數據 end Transform->>Load:符合特定目標的數據 Select->>Extract:批次二 par E/T並行處理 Extract->>Transform:組裝后的數據 end Transform->>Load:符合特定目標的數據 Select->>Extract:批次三 par E/T並行處理 Extract->>Transform:組裝后的數據 end Transform->>Load:符合特定目標的數據 opt Load串行 Load-->Load:加載到數據庫 end Load-->>Select:批次一返回確認 Select->>Extract:批次四
SEDA模型(分階段事件驅動)

把數據處理划分成各個階段,由中央控制器調度。

對應到代碼中:

graph TD A[OtterController] -->Z[根據NodeTask獲取pipelineId] Z-->Y[根據pipelineId獲取tasks] Y-->X[遍歷tasks,運行isCreate==Task] X-->B{TaskType?} B-->|SelectTask|C[task=new SelectTask] B-->|ExtractTask|D[task=new ExtractTask] B-->|TransferTask|E[task=new TransferTask] B-->|LoadTask|F[task=new LoadTask] C-->G[start] D-->G[start] E-->G[start] F-->G[start] G-->H{TaskType?} H-->|SelectTask|I[OtterSelector] I-->CanalEmbedSelector執行數據獲取 H-->|ExtractTask|J[OtterExtractor] H-->|TransferTask|K[OtterTransformer] H-->|LoadTask|L[OtterLoader]
解析

將並行化調度的串行/並行處理,進行隱藏,抽象了await/single的接口,整個調度稱之為仲裁器。(有了這層抽象,不同的仲裁器實現可以解決同機房,異地機房的同步需求)

模型接口

​ await模擬object獲取鎖操作

​ notify被喚醒后提交任務到thread pools

​ single模擬object釋放鎖操作,觸發下一個stage

這里使用了SEDA模型的優勢

​ 共享thread pool,解決流控機制

​ 划分多stage,提升資源利用率(異地可以協同工作)

​ 統一編程模型,支持同機房,跨機房不同的調度算法(同機房耗時操作主要是數據處理,跨機房耗時操作主要是數據傳輸和回滾)

在pipe中,通過對數據進行TTL控制,解決TCP協議中的丟包問題控制. SEDA主要還是為了解決傳統並發模型的缺點(鎖機制,不易於控制,耦合度高),通過將服務器的處理划分各個Stage,利用queue連接起來形成一個pipeline的處理鏈,並且在Stage中利用控制器進行資源的調控。

有了一層SEDA調度模型的抽象,S/E/T/L模塊之間互不感知,幾個模塊之間的數據傳遞,需要有一個機制來處理,這里抽象了一個pipe(管道)的概念.

原理:

stage | pipe | stage

基於pipe實現:

​ in memory (兩個stage經過仲裁器調度算法選擇為同一個node時,直接使用內存傳輸)

​ rpc call (傳遞的數據<1MB) (異地)

​ file(gzip) + http多線程下載 (文件,大量數據)

在pipe中,通過對數據進行TTL控制,解決TCP協議中的丟包問題控制. (TTL>n重傳)

仲裁器算法

主要包括: 令牌生成(processId) + 事件通知.

令牌生成:

  • 基於AtomicLong.inc()機制(原子性自增),(純內存機制,解決同機房,單節點同步需求,不需要多節點交互)
  • 基於zookeeper的自增id機制,(解決異地機房,多節點(一般是雙節點)協作同步需求)

事件通知: (簡單原理: 每個stage都會有個阻塞隊列,接收上一個stage的single信號通知,當前stage會阻塞在該block queue上,直到有信號通知)

  • block queue + put/take方法,(同Node純內存機制存取數據)
  • block queue + rpc + put/take方法 (兩個stage對應的node不同,需要rpc調用,需要依賴負載均衡算法解決node節點的選擇問題)
  • block queue + zookeeper watcher () (zookeeper事件客戶端回調)

負載均衡算法

  • Stick : 類似於session stick技術,一旦第一次選擇了node,下一次選擇會繼續使用該node. (有一個好處,資源上下文緩存命中率高)
  • Random : 隨機算法
  • RoundRbin : 輪詢算法

注意點:每個node節點,都會在zookeeper中生成Ephemeral節點(零時節點),每個node都會緩存住當前存活的node列表,node節點消失,通過zookeeper watcher機制刷新每個node機器的內存。然后針對每次負載均衡選擇時只針對當前存活的節點,保證調度的可靠性。

問題三: 名詞表

名詞 解釋 備注
Pipeline 從源端到目標端的整個過程描述,主要由一些同步映射過程組成
Channel 同步通道,單向同步中一個Pipeline組成,在雙向同步中有兩個Pipeline組成
DataMediaPair 根據業務表定義映射關系,比如源表和目標表,字段映射,字段組等
DataMedia 抽象的數據介質概念,可以理解為數據表/mq隊列定義
DataMediaSource 抽象的數據介質源信息,補充描述DateMedia
ColumnPair 定義字段映射關系
ColumnGroup 定義字段映射組
Node 處理同步過程的工作節點,對應一個jvm
SEDA模型 Staged Event-Driven Architecture(把一個請求處理過程分成幾個Stage,不同資源消耗的Stage使用不同數量的線程來處理,Stage間使用事件驅動的異步通信模式。)
stage 程序流程的各個階段(這里指數據的S/E/T/L)
processId 處理批次ID,保證串行順序
get/ack/rollback 獲取/確認/回滾
zookeeper watcher () zookeeper 事件回調機制
zookeeper Ephemeral 節點 zookeeper session創建的一個零時節點
TTL 代表數據在網絡中長時間沒有響應被丟棄時經過的最大的路由器數量
failover 故障轉移


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM