Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath


Configuring a Flink SQL table that reads from Kafka, in Java.

For example:

tableEnv.executeSql("CREATE TABLE invalidCtp (\n" +
        "  sys_name STRING,\n" +
        "  broker_id STRING,\n" +
        "  investor_id STRING,\n" +
        "  row_rank BIGINT\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'invalidCtpDetail',\n" +
        "  'properties.bootstrap.servers' = '47.104.234.54:9092',\n" +
        // legacy 1.10-style option, superseded by 'scan.startup.mode':
        // "  'connector.startup-mode' = 'latest-offset',\n" +
        "  'scan.startup.mode' = 'earliest-offset',\n" +
        // "  'kafka.auto.offset.reset' = 'latest',\n" +
        "  'format' = 'json'\n" +
        ")");
This runs fine locally, but on the server it fails with: Flink 1.12 Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath
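For reference, the DDL above assumes an existing StreamTableEnvironment named tableEnv. A minimal sketch of that setup for Flink 1.12 (the class name and the final SELECT are illustrative, not from the original post):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class InvalidCtpJob {
    public static void main(String[] args) {
        // Standard streaming setup; create(env) uses the Blink planner by default in 1.12
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // The CREATE TABLE statement shown above, in compact form
        tableEnv.executeSql("CREATE TABLE invalidCtp (" +
                " sys_name STRING, broker_id STRING, investor_id STRING, row_rank BIGINT" +
                ") WITH (" +
                " 'connector' = 'kafka', 'topic' = 'invalidCtpDetail'," +
                " 'properties.bootstrap.servers' = '47.104.234.54:9092'," +
                " 'scan.startup.mode' = 'earliest-offset', 'format' = 'json')");

        // Quick smoke test: stream the table contents to stdout
        tableEnv.executeSql("SELECT * FROM invalidCtp").print();
    }
}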

Solution:

Add the dependency below to pom.xml (you can also download the jar for the matching version from the following page):

https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.12.1

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
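Note that ${scala.binary.version} and ${flink.version} must be defined in the POM's <properties> block; a sketch matching the Flink 1.12.1 / Scala 2.11 artifact linked above (adjust the values to your cluster):

<properties>
    <flink.version>1.12.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>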
Then copy that connector jar (flink-sql-connector-kafka_${scala.binary.version}) into the lib directory of the Flink installation.
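For example, assuming the jar is already in the local Maven repository and FLINK_HOME points at the Flink installation (both paths are illustrative):

cp ~/.m2/repository/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.1/flink-sql-connector-kafka_2.11-1.12.1.jar $FLINK_HOME/lib/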


After adding this jar, job submission may fail with a new error: java.lang.ClassCastException: LinkedMap cannot be cast to LinkedMap. An "X cannot be cast to X" exception means the same class was loaded by two different classloaders, here because the class is now present both in the job jar and in Flink's lib directory.

Solution:

Add the following to conf/flink-conf.yaml and restart Flink:

classloader.resolve-order: parent-first
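Switching the whole job to parent-first resolution is a broad fix. If you prefer to keep the default child-first behavior, Flink also lets you mark only the conflicting package as parent-first; a sketch, assuming the clash is on org.apache.commons.collections (the package that contains LinkedMap):

classloader.parent-first-patterns.additional: org.apache.commons.collections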
Copyright notice: this is an original article by CSDN blogger "wangandh", released under the CC 4.0 BY-SA license; reposts must include the original source link and this notice.
Original link: https://blog.csdn.net/DH2442897094/article/details/120220852

