Official documentation references: see the "References" section below.
Summary of common problems:
Problem 1:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.order_result'.

Table options are:

'connector'='jdbc'
'driver'='com.mysql.jdbc.Driver'
'password'='xxxxx'
'sink.buffer-flush.interval'='2s'
'sink.buffer-flush.max-rows'='200'
'table-name'='order_result'
'url'='jdbc:mysql://xx.xxx.xx.xxx:3306/fk_test'
'username'='root'
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:166)
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:362)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:220)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:675)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
	at flink.cdc.OrderInfo.main(OrderInfo.java:83)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:385)
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:372)
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:159)
	... 18 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
mysql-cdc
print
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:245)
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:382)
	... 20 more

Process finished with exit code 1
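Cause and fix: the last "Caused by" is the real error — no factory with identifier 'jdbc' is on the classpath (only blackhole, datagen, filesystem, mysql-cdc, and print are available), which means the Flink JDBC connector jar is missing from the project dependencies. Adding the connector and the MySQL driver to the pom.xml resolves the discovery failure. A minimal sketch, assuming Flink 1.13.x built for Scala 2.11 and a MySQL 5.x server (match the versions to your own setup; com.mysql.jdbc.Driver is the 5.x driver class, while MySQL 8.x uses com.mysql.cj.jdbc.Driver):

    <!-- Flink JDBC connector: registers the 'jdbc' factory identifier -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>1.13.0</version>
    </dependency>
    <!-- MySQL JDBC driver: provides com.mysql.jdbc.Driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>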
Problem 2:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/table/api/bridge/java/StreamTableEnvironment
	at flink.cdc.OrderInfo.main(OrderInfo.java:11)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.bridge.java.StreamTableEnvironment
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 1 more
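Cause and fix: a NoClassDefFoundError for org.apache.flink.table.api.bridge.java.StreamTableEnvironment means the Table API bridge module is not on the runtime classpath — typically it is missing from the pom, or declared with provided scope while the job is launched from the IDE (in IntelliJ IDEA, ticking "Include dependencies with 'Provided' scope" in the run configuration also helps). A minimal sketch of the dependencies, again assuming Flink 1.13.x with Scala 2.11:

    <!-- Bridge between the DataStream API and the Table API for Java programs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.13.0</version>
    </dependency>
    <!-- Blink planner, needed to translate and run Table API / SQL jobs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.0</version>
    </dependency>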
Problem 3:
Exception in thread "main" java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
	at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink.validatePrimaryKey(JdbcDynamicTableSink.java:72)
	at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink.getChangelogMode(JdbcDynamicTableSink.java:63)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:124)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:50)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:39)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:675)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
	at flink.cdc.OrderInfo.main(OrderInfo.java:87)
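Cause and fix: the mysql-cdc source emits a changelog stream containing UPDATE/DELETE records, so the JDBC sink must run in upsert mode, and upsert mode requires a primary key on the sink table (see the connector documentation excerpt below). Declaring PRIMARY KEY ... NOT ENFORCED in the sink DDL resolves the error. A minimal sketch, reusing the connector options from Problem 1; the column names (id, order_amount) are assumptions, since the original DDL is not shown:

    CREATE TABLE order_result (
        id BIGINT,
        order_amount DECIMAL(10, 2),
        -- required so the JDBC sink accepts UPDATE/DELETE records (upsert mode)
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'driver' = 'com.mysql.jdbc.Driver',
        'url' = 'jdbc:mysql://xx.xxx.xx.xxx:3306/fk_test',
        'table-name' = 'order_result',
        'username' = 'root',
        'password' = 'xxxxx',
        'sink.buffer-flush.interval' = '2s',
        'sink.buffer-flush.max-rows' = '200'
    );

The key declared here should match a PRIMARY KEY or UNIQUE constraint on the physical MySQL table, otherwise the generated upsert statements cannot deduplicate rows.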
Problem 4: to be continued...
References:
https://blog.lixuemin.com/2020/12/11/flink/Flink-CDC%E8%B8%A9%E5%9D%91%E9%9B%86%E5%90%88/ (a collection of Flink CDC pitfalls)
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/ (Flink 1.13 SQL CREATE statements)
JDBC SQL Connector
Scan Source: Bounded | Lookup Source: Sync Mode | Sink: Batch | Sink: Streaming Append & Upsert Mode
The JDBC connector allows reading data from and writing data into any relational database with a JDBC driver. This document describes how to set up the JDBC connector to run SQL queries against relational databases.
If a primary key is defined in the DDL, the JDBC sink operates in upsert mode, exchanging UPDATE/DELETE messages with the external system; otherwise, it operates in append mode and does not support consuming UPDATE/DELETE messages.
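For a concrete table definition that satisfies this, see the order_result DDL sketch under Problem 3 above: declaring PRIMARY KEY (id) NOT ENFORCED is what switches the sink from append mode to upsert mode.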
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/jdbc/
Flink SQL CDC goes live! We summarized 13 lessons from production practice
https://my.oschina.net/u/2828172/blog/4545836
FLIP-84: Improve and refactor the TableEnvironment and Table APIs
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
If writing data into MySQL fails even though the demo code itself looks correct, take the SQL that the job runs against MySQL and execute it in MySQL directly to see what error comes back.
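For the MySQL dialect, the JDBC sink in upsert mode writes INSERT ... ON DUPLICATE KEY UPDATE statements, so that is the kind of statement worth replaying by hand. An illustrative sketch against the order_result table assumed under Problem 3 (the column names are hypothetical):

    -- the shape of statement the upsert sink issues against MySQL
    INSERT INTO order_result (id, order_amount)
    VALUES (1, 99.90)
    ON DUPLICATE KEY UPDATE order_amount = VALUES(order_amount);

If this statement fails in the MySQL client, the error message usually points at the real cause, such as a missing table, a missing primary/unique key, a type mismatch, or insufficient privileges.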