【Flink系列九】Flink 作業提交遇到的問題記錄以及原理


起因

  • 由於近期研究了ElasticSearch的Connector,但是目前生產環境不需要此jar。
  • Flink社區的一些小伙伴交流的時候,發現有人在使用Flink Session-Cluster模式提交作業,正好發現自己缺少這塊知識細節。
  • 慮到Yarn集群不可用,或者沒有Yarn的開發環境下,有Flink Session Cluster也是一個不錯的使用環境。

    帶着折騰的目的,故意舍近求遠。

需求

研究ES和CDC的使用,需要flink-sql-connector-cdc和flink-sql-connector-elasticsearch

問題記錄

org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath

※此問題網絡上有一些解決方案

例如 maven-shade-plugin

<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>

僅看這個錯誤,是不夠的,無法確定是 Source的問題,還是Classpath的問題。還需要分析操作場景,才能找到真實原因。

SPI 類型

如果是在開發自定義Flink Source/Sink的時候遇到此問題,則問題可能是SPI加載機制。

  • 需要確保打的jar包 META-INF/services提供完整的信息。
    • 確保Classpath加載沒有問題的情況下
Classpath類型

如果是下載了新的Connector,例如官方提供的、Ververica提供的,則問題可能是classpath加載問題。

  • 如果是Yarn-Per-job模式提交作業,則需要確保所有涉及的Connector,包括FlinkSQL的connector,以及 TableAPI直接引用到的Connector,都以恰當的方式提交到Yarn集群。
    • 例如 flink run 的參數 -C, -yt
  • 如果是Session-Cluster模式提交作業,例如Standalone-Session模式,啟動集群的時候就要指定Connector的Classpath。
    • 正常情況下,都需要放在FLINK_DIST_DIR/lib內。
    • Session-Cluster模式需要重啟

實踐

在實踐過程中,此問題顯而易見,ververica文檔就說了,把jar放進lib內。

但是,舍近求遠,由於平台項目本身用到了額外的目錄存放這些Connector,並希望和lib區分開來。

問題就來了,如果放在其他的目錄怎么辦呢?

  • 這里是用的Standalone-Cluster方式,且jar依賴放在了Connectors內,和lib分開了。

原理

Flink的bin/config.sh,是bin內的腳本的配置讀取入口,內置了一些處理classpath的bash shell function。

  • 生效的方式為:. config.sh,這就是一個source命令。
  • bin/start-cluster.sh 以及 bin/flink 等腳本都依賴這個腳本中的一個function,也就是
constructFlinkClassPath() {
    local FLINK_DIST
    local FLINK_CLASSPATH
    ...
}

這個類其實就是找出了 flink-dist.jar,以及lib下的jar,生成一個classpath字符串。

※ 模仿FLINK_CLASSPATH, 將connectors目錄下的jar導入到classpath內。

local FLINK_CONNECTORS
while read -d '' -r jarfile ; do
        if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
            :
        elif [[ "$FLINK_CONNECTORS" == "" ]]; then
            FLINK_CONNECTORS="$jarfile";
        else
            FLINK_CONNECTORS="$FLINK_CONNECTORS":"$jarfile"
        fi
    done < <(find "$FLINK_LIB_DIR/../connectors" ! -type d -name '*.jar' -print0 | sort -z)

echo "$FLINK_CLASSPATH"":$FLINK_CONNECTORS""$FLINK_DIST"

$FLINK_CONNECTORS插入到其中。

接下來,重啟cluster,再次提交flink作業即可解決此問題。同時,能解決所有這類問題。

額外的信息:

  • flink作業的jar依賴都用的 scope=provide
  • flink作業的jar的SPI沒有關於mysql-cdc的相關定義。

即便如此,也能正確提交作業。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM