作者:騰訊雲流計算 Oceanus 團隊
流計算 Oceanus 簡介
流計算 Oceanus 是大數據產品生態體系的實時化分析利器,是基於 Apache Flink 構建的具備一站開發、無縫連接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大數據分析平台。流計算 Oceanus 以實現企業數據價值最大化為目標,加速企業實時化數字化的建設進程。本文將為您詳細介紹如何使用 MySQL 接入數據,經過流計算 Oceanus 對數據進行處理分析(示例中采用小寫轉換函數對name字段進行了小寫轉換),最終將處理好的數據存入 Elasticsearch 中 。
前置准備
1. MySQL 集群准備
1.1 新建 MySQL 集群進入 MySQL 控制台[1],點擊左上方【新建】創建集群。具體可參考官方文檔 創建 mysql 實例[2]。在【數據庫管理】> 【參數設置】中設置參數 binlog_row_image=FULL,便於使用 CDC(Capture Data Change)特性,實現數據的變更實時捕獲。
1.2 准備數據
首先創建 testdb 庫,並在 testdb 庫中創建用戶 user 表,並插入數據。user 表結構:
字段名 | 類型 | 含義 |
user_id | int | 用戶ID |
user_name | varchar(50) | 用戶名 |
create_time | timestamp | 創建時間 |
在表中插入2條數據。
INSERT INTO `user` (`user_id`, `user_name`, `create_time`) VALUES (1001, '小明', '2021-10-01 00:00:00'); INSERT INTO `user` (`user_id`, `user_name`, `create_time`) VALUES (1002, 'TONY', '2021-10-02 00:00:00');
1.3 設置參數
點擊實例 ID,在實例詳情頁面點擊【數據庫管理】進入【參數設置】面板,設置binlog_row_image=FULL
來開啟數據庫變化的同步。
通過 MySQL 集成數據到流計算 Oceanus (Flink) 集群,可以使用 flink-connector-jdbc 或者 flink-connector-mysq-cdc。使用 MySQL-cdc 特性時,flink-connector-mysq-cdc 連接器需要設置 MySQL 數據庫的參數 binlog_row_image=FULL。
2. 創建流計算 Oceanus 集群
進入流計算 Oceanus 控制台[3],點擊左側【集群管理】,點擊左上方【創建集群】,具體可參考流計算 Oceanus 官方文檔創建獨享集群[4]。
創建流計算 Oceanus 集群和 MySQL 集群時所選 VPC 必須是同一 VPC。
3. 創建 Elasticsearch 集群
進入 Elasticsearch 控制台[5],點擊左上方【新建】,創建 Elasticsearch 實例,具體操作請訪問創建 Elasticsearch 集群[6]。
創建 ES 集群和流計算 Oceanus 集群時所選私有網絡 VPC 必須是同一 VPC。
流計算 Oceanus 作業
1. 創建 Source
CREATE TABLE `user_source` ( `user_id` int, `user_name` varchar(50), PRIMARY KEY (`user_id`) NOT ENFORCED -- 如果要同步的數據庫表定義了主鍵, 則這里也需要定義 ) WITH ( 'connector' = 'mysql-cdc', -- 必須為 'mysql-cdc' 'hostname' = '10.0.0.158', -- 數據庫的 IP 'port' = '3306', -- 數據庫的訪問端口 'username' = 'root', -- 數據庫訪問的用戶名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 權限) 'password' = 'yourpassword', -- 數據庫訪問的密碼 'database-name' = 'testdb', -- 需要同步的數據庫 'table-name' = 'user' -- 需要同步的數據表名 );
2. 創建 Sink
-- Elasticsearch 只能作為數據目的表(Sink)寫入 -- 參見 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector CREATE TABLE es_sink ( `user_id` INT, `user_name` VARCHAR ) WITH ( 'connector.type' = 'elasticsearch', -- 輸出到 Elasticsearch 'connector.version' = '6', -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 'connector.hosts' = 'http://10.0.0.175:9200', 'connector.index' = 'User', 'connector.document-type' = 'user', 'connector.username' = 'elastic', 'connector.password' = 'yourpassword', 'update-mode' = 'upsert', -- 捕捉數據庫變化時,需使用 'upsert' 模式 'connector.key-delimiter' = '$', -- 可選參數, 復合主鍵的連接字符 (默認是 _ 符號) 'connector.key-null-literal' = 'n/a', -- 主鍵為 null 時的替代字符串,默認是 'null' 'connector.connection-max-retry-timeout' = '300', -- 每次請求的最大超時時間 (ms) 'format.type' = 'json' -- 輸出數據格式, 目前只支持 'json' );
3. 編寫業務 SQL
insert into es_sink ( select user_id, LOWER(user_name) -- LOWER()函數會將用戶名轉換為小寫 from user_source );
4. 選擇 Connector
點擊【保存】>【發布草稿】運行作業。
請根據實際購買的 Elasticsearch 版本選擇對應的 Connector ,1.13 版本之后無需選擇可自動匹配 Connector。
5. 數據查詢
進入 Elasticsearch 控制台[5],點擊之前購買的 Elasticsearch 實例,點擊右上角【Kibana】,進入 Kibana 查詢數據。具體查詢方法請參考通過 Kibana 訪問集群[7]。
總結
本示例用 MySQL 連接器持續集成數據庫數據變化記錄,經過流計算 Oceanus 實現最基礎的數據轉換功能,最后 Sink 到Elasticsearch 中,用戶無需提前在 Elasticsearch 中創建索引。另外,ES 作為Source/Sink , 使用時間戳 timestamp 類型字段時長度需指定,如:timestamp(3)
參考閱讀
[1]: MySQL 控制台:https://console.cloud.tencent.com/cdb
[2]: 創建 mysql 實例:https://cloud.tencent.com/document/product/236/46433
[3]: 流計算 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[4]: 創建 Oceanus 獨享集群:https://cloud.tencent.com/document/product/849/48298
[5]: Elasticsearch 控制台:https://console.cloud.tencent.com/es
[6]: 創建 Elasticsearch 集群:https://cloud.tencent.com/document/product/845/19536
[7]: 通過 Kibana 訪問集群:https://cloud.tencent.com/document/product/845/19541

關注“騰訊雲大數據”公眾號,技術交流、最新活動、服務專享一站 Get~
