使用binlog和canal從mysql實時抽取數據


來源地——https://blog.csdn.net/zjerryj/article/details/77152226

數據抽取是 ETL 流程的第一步。我們會將數據從 RDBMS 或日志服務器等外部系統抽取至數據倉庫,進行清洗、轉換、聚合等操作。在現代網站技術棧中,MySQL 是最常見的數據庫管理系統,我們會從多個不同的 MySQL 實例中抽取數據,存入一個中心節點,或直接進入 Hive。市面上已有多種成熟的、基於 SQL 查詢的抽取軟件,如著名的開源項目 Apache Sqoop,然而這些工具並不支持實時的數據抽取。MySQL Binlog 則是一種實時的數據流,用於主從節點之間的數據復制,我們可以利用它來進行數據抽取。借助阿里巴巴開源的 Canal 項目,我們能夠非常便捷地將 MySQL 中的數據抽取到任意目標存儲中。

 

Canal 的組成部分

簡單來說,Canal 會將自己偽裝成 MySQL 從節點(Slave),並從主節點(Master)獲取 Binlog,解析和貯存后供下游消費端使用。Canal 包含兩個組成部分:服務端和客戶端。服務端負責連接至不同的 MySQL 實例,並為每個實例維護一個事件消息隊列;客戶端則可以訂閱這些隊列中的數據變更事件,處理並存儲到數據倉庫中。下面我們來看如何快速搭建起一個 Canal 服務。

配置 MySQL 主節點

MySQL 默認沒有開啟 Binlog,因此我們需要對 my.cnf 文件做以下修改:

server-id = 1

log_bin = /path/to/mysql-bin.log

binlog_format = ROW

注意 binlog_format 必須設置為 ROW, 因為在 STATEMENT 或 MIXED 模式下, Binlog 只會記錄和傳輸 SQL 語句(以減少日志大小),而不包含具體數據,我們也就無法保存了。

從節點通過一個專門的賬號連接主節點,這個賬號需要擁有全局的 REPLICATION 權限。我們可以使用 GRANT 命令創建這樣的賬號:

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT

ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';

啟動 Canal 服務端

從 GitHub 項目發布頁中下載 Canal 服務端代碼(鏈接),配置文件在 conf 文件夾下,有以下目錄結構:

canal.deployer/conf/canal.properties

canal.deployer/conf/instanceA/instance.properties

canal.deployer/conf/instanceB/instance.properties

conf/canal.properties 是主配置文件,如其中的 canal.port 用以指定服務端監聽的端口。instanceA/instance.properties 則是各個實例的配置文件,主要的配置項有:

# slaveId 不能與 my.cnf 中的 server-id 項重復

canal.instance.mysql.slaveId = 1234

canal.instance.master.address = 127.0.0.1:3306

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

canal.instance.connectionCharset = UTF-8

# 訂閱實例中所有的數據庫和表

canal.instance.filter.regex = .*\\..*

執行 sh bin/startup.sh 命令開啟服務端,在日志文件 logs/example/example.log 中可以看到以下輸出:

Loading properties file from class path resource [canal.properties]

Loading properties file from class path resource [example/instance.properties]

start CannalInstance for 1-example

[destination = example , address = /127.0.0.1:3306 , EventParser] prepare to find start position just show master status

編寫 Canal 客戶端

從服務端消費變更消息時,我們需要創建一個 Canal 客戶端,指定需要訂閱的數據庫和表,並開啟輪詢。

首先,在項目中添加 com.alibaba.otter:canal.client 依賴項,構建 CanalConnector 實例:

CanalConnector connector = CanalConnectors.newSingleConnector(

        new InetSocketAddress("127.0.0.1", 11111), "example", "", "");

 

connector.connect();

connector.subscribe(".*\\..*");

 

while (true) {

    Message message = connector.getWithoutAck(100);

    long batchId = message.getId();

    if (batchId == -1 || message.getEntries().isEmpty()) {

        Thread.sleep(3000);

    } else {

        printEntries(message.getEntries());

        connector.ack(batchId);

    }

}

這段代碼和連接消息系統很相似。變更事件會批量發送過來,待處理完畢后我們可以 ACK 這一批次,從而避免消息丟失。

// printEntries

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

for (RowData rowData : rowChange.getRowDatasList()) {

    if (rowChange.getEventType() == EventType.INSERT) {

      printColumns(rowData.getAfterCollumnList());

    }

}

每一個 Entry 代表一組具有相同變更類型的數據列表,如 INSERT 類型、UPDATE、DELETE 等。每一行數據我們都可以獲取到各個字段的信息:

// printColumns

String line = columns.stream()

        .map(column -> column.getName() + "=" + column.getValue())

        .collect(Collectors.joining(","));

System.out.println(line);

完整代碼可以在 GitHub 中找到(鏈接)。

加載至數據倉庫

關系型數據庫與批量更新

若數據倉庫是基於關系型數據庫的,我們可以直接使用 REPLACE 語句將數據變更寫入目標表。其中需要注意的是寫入性能,在更新較頻繁的場景下,我們通常會緩存一段時間的數據,並批量更新至數據庫,如:

REPLACE INTO `user` (`id`, `name`, `age`, `updated`) VALUES

(1, 'Jerry', 30, '2017-08-12 16:00:00'),

(2, 'Mary', 28, '2017-08-12 17:00:00'),

(3, 'Tom', 36, '2017-08-12 18:00:00');

另一種方式是將數據變更寫入按分隔符分割的文本文件,並用 LOAD DATA 語句載入數據庫。這些文件也可以用在需要寫入 Hive 的場景中。不管使用哪一種方法,請一定注意要對字符串類型的字段進行轉義,避免導入時出錯。

基於 Hive 的數據倉庫

Hive 表保存在 HDFS 上,該文件系統不支持修改,因此我們需要一些額外工作來寫入數據變更。常用的方式包括:JOIN、Hive 事務、或改用 HBase。

數據可以歸類成基礎數據和增量數據。如昨日的 user 表是基礎數據,今日變更的行是增量數據。通過FULL OUTER JOIN,我們可以將基礎和增量數據合並成一張最新的數據表,並作為明天的基礎數據:

SELECT

  COALESCE(b.`id`, a.`id`) AS `id`

  ,COALESCE(b.`name`, a.`name`) AS `name`

  ,COALESCE(b.`age`, a.`age`) AS `age`

  ,COALESCE(b.`updated`, a.`updated`) AS `updated`

FROM dw_stage.`user` a

FULL OUTER JOIN (

  -- 增量數據會包含重復數據,因此需要選擇最新的那一條

  SELECT `id`, `name`, `age`, `updated`

  FROM (

    SELECT *, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated` DESC) AS `n`

    FROM dw_stage.`user_delta`

  ) b

  WHERE `n` = 1

) b

ON a.`id` = b.`id`;

Hive 0.13 引入了事務和 ACID 表,0.14 開始支持 INSERT、UPDATE、DELETE 語句,Hive 2.0.0 則又新增了Streaming Mutation API,用以通過編程的方式批量更新 Hive 表中的記錄。目前,ACID 表必須使用 ORC 文件格式進行存儲,且須按主鍵進行分桶(Bucket)。Hive 會將變更記錄保存在增量文件中,當 OrcInputFormat 讀取數據時會自動定位到最新的那條記錄。官方案例可以在這個鏈接中查看。

最后,我們可以使用 HBase 來實現表數據的更新,它是一種 KV 存儲系統,同樣基於 HDFS。HBase 的數據可以直接為 MapReduce 腳本使用,且 Hive 中可以創建外部映射表指向 HBase。更多信息請查看官方網站

初始化數據

數據抽取通常是按需進行的,在新增一張表時,數據源中可能已經有大量原始記錄了。常見的做法是手工將這批數據全量導入至目標表中,但我們也可以復用 Canal 這套機制來實現歷史數據的抽取。

首先,我們在數據源庫中創建一張輔助表:

CREATE TABLE `retl_buffer` (

  id BIGINT AUTO_INCREMENT PRIMARY KEY

  ,table_name VARCHAR(255)

  ,pk_value VARCHAR(255)

);

當需要全量抽取 user 表時,我們執行以下語句,將所有 user.id 寫入輔助表中:

INSERT INTO `retl_buffer` (`table_name`, `pk_value`)

SELECT 'user', `id` FROM `user`;

Canal 客戶端在處理到 retl_buffer 表的數據變更時,可以從中解析出表名和主鍵的值,直接反查數據源,將數據寫入目標表:

if ("retl_buffer".equals(entry.getHeader().getTableName())) {

    String tableName = rowData.getAfterColumns(1).getValue();

    String pkValue = rowData.getAfterColumns(2).getValue();

    System.out.println("SELECT * FROM " + tableName + " WHERE id = " + pkValue);

}

這一方法在阿里巴巴的另一個開源軟件 Otter 中使用。

Canal 高可用

  • Canal 服務端中的實例可以配置一個備用 MySQL,從而能夠在雙 Master 場景下自動選擇正在工作的數據源。注意兩台主庫都需要打開 log_slave_updates 選項。Canal 會使用自己的心跳機制(定期更新輔助表的記錄)來檢測主庫的存活。
  • Canal 自身也有 HA 配置,配合 Zookeeper,我們可以開啟多個 Canal 服務端,當某台服務器宕機時,客戶端可以從 ZK 中獲取新的服務端地址,繼續進行消費。更多信息可以參考 Canal AdminGuide

參考資料

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM