java配置flinksql表連接kafka。
例如:
tableEnv.executeSql("CREATE TABLE invalidCtp (\n" +
" sys_name STRING,\n" +
" broker_id STRING,\n" +
" investor_id STRING\n," +
" row_rank BIGINT" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'invalidCtpDetail',\n" +
" 'properties.bootstrap.servers' = '47.104.234.54:9092',\n" +
// " 'connector.startup-mode' = 'latest-offset',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
// " 'kafka.auto.offset.reset' = 'latest',\n" +
" 'format' = 'json'\n" +
")");
本地可運行,服務器報錯:Flink 1.12 Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath
解決辦法:
pom.xml文件中加入依賴(也可去如下網站下載對應版本)
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.12.1
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
然后把該依賴包(flink-sql-connector-kafka_${scala.binary.version})放到flink安裝目錄的lib目錄下:
加入該依賴包后,可能會導致如下報錯:flink提交任務報錯:java.lang.ClassCastException LinkedMap cannot be cast to LinkedMap exceptions
解決辦法:
在conf/flink-conf.yaml添加如下內容並重啟flink:
classloader.resolve-order: parent-first
————————————————
版權聲明:本文為CSDN博主「wangandh」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/DH2442897094/article/details/120220852