起因
- 由於近期研究了ElasticSearch的Connector,但是目前生產環境不需要此jar。
- Flink社區的一些小伙伴交流的時候,發現有人在使用Flink Session-Cluster模式提交作業,正好發現自己缺少這塊知識細節。
- 慮到Yarn集群不可用,或者沒有Yarn的開發環境下,有Flink Session Cluster也是一個不錯的使用環境。
帶着折騰的目的,故意舍近求遠。
需求
研究ES和CDC的使用,需要flink-sql-connector-cdc和flink-sql-connector-elasticsearch
問題記錄
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath
※此問題網絡上有一些解決方案
例如 maven-shade-plugin
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
僅看這個錯誤,是不夠的,無法確定是 Source的問題,還是Classpath的問題。還需要分析操作場景,才能找到真實原因。
SPI 類型
如果是在開發自定義Flink Source/Sink的時候遇到此問題,則問題可能是SPI加載機制。
- 需要確保打的jar包 META-INF/services提供完整的信息。
- 確保Classpath加載沒有問題的情況下
Classpath類型
如果是下載了新的Connector,例如官方提供的、Ververica提供的,則問題可能是classpath加載問題。
- 如果是Yarn-Per-job模式提交作業,則需要確保所有涉及的Connector,包括FlinkSQL的connector,以及 TableAPI直接引用到的Connector,都以恰當的方式提交到Yarn集群。
- 例如 flink run 的參數
-C
,-yt
。
- 例如 flink run 的參數
- 如果是Session-Cluster模式提交作業,例如Standalone-Session模式,啟動集群的時候就要指定Connector的Classpath。
- 正常情況下,都需要放在
FLINK_DIST_DIR/lib
內。 - Session-Cluster模式需要重啟。
- 正常情況下,都需要放在
實踐
在實踐過程中,此問題顯而易見,ververica文檔就說了,把jar放進lib內。
但是,舍近求遠,由於平台項目本身用到了額外的目錄存放這些Connector,並希望和lib區分開來。
問題就來了,如果放在其他的目錄怎么辦呢?
- 這里是用的Standalone-Cluster方式,且jar依賴放在了Connectors內,和lib分開了。
原理
Flink的bin/config.sh
,是bin內的腳本的配置讀取入口,內置了一些處理classpath的bash shell function。
- 生效的方式為:
. config.sh
,這就是一個source命令。 bin/start-cluster.sh
以及bin/flink
等腳本都依賴這個腳本中的一個function,也就是
constructFlinkClassPath() {
local FLINK_DIST
local FLINK_CLASSPATH
...
}
這個類其實就是找出了 flink-dist.jar,以及lib下的jar,生成一個classpath字符串。
※ 模仿FLINK_CLASSPATH, 將connectors目錄下的jar導入到classpath內。
local FLINK_CONNECTORS
while read -d '' -r jarfile ; do
if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
:
elif [[ "$FLINK_CONNECTORS" == "" ]]; then
FLINK_CONNECTORS="$jarfile";
else
FLINK_CONNECTORS="$FLINK_CONNECTORS":"$jarfile"
fi
done < <(find "$FLINK_LIB_DIR/../connectors" ! -type d -name '*.jar' -print0 | sort -z)
echo "$FLINK_CLASSPATH"":$FLINK_CONNECTORS""$FLINK_DIST"
將$FLINK_CONNECTORS
插入到其中。
接下來,重啟cluster,再次提交flink作業即可解決此問題。同時,能解決所有這類問題。
額外的信息:
- flink作業的jar依賴都用的
scope=provide
。 - flink作業的jar的SPI沒有關於
mysql-cdc
的相關定義。
即便如此,也能正確提交作業。