Compiling the Flink CDC Connector against Flink 1.14
Flink CDC Connectors is a set of source connectors for Apache Flink that use change data capture (CDC) to ingest changes from different databases. The connectors embed Debezium as the capture engine, so they get the full power of Debezium.
As one of the most popular Flink connectors, it has drawn a crowd since the day it was open-sourced. Starting with MySQL and PostgreSQL, it now also covers MongoDB, Oracle, and SQL Server, satisfying most data-source synchronization needs.
The 2.0 release in particular brought major stability improvements (dynamic split assignment, checkpoint support during the initial snapshot phase) and stronger features (lock-free snapshotting).
(Figure: "What's Flink CDC" architecture overview from the project docs.)
Flink 1.14 introduced support for checkpointing while some operators are in FINISHED state, so I upgraded to Flink 1.14 (I often run stream-table join jobs where the table side hasn't been updated in ages, so its source finishes).
The problem: the Flink CDC connector only supports Flink 1.13.x, which meant my jobs had to hop between two Flink versions. Annoying.
Since I was reworking my CDC jobs recently anyway, I tried switching the Flink version in flink-cdc-connectors to 1.14.3.
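Note that in Flink 1.14 this checkpointing behavior is still opt-in. A one-line sketch of enabling it in flink-conf.yaml (option introduced by FLIP-147, off by default in 1.14):
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true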
Preparation
Fork https://github.com/ververica/flink-cdc-connectors on GitHub into your own account, then clone it and enter the directory:
venn@venn git % git clone git@github.com:springMoon/flink-cdc-connectors.git
Cloning into 'flink-cdc-connectors'...
remote: Enumerating objects: 6717, done.
remote: Counting objects: 100% (1056/1056), done.
remote: Compressing objects: 100% (220/220), done.
remote: Total 6717 (delta 855), reused 892 (delta 795), pack-reused 5661
Receiving objects: 100% (6717/6717), 10.21 MiB | 3.36 MiB/s, done.
Resolving deltas: 100% (2502/2502), done.
venn@venn git % cd flink-cdc-connectors
venn@venn flink-cdc-connectors % ls
LICENSE flink-connector-debezium flink-connector-test-util flink-sql-connector-sqlserver-cdc
NOTICE flink-connector-mongodb-cdc flink-format-changelog-json pom.xml
README.md flink-connector-mysql-cdc flink-sql-connector-mongodb-cdc tools
azure-pipelines.yml flink-connector-oracle-cdc flink-sql-connector-mysql-cdc
docs flink-connector-postgres-cdc flink-sql-connector-oracle-cdc
flink-cdc-e2e-tests flink-connector-sqlserver-cdc flink-sql-connector-postgres-cdc
venn@venn flink-cdc-connectors %
Import the project into IDEA
Modify the Flink and Scala versions
<flink.version>1.14.3</flink.version>
<scala.binary.version>2.12</scala.binary.version>
- Scala 2.12 is the version chosen here
Update the blink planner dependencies
Since Flink 1.14, the former blink planner has been promoted to Flink's only planner, and the corresponding artifacts were renamed: flink-table-planner-blink -> flink-table-planner, flink-table-runtime-blink -> flink-table-runtime.
So all references to the flink-table-planner-blink and flink-table-runtime-blink artifacts need to be renamed, as sketched below.
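In the poms that declare the planner, the change looks roughly like this (a sketch; the actual scope and test classifier vary by module):

<!-- Flink 1.13 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Flink 1.14 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>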
- Note: I only need the MySQL CDC connector, so I deleted the code for the pg, oracle, MongoDB, and sqlServer connectors, and removed flink-cdc-e2e-tests as well (remember to trim the root pom to match, as shown after the listing). This is what's left:
venn@venn flink-cdc-connectors % ls
LICENSE azure-pipelines.yml flink-connector-debezium flink-format-changelog-json tools
NOTICE docs flink-connector-mysql-cdc flink-sql-connector-mysql-cdc
README.md flink-cdc-connectors.iml flink-connector-test-util pom.xml
venn@venn flink-cdc-connectors %
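Since whole modules were removed, the <modules> section of the root pom.xml has to be trimmed to match. Judging by the reactor build order in the log further down, it ends up roughly like this:

<modules>
    <module>flink-connector-debezium</module>
    <module>flink-connector-test-util</module>
    <module>flink-connector-mysql-cdc</module>
    <module>flink-sql-connector-mysql-cdc</module>
    <module>flink-format-changelog-json</module>
</modules>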
Update the flink-json class references
In Flink 1.14, the old org.apache.flink.formats.json.JsonOptions was replaced by org.apache.flink.formats.json.JsonFormatOptions and JsonFormatOptionsUtil.
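In the modules that use flink-json (e.g. flink-format-changelog-json), the import change is roughly this (exact call sites vary):

// Flink 1.13
import org.apache.flink.formats.json.JsonOptions;

// Flink 1.14
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonFormatOptionsUtil;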
- Note: there are also two checkstyle issues; I won't go into detail on those here.
Update the flink-shaded-guava version
With the changes above the project already compiles, but at runtime you run into a flink-shaded-guava jar conflict:
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder
at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.<init>(SnapshotSplitReader.java:81)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:128)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:69)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Flink 1.14 and 1.13 depend on different, mutually incompatible versions of flink-shaded-guava, so the flink-shaded-guava version in the CDC pom must be changed to the one Flink 1.14 depends on:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>18.0-13.0</version>
</dependency>
Change it to:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>30.1.1-jre-14.0</version>
</dependency>
Update the references to flink-shaded-guava classes in the affected modules
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project flink-connector-debezium: Compilation failure: Compilation failure:
[ERROR] /Users/venn/git/flink-cdc-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java:[41,73] package org.apache.flink.shaded.guava18.com.google.common.util.concurrent does not exist
[ERROR] /Users/venn/git/flink-cdc-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java:[219,21] cannot find symbol
[ERROR]   symbol:   class ThreadFactoryBuilder
[ERROR]   location: class com.ververica.cdc.debezium.DebeziumSourceFunction<T>
Delete the import of org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder and replace it with the guava30 one:
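In DebeziumSourceFunction.java (and any other class the compiler flags), that is:

// Flink 1.13 (flink-shaded-guava 18.0-13.0)
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;

// Flink 1.14 (flink-shaded-guava 30.1.1-jre-14.0)
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;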
Build
venn@venn flink-cdc-connectors % mvn clean install -DskipTests
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO]
[INFO] flink-cdc-connectors
[INFO] flink-connector-debezium
[INFO] flink-connector-test-util
[INFO] flink-connector-mysql-cdc
[INFO] flink-sql-connector-mysql-cdc
[INFO] flink-format-changelog-json
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building flink-cdc-connectors 2.2-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
.... (ten thousand lines omitted)
tory/com/ververica/flink-format-changelog-json/2.2-SNAPSHOT/flink-format-changelog-json-2.2-SNAPSHOT-tests.jar
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] flink-cdc-connectors ............................... SUCCESS [ 7.295 s]
[INFO] flink-connector-debezium ........................... SUCCESS [ 17.599 s]
[INFO] flink-connector-test-util .......................... SUCCESS [ 3.390 s]
[INFO] flink-connector-mysql-cdc .......................... SUCCESS [ 10.641 s]
[INFO] flink-sql-connector-mysql-cdc ...................... SUCCESS [ 7.478 s]
[INFO] flink-format-changelog-json ........................ SUCCESS [ 0.761 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 47.450 s
[INFO] Finished at: 2022-03-01T17:09:57+08:00
[INFO] Final Memory: 124M/2872M
[INFO] ------------------------------------------------------------------------
venn@venn flink-cdc-connectors %
Using it in sqlSubmit
Update the flink cdc connector version in sqlSubmit's pom.xml:
<flink.cdc.version>2.2-SNAPSHOT</flink.cdc.version>
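The connector dependency then resolves to the locally installed snapshot; assuming sqlSubmit declares it directly, something like:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>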
Then a CDC SQL job:
-- mysql cdc to print
-- creates a mysql table source
CREATE TABLE t_user_log (
id bigint
,user_id bigint
,item_id bigint
,category_id bigint
,behavior varchar
,ts timestamp(3)
,proc_time as PROCTIME()
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc'
,'hostname' = 'localhost'
,'port' = '3306'
,'username' = 'root'
,'password' = '123456'
,'database-name' = 'venn'
,'table-name' = 'user_log'
,'server-id' = '123456789'
,'scan.startup.mode' = 'initial'
-- ,'scan.startup.mode' = 'specific-offset'
-- ,'scan.startup.specific-offset.file' = 'mysql-bin.000001'
-- ,'scan.startup.specific-offset.pos' = '1'
);
-- print sink
drop table if exists t_user_log_sink;
CREATE TABLE t_user_log_sink (
id bigint
,user_id bigint
,item_id bigint
,category_id bigint
,behavior varchar
,ts timestamp(3)
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'print'
);
insert into t_user_log_sink
select id, user_id, item_id, category_id, behavior, ts
from t_user_log;
Run the job
With mysql-cdc's "scan.startup.mode" = "initial", the connector snapshots the full table on startup and then reads new changes from the binlog:
+I[2, 2, 100, 10, pv, 2022-03-01T09:19:43]
+I[3, 3, 100, 10, pv, 2022-03-01T09:19:45]
+I[1, 1, 100, 10, pv, 2022-03-01T09:19:40]
+I[4, 4, 100, 10, pv, 2022-03-01T09:19:46]
+I[5, 5, 100, 10, pv, 2022-03-01T09:19:48]
MySQL driver version conflict
I was initially on MySQL driver 8.0.27 and got the error below. It turned out to be a driver jar conflict; switching to 8.0.21, the version the flink cdc connector depends on, fixed it (a pom sketch follows the stack trace).
2022-03-01 17:28:51,970 WARN - Source: TableSourceScan(table=[[default_catalog, default_database, t_user_log]], fields=[id, user_id, item_id, category_id, behavior, ts]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_user_log_sink], fields=[id, user_id, item_id, category_id, behavior, ts]) (1/1)#2 (5d66100fdfcd37d688545fc52d60f9e5) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoSuchMethodError: com.mysql.cj.CharsetMapping.getJavaEncodingForMysqlCharset(Ljava/lang/String;)Ljava/lang/String;
at io.debezium.connector.mysql.MySqlValueConverters.charsetFor(MySqlValueConverters.java:331)
at io.debezium.connector.mysql.MySqlValueConverters.converter(MySqlValueConverters.java:298)
at io.debezium.relational.TableSchemaBuilder.createValueConverterFor(TableSchemaBuilder.java:400)
at io.debezium.relational.TableSchemaBuilder.convertersForColumns(TableSchemaBuilder.java:321)
at io.debezium.relational.TableSchemaBuilder.createValueGenerator(TableSchemaBuilder.java:246)
at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:138)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:52)
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.validateAndLoadDatabaseHistory(StatefulTaskContext.java:162)
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:114)
at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:92)
at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.submitSplit(SnapshotSplitReader.java:63)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:163)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:73)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
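A sketch of the fix, assuming the job's pom declares mysql-connector-java directly; pin it to the connector's version:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.21</version>
</dependency>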
Resulting jar: flink-connector-mysql-cdc-2.2-SNAPSHOT.jar
For a complete example, see sqlSubmit on GitHub.
Feel free to follow the Flink菜鳥 WeChat official account for occasional posts on Flink development.