Flink 實踐教程:入門(3):讀取 MySQL 數據


作者:騰訊雲流計算 Oceanus 團隊

 

流計算 Oceanus 簡介

流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。

本文將為您詳細介紹如何取 MySQL 數據,經過流計算 Oceanus 實時計算引擎分析,輸出數據到日志(Logger Sink)當中。

 

 

 

前置准備

創建 流計算 Oceanus 集群

進入流計算 Oceanus 控制台,點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔創建獨享集群

創建 Mysql 實例

進入MySQL 控制台,點擊【新建】。具體可參考官方文檔創建 MySQL 實例。然后在【數據庫管理】> 【參數設置】中設置參數 binlog_row_image=FULL,便於使用 CDC(Capture Data Change)特性,實現數據的變更實時捕獲。

!創建流計算 Oceanus 集群和 MySQL 實例時所選 VPC 必須是同一 VPC。

流計算 Oceanus 作業

1. 創建 Source

CREATE TABLE `MySQLSourceTable` (
  `id` INT,
  `name` VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED  -- 如果要同步的數據庫表定義了主鍵, 則這里也需要定義
) WITH (
   'connector' = 'mysql-cdc',       -- 必須為 'mysql-cdc'
   'hostname' = '10.0.0.158',       -- 數據庫的 IP
   'port' = '3306',                 -- 數據庫的訪問端口
   'username' = 'root',             -- 數據庫訪問的用戶名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 權限)
   'password' = 'xxxxxxxxxx',       -- 數據庫訪問的密碼
   'database-name' = 'testdb',      -- 需要同步的數據庫
   'table-name' = 'student'         -- 需要同步的數據表名
);

2. 創建 Sink

CREATE TABLE CustomSink (
id INT,
name VARCHAR
) WITH (
   'connector' = 'logger',
   'print-identifier' = 'DebugData'
);

3. 編寫業務 SQL

INSERT INTO CustomSink
SELECT * FROM MySQLSourceTable;

4. 運行作業

點擊【保存】>【發布草稿】運行作業。查看 Flink UI  Taskmanger 日志,觀察全量數據是否正常打印到日志。

5. 驗證 MySQL-CDC 特性

在 MySQL 中新增一條數據,然后在 Flink UI Taskmanger 日志中觀察結果,觀察新增的數據是否正常打印到日志。

在 MySQL 中修改和刪除記錄同樣會更新到 Logger Sink中,並打印輸出。

 

總結

1、Mysql CDC 支持對 MySQL 數據庫的全量和增量讀取,並保證 Exactly Once 語義。MySQL CDC 底層使用了 Debezium 來做 CDC(Change Data Capture),其工作特性可參考數據庫 MySQL CDC  

2、輸入到 Logger Sink 的數據, 會通過日志打印出來,便於調試。Logger Jar 包下載地址:https://cloud.tencent.com/document/product/849/58713

 

 

關注“騰訊雲大數據”公眾號,技術交流、最新活動、服務專享一站Get~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM