簡述
CloudCanal 近期實現了 MySQL(RDS) 到 ClickHouse 實時同步的能力,功能包含全量數據遷移、增量數據遷移、結構遷移能力,以及附帶的監控、告警、HA等能力(平台自帶)。
ClickHouse 本身並不直接支持 Update 和 Delete 能力,但是他自帶的 MergeTree 系列表中 CollapsingMergeTree 和 VersionedCollapsingMergeTree 可變相實現實時增量的目的,並且性能完全夠用,能夠比較輕松達到 1k RPS 以上的能力。
接下來的文章,簡要介紹 CloudCanal 是如何實現這個能力,以及作為用戶我們怎么比較好的使用這個能力。
技術點
結構遷移
CloudCanal 默認提供結構遷移,默認選擇 CollapsingMergeTree 作為表引擎,並增加一個默認字段 __cc_ck_sign,源主鍵作為 sortKey,如下示例:
CREATE TABLE console.worker_stats
(
`id` Int64,
`gmt_create` DateTime,
`worker_id` Int64,
`cpu_stat` String,
`mem_stat` String,
`disk_stat` String,
`__cc_ck_sign` Int8 DEFAULT 1
)
ENGINE = CollapsingMergeTree(__cc_ck_sign)
ORDER BY id
SETTINGS index_granularity = 8192
ClickHouse 表引擎中,CollapsingMergeTree 和 VersionedCollapsingMergeTree 都能通過標記位按規則折疊數據,從而達到更新和刪除的效果。VersionedCollapsingMergeTree 相比 CollapsingMergeTree 優勢在於同一條數據的不同變更可以亂序寫入,但是 CloudCanal 選擇 CollapsingMergeTree 主要原因在於2點
-
- CloudCanal 中同一條記錄必定是按源庫變更順序寫入,不存在亂序情況
-
- 不需要維護 VersionedCollapsingMergeTree 中的 Version 字段(版本,也可以起其他名字)
所以 CloudCanal 選擇了 CollapsingMergeTree 作為默認表引擎。
寫數據
CloudCanal 寫數據主要包含全量和增量兩種,即單次搬遷存量數據和長期同步,兩者寫入略有不同。全量寫入對端主要工作是批量和多線程,因為 CloudCanal 結構遷移默認設置了標記位字段 __cc_ck_sign default 值為 1, 所以就不需要做特殊處理。
對於增量, CloudCanal 則需要做 3 件事情。
- 轉換 Update、Delete 操作為 Insert
這一步有兩件事情要做,第一件是按照操作類型,填充標記字段值,其中 Insert 和 Update 為 1 ,Delete 為 -1 ,第二件是將對應增量數據的前鏡像或者后鏡像填充到結果記錄中,以便后續 insert 寫入。
for (CanalRowChange rowChange : rowChanges) {
switch (rowChange.getEventType()) {
case INSERT: {
for (CanalRowData rowData : rowChange.getRowDatasList()) {
rowData.getAfterColumnsList().add(nonDeleteCol);
records.add(rowData.getAfterColumnsList());
}
break;
}
case UPDATE: {
for (CanalRowData rowData : rowChange.getRowDatasList()) {
rowData.getBeforeColumnsList().add(deleteCol);
records.add(rowData.getBeforeColumnsList());
rowData.getAfterColumnsList().add(nonDeleteCol);
records.add(rowData.getAfterColumnsList());
}
break;
}
case DELETE: {
for (CanalRowData rowData : rowChange.getRowDatasList()) {
rowData.getBeforeColumnsList().add(deleteCol);
records.add(rowData.getBeforeColumnsList());
}
break;
}
default:
throw new CanalException("not supported event type,eventType:" + rowChange.getEventType());
}
}
- 按表歸組
因為 IUD 操作已全部轉換為 Insert, 且為全鏡像(所有字段都填充了值),所以可以按表歸組,然后批量寫入。即使單線程也能滿足大部分場景的同步性能要求。
protected Map<TableUnit, List<CanalRowChange>> groupByTable(IncrementMessage message) {
Map<TableUnit, List<CanalRowChange>> data = new HashMap<>();
for (ParsedEntry entry : message.getEntries()) {
if (entry.getEntryType() == CanalEntryType.ROWDATA) {
CanalRowChange rowChange = entry.getRowChange();
if (!rowChange.isDdl()) {
List<CanalRowChange> changes = data.computeIfAbsent(new TableUnit(entry.getHeader().getSchemaName(), entry.getHeader().getTableName()), k -> new ArrayList<>());
changes.add(rowChange);
}
}
}
return data;
}
- 並行寫入
將按表歸組的數據使用並行執行框架執行,具體不詳述。
舉個"栗子"
- 添加數據源

- 創建任務,選擇數據源和庫,並連接成功,點擊下一步

- 選擇數據同步,建議規格至少選擇 1 GB.目前 MySQL->ClickHouse 結構遷移自動過濾,所以選擇無效。點擊下一步

- 選擇表,默認 ClickHouse 上創建
CollapsingMergeTree表引擎,並自動添加__cc_ck_sign折疊標記字段。點擊下一步

- 選擇字段,點擊下一步

- 創建任務

- 等待任務自動結構遷移、全量遷移、數據同步追上

- 造點 Insert、Update、Delete 負載

- 延遲追平狀態,停止負載

- 檢查源端 MySQL 表數據,以其中一張表為例

- 檢查對端 ClickHouse 表數據,不一致?!!

- 手動優化下表,數據一致。雖然可以等待 ClickHouse 自動優化,但是如果需要直接得到准確結果,可手動優化(注意:手動優化可能導致數據庫機器壓力過大)

常見問題
我在ClickHouse上已經創建了表怎么辦?
目前比較建議直接使用 CloudCanal 自動結構遷移的方式來創建任務。
如果已建表為 CollapsingMergeTree 表引擎,請將標記位字段改成 __cc_ck_sign Int8 DEFAULT 1`,再創建任務(此時就不再自動結構遷移,而是使用已存在表)。
如果為其他表引擎,暫時不支持(主要是不支持增量能力,需要 CloudCanal 進一步探索)。
同步過去的數據什么時候合並?
當 CloudCanal 同步數據到 ClickHouse 時,ClickHouse 並不會實時合並數據,也沒有一致性可言,所以一般情況是等待合並,或者直接手動合並(造成機器高負載、高IO),如 optimize table worker_stats FINAL。
DDL 怎么做?
目前 CloudCanal 還未支持到 ClickHouse 的 DDL 同步,產品實現上,目前是忽略的。所以如果做 DDL ,加字段建議對端先加,再加源端,減字段反之。
總結
本文簡要介紹了 CloudCanal 實現 MySQL(RDS) 到 ClickHouse 數據遷移同步的能力,具備一站式、數據實時特點,從技術點、例子、以及常見問題角度展開。文章如有錯誤,煩請大家勘誤,后續也歡迎大家試用,提供寶貴的意見和建議。
CloudCanal-免費好用的企業級數據同步工具,歡迎品鑒。
了解產品可以查看官方網站: http://www.clougence.com
CloudCanal社區:https://www.askcug.com/
