Flink 1.13 adds support for restoring a job from a savepoint in the SQL Client (see the Flink documentation on savepoints for an introduction).
Next we use the Flink SQL Client to build an example that takes MySQL CDC data through Kafka into a Hudi data lake. The overall flow is as follows:

In the second step above, we manually stop the Kafka-to-Hudi Flink job and then restore it from a savepoint in the Flink SQL Client.
The steps below are similar to the earlier article "Flink SQL Client實戰CDC數據入湖" (CDC data ingestion into the lake with the Flink SQL Client), except that this article uses Flink 1.13.1; you can refer to that article while reproducing the steps here.
Environment dependencies
hadoop 3.2.0
zookeeper 3.6.3
kafka 2.8.0
mysql 5.7.35
flink 1.13.1-scala_2.12
flink cdc 1.4
hudi 0.10.0-SNAPSHOT
datafaker 0.7.6
Walkthrough
Load test data into MySQL with datafaker
Create the stu8 table in the database:
mysql -u root -p

create database test;
use test;
create table stu8 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '學生名字',
  school varchar(20) not null comment '學校名字',
  nickname varchar(20) not null comment '學生小名',
  age int not null comment '學生年齡',
  score decimal(4,2) not null comment '成績',
  class_num int not null comment '班級人數',
  phone bigint not null comment '電話號碼',
  email varchar(64) comment '家庭網絡郵箱',
  ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;
Create a meta.txt file with the following content:
id||int||自增id[:inc(id,1)]
name||varchar(20)||學生名字
school||varchar(20)||學校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||學生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||學生年齡[:age]
score||decimal(4,2)||成績[:decimal(4,2,1)]
class_num||int||班級人數[:int(10, 100)]
phone||bigint||電話號碼[:phone_number]
email||varchar(64)||家庭網絡郵箱[:email]
ip||varchar(32)||IP地址[:ipv4]
Generate 1,000,000 rows and write them into the test.stu8 table in MySQL (keep the data volume large so that the job writing to Hudi keeps running):
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu8 1000000 --meta meta.txt
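To confirm that datafaker is actually filling the table, you can count the rows while the load is running; a minimal check, assuming the same host and root credentials as above:

mysql -u root -pPass-123-root -h hadoop -e "select count(*) from test.stu8;"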
Download the hudi, flink-mysql-cdc and flink-kafka jars
This article provides a pre-built hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar; if you want to build Hudi yourself, simply clone the master branch and compile it (remember to specify your Hadoop version).
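If you do build it yourself, the following is a rough sketch: -Dscala-2.12 selects the Scala 2.12 build and the hadoop.version property overrides the bundled Hadoop version, but check the Hudi README for the profile flags that match the revision you check out:

git clone https://github.com/apache/hudi.git
cd hudi
mvn clean package -DskipTests -Dscala-2.12 -Dhadoop.version=3.2.0
# the Flink bundle jar ends up under packaging/hudi-flink-bundle/target/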
Download the jars into Flink's lib directory:
cd flink-1.13.1/lib
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/flink/flink-sql-client-savepoint-example/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.1/flink-sql-connector-kafka_2.12-1.13.1.jar
Start a Flink session cluster on YARN
First make sure HADOOP_CLASSPATH is configured. For the open-source Hadoop 3.2.0 release it can be set as follows:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/client/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/tools/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/etc/hadoop/*
Flink needs checkpointing enabled and a savepoint directory configured; edit the flink-conf.yaml configuration file:
execution.checkpointing.interval: 150000ms
state.backend: rocksdb
state.checkpoints.dir: hdfs://hadoop:9000/flink-chk
state.backend.rocksdb.localdir: /tmp/rocksdb
state.savepoints.dir: hdfs://hadoop:9000/flink-1.13-savepoints
Start the Flink session cluster:
cd flink-1.13.1
bin/yarn-session.sh -s 4 -jm 2048 -tm 2048 -nm flink-hudi-test -d
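To verify that the session is up, you can list the YARN applications; the application ID shown for the flink-hudi-test session is the one passed to -yid later:

yarn application -list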
Start the Flink SQL Client:
cd flink-1.13.1
bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar shell
Read the MySQL binlog with Flink and write it to Kafka
Create the MySQL source table:
create table stu8_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'test',
  'table-name' = 'stu8'
);
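The mysql-cdc connector tails the binlog, so the MySQL server must have row-based binary logging enabled; a quick sanity check, assuming the same root credentials:

mysql -u root -pPass-123-root -h hadoop -e "show variables like 'log_bin'; show variables like 'binlog_format';"

The two variables should report ON and ROW respectively.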
Create the Kafka sink table:
create table stu8_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_test_stu8_sink',
  'properties.zookeeper.connect' = 'hadoop1:2181',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json'
);
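If the broker is not configured to auto-create topics, create cdc_mysql_test_stu8_sink up front; a minimal sketch using the Kafka 2.8 CLI, run from the Kafka installation directory (a single partition and replica is just an assumption for a test cluster):

bin/kafka-topics.sh --create --bootstrap-server hadoop1:9092 --topic cdc_mysql_test_stu8_sink --partitions 1 --replication-factor 1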
Create the job that writes the MySQL binlog to Kafka:
insert into stu8_binlog_sink_kafka select * from stu8_binlog;
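You can peek at the topic to confirm that debezium-json change records are flowing; a quick check run from the Kafka installation directory:

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic cdc_mysql_test_stu8_sink --from-beginning --max-messages 5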
Read the Kafka data with Flink and write it to the Hudi data lake
Create the Kafka source table:
create table stu8_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_test_stu8_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
);
Create the Hudi sink table:
create table stu8_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) partitioned by (`school`)
with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
);
Create the job that writes the Kafka data into Hudi:
insert into stu8_binlog_sink_hudi select * from stu8_binlog_source_kafka;
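Once the job has taken a few checkpoints, Hudi data files and the .hoodie metadata directory should appear under the configured path; a quick look on HDFS:

hdfs dfs -ls hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi
hdfs dfs -ls hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi/.hoodie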
After the job has been running for a while, we manually take a savepoint of the Kafka-to-Hudi job and stop it:
bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001
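The job ID and YARN application ID used above can be looked up beforehand; a sketch using the same legacy -yid option that the stop command relies on (the IDs will of course differ in your environment):

bin/flink list -yid application_1633660054258_0001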
Restore the job from the savepoint (run in the Flink SQL Client):
SET execution.savepoint.path=hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb;
insert into stu8_binlog_sink_hudi select * from stu8_binlog_source_kafka;
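Note that execution.savepoint.path stays set for the remainder of the SQL Client session, so any job submitted afterwards would also start from this savepoint; if that is not what you want, reset the option first (the RESET statement is available in the 1.13 SQL Client):

RESET execution.savepoint.path;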
You can see that the job resumes from the savepoint taken above:

Original article: https://blog.csdn.net/weixin_39636364/article/details/120652618
