老劉是一名即將找工作的研二學生,寫博客一方面是總結大數據開發的知識點,一方面是希望能夠幫助伙伴讓自學從此不求人。由於老劉是自學大數據開發,博客中肯定會存在一些不足,還希望大家能夠批評指正,讓我們一起進步!
背景
大數據領域數據源有業務庫的數據,也有移動端埋點數據、服務器端產生的日志數據。我們在對數據進行采集時根據下游對數據的要求不同,我們可以使用不同的采集工具來進行。今天老劉給大家講的是同步mysql增量數據的工具Canal,本篇文章的大綱如下:
-
Canal 的概念 -
mysql 中主備復制實現原理 -
Canal 如何從 MySQL 中同步數據 -
Canal 的 HA 機制設計 -
各種數據同步解決方法的簡單總結
老劉爭取用這一篇文章讓大家直接上手 Canal 這個工具,不再花別的時間來學習。
mysql 主備復制實現原理
由於 Canal 是用來同步 mysql 中增量數據的,所以老劉先講 mysql 的主備復制原理,之后再講 Canal 的核心知識點。

根據這張圖,老劉把 mysql 的主備復制原理分解為如下流程:
-
主服務器首先必須啟動二進制日志 binlog,用來記錄任何修改了數據庫數據的事件。 -
主服務器將數據的改變記錄到二進制 binlog 日志。 -
從服務器會將主服務器的二進制日志復制到其本地的中繼日志(Relaylog)中。這一步細化的說就是首先從服務器會啟動一個工作線程 I/O 線程,I/O 線程會跟主庫建立一個普通的客戶單連接,然后在主服務器上啟動一個特殊的二進制轉儲(binlog dump)線程,這個 binlog dump 線程會讀取主服務器上二進制日志中的事件,然后向 I/O 線程發送二進制事件,並保存到從服務器上的中繼日志中。 -
從服務器啟動 SQL 線程,從中繼日志中讀取二進制日志,並且在從服務器本地會再執行一次數據修改操作,從而實現從服務器數據的更新。
那么 mysql 主備復制實現原理就講完了,大家看完這個流程,能不能猜到 Canal 的工作原理?
Canal 核心知識點
Canal 的工作原理
Canal 的工作原理就是它模擬 MySQL slave 的交互協議,把自己偽裝為 MySQL slave,向 MySQL master 發動 dump 協議。MySQL master 收到 dump 請求后,就會開始推送 binlog 給 Canal。最后 Canal 就會解析 binlog 對象。
Canal 概念
Canal,美[kəˈnæl],是這樣讀的,意思是水道/管道/渠道,主要用途就是用來同步 MySQL 中的增量數據(可以理解為實時數據),是阿里巴巴旗下的一款純 Java 開發的開源項目。
Canal 架構

server 代表一個 canal 運行實例,對應於一個 JVM。 instance 對應於一個數據隊列,1 個 canal server 對應 1..n 個 instance instance 下的子模塊:
-
EventParser:數據源接入,模擬 salve 協議和 master 進行交互,協議解析 -
EventSink:Parser 和 Store 鏈接器,進行數據過濾,加工,分發的工作 -
EventStore:數據存儲 -
MetaManager: 增量訂閱&消費信息管理器
到現在 Canal 的基本概念就講完了,那接下來就要講 Canal 如何同步 mysql 的增量數據。
Canal 同步 MySQL 增量數據
開啟 mysql binlog
我們用 Canal 同步 mysql 增量數據的前提是 mysql 的 binlog 是開啟的,阿里雲的 mysql 數據庫是默認開啟 binlog 的,但是如果我們是自己安裝的 mysql 需要手動開啟 binlog 日志功能。
先找到 mysql 的配置文件:
etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog-format=ROW
這里有一個知識點是關於 binlog 的格式,老劉給大家講講。
binlog 的格式有三種:STATEMENT、ROW、MIXED
-
ROW 模式(一般就用它)
日志會記錄每一行數據被修改的形式,不會記錄執行 SQL 語句的上下文相關信息,只記錄要修改的數據,哪條數據被修改了,修改成了什么樣子,只有 value,不會有 SQL 多表關聯的情況。
優點:它僅僅只需要記錄哪條數據被修改了,修改成什么樣子了,所以它的日志內容會非常清楚地記錄下每一行數據修改的細節,非常容易理解。
缺點:ROW 模式下,特別是數據添加的情況下,所有執行的語句都會記錄到日志中,都將以每行記錄的修改來記錄,這樣會產生大量的日志內容。
-
STATEMENT 模式
每條會修改數據的 SQL 語句都會被記錄下來。
缺點:由於它是記錄的執行語句,所以,為了讓這些語句在 slave 端也能正確執行,那他還必須記錄每條語句在執行過程中的一些相關信息,也就是上下文信息,以保證所有語句在 slave 端被執行的時候能夠得到和在 master 端執行時候相同的結果。
但目前例如 step()函數在有些版本中就不能被正確復制,在存儲過程中使用了 last-insert-id()函數,可能會使 slave 和 master 上得到不一致的 id,就是會出現數據不一致的情況,ROW 模式下就沒有。
-
MIXED 模式
以上兩種模式都使用。
Canal 實時同步
-
首先我們要配置環境,在 conf/example/instance.properties 下:
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要修改成自己的數據庫信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要修改成自己的數據庫信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
其中,canal.instance.connectionCharset 代表數據庫的編碼方式對應到 java 中的編碼類型,比如 UTF-8,GBK,ISO-8859-1。
-
配置完后,就要啟動了
sh bin/startup.sh
關閉使用 bin/stop.sh
-
觀察日志
一般使用 cat 查看 canal/canal.log、example/example.log
-
啟動客戶端
在 IDEA 中業務代碼,mysql 中如果有增量數據就拉取過來,在 IDEA 控制台打印出來
在 pom.xml 文件中添加:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
添加客戶端代碼:
public class Demo {
public static void main(String[] args) {
//創建連接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
"example", "", "");
connector.connect();
//訂閱
connector.subscribe();
connector.rollback();
int batchSize = 1000;
int emptyCount = 0;
int totalEmptyCount = 100;
while (totalEmptyCount > emptyCount) {
Message msg = connector.getWithoutAck(batchSize);
long id = msg.getId();
List<CanalEntry.Entry> entries = msg.getEntries();
if(id == -1 || entries.size() == 0){
emptyCount++;
System.out.println("emptyCount : " + emptyCount);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
emptyCount = 0;
printEntry(entries);
}
connector.ack(id);
}
}
// batch -> entries -> rowchange - rowdata -> cols
private static void printEntry(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries){
if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
continue;
}
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(entry.getHeader().getLogfileName()+" __ " +
entry.getHeader().getSchemaName() + " __ " + eventType);
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for(CanalEntry.RowData rowData : rowDatasList){
for(CanalEntry.Column column: rowData.getAfterColumnsList()){
System.out.println(column.getName() + " - " +
column.getValue() + " - " +
column.getUpdated());
}
}
}
}
}
-
在mysql中寫數據,客戶端就會把增量數據打印到控制台。
Canal 的 HA 機制設計
在大數據領域很多框架都會有 HA 機制,Canal 的 HA 分為兩部分,Canal server 和 Canal client 分別有對應的 HA 實現:
-
canal server:為了減少對 mysql dump 的請求,不同 server 上的 instance 要求同一時間只能有一個處於 running,其他的處於 standby 狀態。 -
canal client:為了保證有序性,一份 instance 同一時間只能由一個 canal client 進行 get/ack/rollback 操作,否則客戶端接收無法保證有序。
整個 HA 機制的控制主要是依賴了 ZooKeeper 的幾個特性,ZooKeeper 這里就不講了。
Canal Server:
-
canal server 要啟動某個 canal instance 時都先向 ZooKeeper 進行一次嘗試啟動判斷(創建 EPHEMERAL 節點,誰創建成功就允許誰啟動)。 -
創建 ZooKeeper 節點成功后,對應的 canal server 就啟動對應的 canal instance,沒有創建成功的 canal instance 就會處於 standby 狀態。 -
一旦 ZooKeeper 發現 canal server 創建的節點消失后,立即通知其他的 canal server 再次進行步驟 1 的操作,重新選出一個 canal server 啟動 instance。 -
canal client 每次進行 connect 時,會首先向 ZooKeeper 詢問當前是誰啟動了 canal instance,然后和其建立連接,一旦連接不可用,會重新嘗試 connect。 -
canal client 的方式和 canal server 方式類似,也是利用 ZooKeeper 的搶占 EPHEMERAL 節點的方式進行控制。
Canal HA 的配置,並把數據實時同步到 kafka 中。
-
修改 conf/canal.properties 文件
canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181
canal.serverMode = kafka
canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092
-
配置 conf/example/example.instance
canal.instance.mysql.slaveId = 790 /兩台canal server的slaveID唯一
canal.mq.topic = canal_log //指定將數據發送到kafka的topic
數據同步方案總結
講完了 Canal 工具,現在給大家簡單總結下目前常見的數據采集工具,不會涉及架構知識,只是簡單總結,讓大家有個印象。
常見的數據采集工具有:DataX、Flume、Canal、Sqoop、LogStash 等。
DataX (處理離線數據)
DataX 是阿里巴巴開源的一個異構數據源離線同步工具,異構數據源離線同步指的是將源端數據同步到目的端,但是端與端的數據源類型種類繁多,在沒有 DataX 之前,端與端的鏈路將組成一個復雜的網狀結構,非常零散無法把同步核心邏輯抽象出來。

為了解決異構數據源同步問題,DataX 將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX 作為中間傳輸載體負責連接各種數據源。
所以,當需要接入一個新的數據源的時候,只需要將此數據源對接到 DataX,就可以跟已有的數據源做到無縫數據同步。

DataX本身作為離線數據同步框架,采用Framework+plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
-
Reader: 它為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。 -
Writer: 它為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。 -
Framework:它用於連接Reader和Writer,作為兩者的數據傳輸通道,並處理緩沖、並發、數據轉換等問題。
DataX的核心架構如下圖:

核心模塊介紹:
-
DataX完成單個數據同步的作業,我們把它稱之為Job,DataX接收到一個Job之后,將啟動一個進程來完成整個作業同步過程。 -
DataX Job啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。 -
切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。 -
每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader->Channel->Writer的線程來完成任務同步工作。 -
DataX作業運行完成之后,Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出。
Flume(處理實時數據)

Flume主要應用的場景是同步日志數據,主要包含三個組件:Source、Channel、Sink。
Flume最大的優點就是官網提供了豐富的Source、Channel、Sink,根據不同的業務需求,我們可以在官網查找相關配置。另外,Flume還提供了自定義這些組件的接口。
Logstash(處理離線數據)

Logstash就是一根具備實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可以讓你根據自己的需求在中間加上過濾網,Logstash提供了很多功能強大的過濾網來滿足各種應用場景。
Logstash是由JRuby編寫,使用基於消息的簡單架構,在JVM上運行。在管道內的數據流稱之為event,它分為inputs階段、filters階段、outputs階段。
Sqoop(處理離線數據)

Sqoop是Hadoop和關系型數據庫之間傳送數據的一種工具,它是用來從關系型數據庫如MySQL到Hadoop的HDFS從Hadoop文件系統導出數據到關系型數據庫。Sqoop底層用的還是MapReducer,用的時候一定要注意數據傾斜。
總結
老劉本篇文章主要講述了Canal工具的核心知識點及其數據采集工具的對比,其中數據采集工具只是大致講了講概念和應用,目的也是讓大家有個印象。老劉敢做保證看完這篇文章基本等於入門,剩下的就是練習了。
好啦,同步mysql增量數據的工具Canal的內容就講完了,盡管當前水平可能不及各位大佬,但老劉會努力變得更加優秀,讓各位小伙伴自學從此不求人!
如果有相關問題,聯系公眾號:努力的老劉。文章都看到這了,點贊關注支持一波!