Flink sql-client save checkpoint 測試


一、測試環境準備

1、MySQL環境

version:5.7.34

IP:192.168.124.44

TAB:company、products、result

company

products

 result

2、flink

version:flink-1.13.2

IP:192.168.124.48

TAB:

-- creates a mysql cdc table source
--同步products表
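注意:建表時需要設置主鍵(可參照MySQL表結構設置),並且Flink只支持 NOT ENFORCED 模式的主鍵(如 PRIMARY KEY (id) NOT ENFORCED),否則會報錯;也可以通過修改參數不設置主鍵(例如 mysql-cdc 的 'scan.incremental.snapshot.enabled' = 'false',具體視CDC版本而定)。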

CREATE TABLE products (
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10,3),
    -- Flink only accepts primary keys declared NOT ENFORCED (same form as the
    -- mysql_result sink below); a bare PRIMARY KEY is rejected at DDL validation.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.124.44',
    'port' = '3306',
    'username' = 'myflink',
    'password' = 'sg123456A@',
    'database-name' = 'inventory',
    'table-name' = 'products'
);

--同步company表

CREATE TABLE company (
    id INT NOT NULL,
    company STRING,
    -- Flink only accepts primary keys declared NOT ENFORCED; a bare
    -- PRIMARY KEY is rejected at DDL validation.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.124.44',
    'port' = '3306',
    'username' = 'myflink',
    'password' = 'sg123456A@',
    'database-name' = 'inventory',
    'table-name' = 'company'
);


-- JDBC sink table, mapped to MySQL table `result` in database `inventory`.
CREATE TABLE mysql_result (
    id          INT NOT NULL,
    name        STRING,
    description STRING,
    weight      DECIMAL(10,3),
    company     STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.124.44:3306/inventory',
    'username'   = 'root',
    'password'   = 'sg123456',
    'table-name' = 'result'
);

 

-- Enrich each product with the company row sharing the same id; the LEFT JOIN
-- keeps products without a matching company (company is then NULL).
INSERT INTO mysql_result (id, name, description, weight, company)
SELECT
    p.id,
    p.name,
    p.description,
    p.weight,
    c.company
FROM products AS p
LEFT JOIN company AS c
    ON p.id = c.id;

二、編輯flink sql job 初始化配置文件與DML文件

1、初始化配置文件

初始化文件(通過 sql-client 的 -i 參數指定)可以包含多個job共用的初始化DDL語句,例如建庫和建表語句。

[root@flinkdb01 flinksqljob]# cat myjob-ddl
-- Create the test database; IF NOT EXISTS keeps this init script re-runnable.
CREATE DATABASE IF NOT EXISTS inventory;

USE inventory;

-- mysql-cdc source: syncs MySQL table inventory.products.
-- Flink only accepts primary keys declared NOT ENFORCED.
CREATE TABLE products (
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10,3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.2.100',
    'port' = '3306',
    'username' = 'root',
    'password' = 'sg123456',
    'database-name' = 'inventory',
    'table-name' = 'products'
);

-- mysql-cdc source: syncs MySQL table inventory.company.
CREATE TABLE company (
    id INT NOT NULL,
    company STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.2.100',
    'port' = '3306',
    'username' = 'root',
    'password' = 'sg123456',
    'database-name' = 'inventory',
    'table-name' = 'company'
);

-- JDBC sink that the DML job inserts into; it must be declared here, otherwise
-- `INSERT INTO mysql_result ...` cannot resolve the table.
-- NOTE(review): connection settings assumed to match the CDC sources above
-- (same host and credentials) -- adjust to the real target MySQL instance.
CREATE TABLE mysql_result (
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10,3),
    company STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.2.100:3306/inventory',
    'username' = 'root',
    'password' = 'sg123456',
    'table-name' = 'result'
);

2、job dml文件

[root@flinkdb01 flinksqljob]# cat myjob-dml

-- Submission mode. Keep this commented out for this job: with 'table.dml-sync' = 'true'
-- the INSERT waits for the job to finish, and a job reading unbounded mysql-cdc sources
-- never finishes, so the SQL client would block instead of returning after submission.
--SET 'table.dml-sync' = 'true';

-- Job name (e.g. as listed by `flink list` and in the web UI).
SET 'pipeline.name' = 'SqlJob_mysql_result';

-- YARN queue the job is submitted to.
--SET 'yarn.application.queue' = 'root';

-- Default job parallelism.
--SET 'parallelism.default' = '100';

-- Restore from a specific savepoint / retained checkpoint path.
-- Not needed for the first submission: set it only when restarting the job after it
-- failed or was stopped, pointing at a savepoint or a retained chk-N directory.
--SET 'execution.savepoint.path' = '/u01/soft/flink/flinkjob/flinksqljob/checkpoint/b2f2b0dda89bfb41df57d939479d7786/chk-407';

-- Job-level checkpoint interval (a checkpoint setting, not a savepoint one).
--SET 'execution.checkpointing.interval' = '2sec';

-- Writes into the mysql_result sink declared in the init (DDL) file. Keep this query
-- identical in the restore script so the restored job matches the checkpointed state.
INSERT INTO mysql_result (id, name, description, weight, company)
SELECT
    a.id,
    a.name,
    a.description,
    a.weight,
    b.company
FROM products AS a
LEFT JOIN company AS b
    ON a.id = b.id;

三、開啟checkpoint 與 savepoint

1、編輯flink-conf.yaml文件

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# Checkpoint interval; a bare number is in milliseconds (5000 = every 5 s).
execution.checkpointing.interval: 5000
# Number of completed checkpoints kept on disk.
state.checkpoints.num-retained: 20
execution.checkpointing.mode: EXACTLY_ONCE
# Keep checkpoint files when the job is cancelled, so the job can later be
# restored from a chk-N directory via 'execution.savepoint.path'.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
#state.backend: rocksdb

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# State backend used for checkpoints.
state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Must be a URI with a scheme (file:///..., hdfs://...); a bare path makes the job fail.
state.checkpoints.dir: file:///u01/soft/flink/flinkjob/flinksqljob/checkpoint


# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
# Must also be a URI. Savepoints are never written automatically: they are only
# created when triggered manually (e.g. `flink savepoint <jobId>` or
# `flink stop --savepointPath <dir> <jobId>`), so this directory stays empty until then.
state.savepoints.dir: file:///u01/soft/flink/flinkjob/flinksqljob/checkpoint

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
# NOTE: only takes effect with the RocksDB backend; ignored with 'filesystem' above.
state.backend.incremental: true

注意:checkpoint(以及savepoint)路徑必須設置為帶scheme的URI格式(如 file:///u01/... 或 hdfs://...),否則運行job時會報錯。

2、重啟flink使得配置生效

# Restart the flink systemd service so the edited flink-conf.yaml is reloaded
systemctl restart flink.service

# Check that the service is running again after the restart
systemctl status flink.service

四、測試checkpoint恢復

1、初始化DDL與DML JOB

[root@flinkdb01 flinksqljob]# flinksql -i myjob-ddl -f myjob-dml

No default environment specified.
Searching for '/u01/soft/flink/flink-1.13.2/conf/sql-client-defaults.yaml'...not found.
Successfully initialized from sql script: file:/u01/soft/flink/flinkjob/flinksqljob/myjob-ddl
[INFO] Executing SQL from file.
Flink SQL>

SET pipeline.name = SqlJob_mysql_result;
[INFO] Session property has been set.
Flink SQL>

insert into mysql_result (id,name,description,weight,company)
select
a.id,
a.name,
a.description,
a.weight,
b.company
from products a
left join company b
on a.id = b.id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f39957c3386e1e943f50ac16d4e3f809


Shutting down the session...
done.
[root@flinkdb01 flinksqljob]#

2、查看JOB

1)終端查看job

[root@flinkdb01 flinksqljob]# flink list

 2)web查看job

3、查看JOB的checkpoint

4、cancel DML JOB

5、從checkpoint恢復

1)查看cancel後的checkpoint

可以看到最後一個checkpoint是:/u01/soft/flink/flinkjob/flinksqljob/checkpoint/f39957c3386e1e943f50ac16d4e3f809/chk-59

 

2)從最後的checkpoint恢復

配置用於恢復的DML文件

[root@flinkdb01 flinksqljob]# cat myjob-dml-restore
-- Submission mode: left commented out, so the INSERT is submitted asynchronously
-- and the client returns after submission (a CDC job never finishes).
--SET 'table.dml-sync' = 'true';

-- Job name; uncomment to give the restored job the same name as the original job.
--SET 'pipeline.name' = 'SqlJob_mysql_result';

-- YARN queue the job is submitted to.
--SET 'yarn.application.queue' = 'root';

-- Default job parallelism.
--SET 'parallelism.default' = '100';

-- Restore from the last retained checkpoint (chk-59) of the cancelled job.
-- This is a session property: it stays set for the rest of the SQL client session,
-- so any later INSERT in the same session would also try to restore from it.
SET 'execution.savepoint.path' = '/u01/soft/flink/flinkjob/flinksqljob/checkpoint/f39957c3386e1e943f50ac16d4e3f809/chk-59';

-- Job-level checkpoint interval (a checkpoint setting, not a savepoint one).
--SET 'execution.checkpointing.interval' = '2sec';

-- Must be the same query as the job that produced the checkpoint, so the
-- restored operators match the saved state.
INSERT INTO mysql_result (id, name, description, weight, company)
SELECT
    a.id,
    a.name,
    a.description,
    a.weight,
    b.company
FROM products AS a
LEFT JOIN company AS b
    ON a.id = b.id;

3)恢復job

[root@flinkdb01 flinksqljob]# flinksql -i myjob-ddl -f myjob-dml-restore
No default environment specified.
Searching for '/u01/soft/flink/flink-1.13.2/conf/sql-client-defaults.yaml'...not found.
Successfully initialized from sql script: file:/u01/soft/flink/flinkjob/flinksqljob/myjob-ddl
[INFO] Executing SQL from file.
Flink SQL>

SET 'execution.savepoint.path' = '/u01/soft/flink/flinkjob/flinksqljob/checkpoint/f39957c3386e1e943f50ac16d4e3f809/chk-59';
[INFO] Session property has been set.
Flink SQL>

insert into mysql_result (id,name,description,weight,company)
select
a.id,
a.name,
a.description,
a.weight,
b.company
from products a
left join company b
on a.id = b.id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f6f481776a2f24fbb9d050ea150e77cf


Shutting down the session...
done.

4)查看job恢復情況

五、測試savepoint恢復

1、手動生成savepoint

2、cancel job

3、從savepoint 恢復

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM