MySQL到ClickHouse實時同步-CloudCanal實戰


簡述

CloudCanal 近期實現了 MySQL(RDS) 到 ClickHouse 實時同步的能力,功能包含全量數據遷移、增量數據遷移、結構遷移能力,以及附帶的監控、告警、HA等能力(平台自帶)。

ClickHouse 本身並不直接支持 Update 和 Delete 能力,但是他自帶的 MergeTree 系列表中 CollapsingMergeTreeVersionedCollapsingMergeTree 可變相實現實時增量的目的,並且性能完全夠用,能夠比較輕松達到 1k RPS 以上的能力。

接下來的文章,簡要介紹 CloudCanal 是如何實現這個能力,以及作為用戶我們怎么比較好的使用這個能力。

技術點

結構遷移

CloudCanal 默認提供結構遷移,默認選擇 CollapsingMergeTree 作為表引擎,並增加一個默認字段 __cc_ck_sign,源主鍵作為 sortKey,如下示例:

 CREATE TABLE console.worker_stats
(
    `id` Int64,
    `gmt_create` DateTime,
    `worker_id` Int64,
    `cpu_stat` String,
    `mem_stat` String,
    `disk_stat` String,
    `__cc_ck_sign` Int8 DEFAULT 1
)
ENGINE = CollapsingMergeTree(__cc_ck_sign)
ORDER BY id
SETTINGS index_granularity = 8192

ClickHouse 表引擎中,CollapsingMergeTree 和 VersionedCollapsingMergeTree 都能通過標記位按規則折疊數據,從而達到更新和刪除的效果。VersionedCollapsingMergeTree 相比 CollapsingMergeTree 優勢在於同一條數據的不同變更可以亂序寫入,但是 CloudCanal 選擇 CollapsingMergeTree 主要原因在於2點

    1. CloudCanal 中同一條記錄必定是按源庫變更順序寫入,不存在亂序情況
    1. 不需要維護 VersionedCollapsingMergeTree 中的 Version 字段(版本,也可以起其他名字)

所以 CloudCanal 選擇了 CollapsingMergeTree 作為默認表引擎。

寫數據

CloudCanal 寫數據主要包含全量和增量兩種,即單次搬遷存量數據和長期同步,兩者寫入略有不同。全量寫入對端主要工作是批量和多線程,因為 CloudCanal 結構遷移默認設置了標記位字段 __cc_ck_sign default 值為 1, 所以就不需要做特殊處理。

對於增量, CloudCanal 則需要做 3 件事情。

  • 轉換 Update、Delete 操作為 Insert
    這一步有兩件事情要做,第一件是按照操作類型,填充標記字段值,其中 Insert 和 Update 為 1 ,Delete 為 -1 ,第二件是將對應增量數據的前鏡像或者后鏡像填充到結果記錄中,以便后續 insert 寫入。
 for (CanalRowChange rowChange : rowChanges) {
            switch (rowChange.getEventType()) {
                case INSERT: {
                    for (CanalRowData rowData : rowChange.getRowDatasList()) {
                        rowData.getAfterColumnsList().add(nonDeleteCol);
                        records.add(rowData.getAfterColumnsList());
                    }

                    break;
                }
                case UPDATE: {
                    for (CanalRowData rowData : rowChange.getRowDatasList()) {
                        rowData.getBeforeColumnsList().add(deleteCol);
                        records.add(rowData.getBeforeColumnsList());

                        rowData.getAfterColumnsList().add(nonDeleteCol);
                        records.add(rowData.getAfterColumnsList());
                    }

                    break;
                }
                case DELETE: {
                    for (CanalRowData rowData : rowChange.getRowDatasList()) {
                        rowData.getBeforeColumnsList().add(deleteCol);
                        records.add(rowData.getBeforeColumnsList());
                    }

                    break;
                }
                default:
                    throw new CanalException("not supported event type,eventType:" + rowChange.getEventType());
            }
        }
  • 按表歸組
    因為 IUD 操作已全部轉換為 Insert, 且為全鏡像(所有字段都填充了值),所以可以按表歸組,然后批量寫入。即使單線程也能滿足大部分場景的同步性能要求。
protected Map<TableUnit, List<CanalRowChange>> groupByTable(IncrementMessage message) {
        Map<TableUnit, List<CanalRowChange>> data = new HashMap<>();
        for (ParsedEntry entry : message.getEntries()) {
            if (entry.getEntryType() == CanalEntryType.ROWDATA) {
                CanalRowChange rowChange = entry.getRowChange();
                if (!rowChange.isDdl()) {
                    List<CanalRowChange> changes = data.computeIfAbsent(new TableUnit(entry.getHeader().getSchemaName(), entry.getHeader().getTableName()), k -> new ArrayList<>());
                    changes.add(rowChange);
                }
            }
        }

        return data;
    }
  • 並行寫入
    將按表歸組的數據使用並行執行框架執行,具體不詳述。

舉個"栗子"

  • 添加數據源
    1.jpg
  • 創建任務,選擇數據源和庫,並連接成功,點擊下一步
    2.jpg
  • 選擇數據同步,建議規格至少選擇 1 GB.目前 MySQL->ClickHouse 結構遷移自動過濾,所以選擇無效。點擊下一步
    3.jpg
  • 選擇表,默認 ClickHouse 上創建 CollapsingMergeTree 表引擎,並自動添加 __cc_ck_sign 折疊標記字段。點擊下一步
    4.jpg
  • 選擇字段,點擊下一步
    5.jpg
  • 創建任務
    6.jpg
  • 等待任務自動結構遷移、全量遷移、數據同步追上
    7.jpg
  • 造點 Insert、Update、Delete 負載
    8.jpg
  • 延遲追平狀態,停止負載
    9.jpg
  • 檢查源端 MySQL 表數據,以其中一張表為例
    10.jpg
  • 檢查對端 ClickHouse 表數據,不一致?!!
    11.jpg
  • 手動優化下表,數據一致。雖然可以等待 ClickHouse 自動優化,但是如果需要直接得到准確結果,可手動優化(注意:手動優化可能導致數據庫機器壓力過大)
    12.jpg

常見問題

我在ClickHouse上已經創建了表怎么辦?

目前比較建議直接使用 CloudCanal 自動結構遷移的方式來創建任務。

如果已建表為 CollapsingMergeTree 表引擎,請將標記位字段改成 __cc_ck_sign Int8 DEFAULT 1`,再創建任務(此時就不再自動結構遷移,而是使用已存在表)。

如果為其他表引擎,暫時不支持(主要是不支持增量能力,需要 CloudCanal 進一步探索)。

同步過去的數據什么時候合並?

當 CloudCanal 同步數據到 ClickHouse 時,ClickHouse 並不會實時合並數據,也沒有一致性可言,所以一般情況是等待合並,或者直接手動合並(造成機器高負載、高IO),optimize table worker_stats FINAL

DDL 怎么做?

目前 CloudCanal 還未支持到 ClickHouse 的 DDL 同步,產品實現上,目前是忽略的。所以如果做 DDL ,加字段建議對端先加,再加源端,減字段反之。

總結

本文簡要介紹了 CloudCanal 實現 MySQL(RDS) 到 ClickHouse 數據遷移同步的能力,具備一站式、數據實時特點,從技術點、例子、以及常見問題角度展開。文章如有錯誤,煩請大家勘誤,后續也歡迎大家試用,提供寶貴的意見和建議。
CloudCanal-免費好用的企業級數據同步工具,歡迎品鑒。
了解產品可以查看官方網站: http://www.clougence.com
CloudCanal社區:https://www.askcug.com/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM