前言
架構轉型,擁抱雲原生服務生態
當前微信內部的大數據計算平台是基於自研的 Yard 資源調度系統來建設,Yard 的設計初衷除了提供在線服務資源隔離外,另一方面是為了提高在線服務機器的整體資源利用率,其核心策略是在機器空閑時能在上面跑一些大數據離線任務。但是對接業界各種大數據計算框架(例如 Hadoop MapReduce、Spark、Flink 等)都需要專門定制化開發,迭代維護非常不靈活,難以跟上開源社區發展的步伐。為此,我們開始轉向使用Kubernetes,並基於騰訊雲的 TKE 容器平台逐步搭建我們的大數據計算平台。
考慮到我們 Yard 平台上 Flink 作業還不是特別多,歷史包袱相對較少,所以我們首先開始 Flink on Kubernetes實戰之路。
微信 Flink 實時計算平台整體概況
微信 Flink 作業數據流轉圖
下圖是我們大多數業務的 Flink 作業實時計算數據流轉圖,數據經采集上報到消息隊列 Pulsar,用戶的 Flink 作業消費 Pulsar 計算(必要時也會訪問其他外部存儲,如Redis、FeatureKV等),計算結果可以落地到多種存儲系統,例如對於報表類業務,計算結果寫入 mysql/pg;對於實時樣本特征拼接作業,計算結果寫入 hdfs,為下游模型訓練不斷提供樣本;對於一些中間結果,則寫入Pulsar,以便對接下游 Flink 作業。

下面詳細闡述上圖中 Flink 作業是如何提交部署的。
集群及 Flink 作業部署
Flink on TKE 半托管服務,極致的Flink雲原生使用體驗
Flink on TKE 半托管服務提供了Flink集群部署、日志、監控、存儲等一站式的服務,用戶可以將其他在線業務與Flink運行在同一個集群中,從而最大程度提高資源資源使用率,達到統一資源、統一技術棧、統一運維等能力。
我們基於騰訊雲的 TKE 容器平台構建 Flink Kubernetes 計算集群。根據已有的 Flink 作業運營行情況,我們發現絕大多數 Flink 作業主要是耗費內存,而CPU利用率普遍較低,在機型選擇上我們推薦選擇內存型機器。
對於 Flink 作業的提交部署,Flink on Kubernetes 有多種部署模式(詳細介紹請參考TKE團隊出品的文章:Flink on kubernetes 部署模式分析),Flink 開源社區先后推出了基於 Standalone 的 Kubernetes 聲明式部署以及 Kubernetes Native 部署方式,基於 Standalone 的 Kubernetes 聲明式部署步驟繁瑣且不易管理,所以不考慮,另外社區的 Flink on Kubernetes Native 部署方式是從1.12起正式推出,功能還不夠完善,並且尚未被大規模生產驗證,我們在這之前其實已經開始調研部署,經過一番比較后,我們使用的是TKE容器團隊提供的Flink on TKE半托管服務(基於Kubernetes Operator),其提交部署流程大致如下圖所示。

通過 Flink Operator,客戶端就可以通過一個簡單的聲明式 API 提交部署 Flink 作業,各組件的生命周期統一由 Operator 控制,例如:
apiVersion: flinkoperator.Kubernetes.io/v1beta1
kind: FlinkCluster
metadata:
name: flink-hello-world
spec:
image:
name: flink:1.11.3
jobManager:
resources:
limits:
memory: "1024Mi"
cpu: "200m"
taskManager:
replicas: 2
resources:
limits:
memory: "2024Mi"
cpu: "200m"
job:
jarFile: /opt/flink/examples/streaming/helloword.jar
className: org.apache.flink.streaming.examples.wordcount.WordCount
args: ["--input", "/opt/flink/README.txt"]
parallelism: 2
flinkProperties:
taskmanager.numberOfTaskSlots: "2"
Flink Operator 提交流程大致如下圖所示,首先會啟動一個 Flink Standalone Session Cluster,然后拉起一個 Job Pod 運行用戶代碼,向 Standalone Session Cluster 提交 Job,提交完成后會不斷去跟蹤 Job 的運行狀態。所以運行過程中會有三類 Pod,即 JobManager、TaskManager、Job Pod。

來源: https://github.com/lyft/flinkKubernetesoperator
使用 Flink Operator 部署 Flink 作業的好處不言而喻,客戶端不需要像 Flink on Kubernetes Native 部署方式那樣需要 kubeconfig,可以直接通過 http 接口訪問 API Server。雖然 Flink on Kubernetes Native 部署可以做到按需自動申請 TM,但是實際上我們的應用場景基本都是單 Job 的流計算,用戶事先規划好資源也可接受,而且基於 Flink Operator,我們可以做批調度,即 Gang Schedule,可以避免資源有限的情況下作業之間互相等待資源 hold 住的情況(例如大作業先提交,部分 TaskManager 長時間處於資源等待狀態,小作業后提交,小作業申請不到資源也 hold 在那里傻等)。
自動下載用戶上傳資源
作業與 Flink 內核動態分離,提高靈活性
通過上述的聲明式 API 方式提交部署,我們可以看到用戶 jar 包需要事先打到 image 里,作為平台提供方,當然不可能讓每個用戶自己去打 docker image,有些用戶甚至都不知道怎么用 docker,所以我們應該對用戶屏蔽 docker image,用戶只需要上傳 jar 包等資源即可。Flink Operator 提供了 initContainer 選項,借助它我們可以實現自動下載用戶上傳資源,但是為了簡單,我們直接修改 docker entrypoint 啟動腳本,先下載用戶上傳的資源,再啟動 Flink 相關進程,用戶上傳的資源通過環境變量聲明。例如:
apiVersion: flinkoperator.Kubernetes.io/v1beta1
kind: FlinkCluster
metadata:
name: flink-hello-world
spec:
image:
name: flink:1.11.3
envVars:
- name: FLINK_USER_JAR
value: hdfs://xxx/path/to/helloword.jar
- name: FLINK_USER_DEPENDENCIES
value: hdfs://xxx/path/to/config.json,hdfs://xxx/path/to/vocab.txt
...
用戶上傳的依賴可以是任意文件,跟 Flink on Yarn 的方式不同,我們不用通過 submit 來分發依賴,而是在容器 docker entrypoint 啟動腳本中直接下載到工作目錄,以便用戶可以在代碼里以相對路徑的方式(例如 ./config.json)訪問到,如果依賴文件是 jar,則需要將其附加到 classpath 中,為了不修改 flink 的腳本,我們將 jar 附加到環境變量 HADOOP_CLASSPATH上,最后 Flink 相關進程啟動的時候會被加到 Java 的 classpath 中。
對於用戶主類所在的 jar(即環境變量FLINK_USER_JAR),只需要在 Job Pod 的 Container 中下載,如果同樣下載到當前目錄,那么它也會被附加到classpath中,在提交的時候可能會出現如下類加載鏈接錯誤,這是因為 Java 啟動的時候加載了一遍,在執行用戶main函數的時候 Flink 又會去加載一遍,所以我們將主 jar 包下載到一個專門固定目錄,例如/opt/workspace/main/,那么提交時通過spec.job.jarFile
參數指定到 /opt/workspace/main/xxx.jar 即可。
java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/pulsar/client/api/Authentication"
at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_152]
at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_152]
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_152]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[?:1.8.0_152]
at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[?:1.8.0_152]
at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[?:1.8.0_152]
at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[?:1.8.0_152]
總的來說,每類 pod 的啟動流程如下圖所示:

與微信后台服務打通
雲原生架構下的資源類型 Demonsets,簡化架構轉型復雜度
用戶的 Flink 作業經常需要在運行過程中與微信的后台服務進行交互,在傳統的裸機上訪問微信的后台服務需要機器部署 Agent 及路由配置,對於 Kubernetes 集群,在我們基礎架構中心的同事支持下,微信后台基礎 Agent 以 DeamonSet 方式打包到部署到每個節點上,我們在起 Flink 相關 Container 的時候,帶上 HostIPC 選項並掛載路由配置路徑,就可以像使用裸機一樣訪問微信的后台服務。
此外,因為部分 Agent 的 unix sock 文件在母機 /tmp 下,我們需要在容器里掛載目錄 /tmp,然而 Flink 運行過程中 shuffle、web 以及一些臨時文件(例如解壓出來的so等)默認都是放到 /tmp 目錄下,這就會導致作業即使失敗也會殘留一些垃圾到母機上,長此以往,/tmp 目錄勢必會被撐爆,所以我們在啟動 Java 進程時設置參數 -Djava.io.tmpdir=/opt/workspace/tmp,將 Java 的默認臨時目錄改到容器內的路徑,這樣作業失敗,容器銷毀不至於殘留垃圾。
屬性配置、日志及監控
日志與監控,提升可觀測性
從上面的聲明式 yaml 配置可以看到,提交 Flink 作業時是通過flinkProperties 選項來指定 Flink 屬性參數,事實上 Flink Operator 會將flinkProperties指定的屬性參數以 ConfigMap 形式部署,會覆蓋 image 中的 flink/conf 目錄,所以我們不能將系統默認屬性配置放到 flink image 中,為此,我們在客戶端維護一份 Flink 系統默認配置,在提交的時候會合並用戶填的屬性配置,填充到 flinkProperties 選項中,可以方便我們靈活調整 Flink 系統默認配置。
默認情況下,Flink on Kubernetes部署的作業,其在 Docker Container 中運行的進程都是前台運行的,使用 log4j-console.properties配置,日志會直接打到控制台,這樣就會導致 Flink UI 無法展示 log,只能去查看 Pod 日志,此外用戶通過 System.out.println 打的日志也會混在 log4j 的日志中,不易區分查看。所以我們重新定義了 log4j-console.properties,將 log4j 日志打到FLINK_LOG_DIR 目錄下的文件中,並按大小滾動,為了能在 Flink UI 上也能看到用戶 stdout 的輸出,在進程啟動命令flink-console.sh 最后加上 2>&1 | tee ${FLINK_LOG_PREFIX}.out,可以把控制台輸出的日志旁路一份到日志目錄的文件中。最后 Flink UI 展示的日志如下圖所示:

對於歷史失敗作業,我們在Kubernetes上也部署了一個 Flink History Server,可以靈活地擴縮容,從此再也不用擔心半夜作業掛了自動重啟無法追溯原因了。

對於資源及作業的監控,TKE 提供了免費的雲原生 Prometheus 服務 TPS,可以一鍵部署並關聯我們的 TKE 集群,然而我們在早期已經采用主流的 Prometheus + Grafana 組合部署了監控平台,這里就沒有使用TPS。當前我們有集群資源、應用組(Namespace)資源、作業資源利用情況的監控,大致如下圖所示。后面我們會再將每個作業 Flink Metric 推到 Prometheus,便於監控作業級別的反壓、gc、operator 流量等信息。

數據應用平台對接
基於上述基礎的 Flink-on-Kubernetes 能力,就可以將 Flink 對接到我們的各種數據應用平台上。如下圖所示,我們已經支持用戶使用多樣化的方式使用 Flink,用戶可以在機器學習平台拖拽節點或者注冊定制化節點以 Jar 包或 PyFlink 的方式使用,另外也可以在SQL分析平台上寫 Flink SQL。

對於 Jar、PyFlink 的方式使用就不詳細展開,對於 Flink SQL 的支持,我們目前是結合我們自身的元數據體系,利用 Flink 已有的 SQL 功能。當前實時數倉被業界廣泛提起,我們知道傳統的離線數倉,如 Hive,無外乎是在 HDFS 上套了一層 Schema,那么實時數倉也類似,數據源通常是 Kafka、Pulsar 這類消息隊列系統,在這之上套一層 Schema 將實時數據管理起來,就可以稱之為實時數倉了。我們基於SQL分析平台的元數據管理體系,構建 Flink SQL 能力,用戶可以在SQL分析平台上注冊/管理庫表元數據,為了架構簡單,我們並沒有去實現自己的 Flink Catalog(元數據操作直接在 SQL分析平台上完成,無需實現 create、drop 等 API),而是采用如下圖所示的流程來提交 SQL。

用戶在SQL分析平台上注冊庫表元數據(可以精細授權管控),然后編輯 SQL 提交,首先SQL分析平台會做語法校驗、權限及合法性校驗,沒問題后,將 SQL 涉及到的元數據加密打包,連同聲明式配置 Yaml 提交給統一調度平台,在統一調度平台上我們開發了一個 FlinkSQL 類型的作業,本質上就是一個常規的 Flink Jar 作業,即 FlinkSQLDriver ,用於接受 SQL 及其附屬的參數,FlinkSQLDriver 被提交后,解析傳過來的配置,組裝完整的 SQL 語句(包括 DDL、DML),然后調用 tableEnvironment.executeSql逐條執行,所以本質上是將庫表臨時注冊到 default catalog 中。
小結
本文從整體上介紹了微信 Flink-on-Kubernetes實戰經驗以及 Flink 數據應用平台的概況,一方面我們提供最基礎的 Flink 計算平台能力,借助Kubernetes有效管控集群,另一方面我們在已有的數據通道及元數據平台上構建實時數倉,提供 Flink SQL 能力,進一步降低用戶使用門檻,對於 Flink SQL 的支持目前還比較初級和原始,后面我們將結合業務使用情況探索更多深層次的優化。
