Flink作為新一代的大數據處理引擎,不僅是業內公認的最好的流處理引擎,而且具備機器學習等多種強大計算功能,用戶只需根據業務邏輯開發一套代碼,無論是全量數據還是增量數據,亦或者實時處理,一套方案即可全部解決。K8S是業內最流行的容器編排工具,與docker容器技術結合,可以提供比Yarn與Mesos更強大的集群資源管理功能,成為容器雲的主要解決方案之一。如果能將兩者結合,無疑是雙劍合璧,對生產效能有着巨大的提升。本文將介紹目前為止,Flink On K8S的最前沿實現方案。
Flink集群架構
如下圖所示,Flink集群中一個 JobManger 和若干個TaskManager。由 Client 提交任務給 JobManager,JobManager再調度任務到各個 TaskManager 去執行,然后 TaskManager 將心跳和統計信息匯報給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述三者均為獨立的JVM進程。
Client是提交Job的客戶端,可以是運行在任何機器上(與JobManager 環境連通即可),也可以運行在容器中。提交Job后,Client可以結束進程(Streaming的任務),也可以不結束並等待結果返回。
JobManager主要負責調度Job並協調Task做checkpoint。從Client處接收到 Job 和 JAR 包等資源后,會生成優化后的執行計划,並以Task粒度調度到各個TaskManager上去執行。
TaskManager在啟動的時候就設置好了槽位數(Slot),每個slot能啟動一個Task,Task為線程。從JobManager處接收需要部署的Task,部署啟動后,與自己的上游建立 Netty 連接,接收數據並處理。
可以看到Flink的任務調度是多線程模型,並且不同Job/Task混合在一個 TaskManager 進程中。
目前在K8S中執行Flink任務的方式有兩種,一種是Standalone,一種是原生模式。
Standalone模式
在K8S中啟動Flink集群
Flink on Kubernetes 的架構如圖所示,Flink 任務在 Kubernetes 上運行的步驟有:
- 首先往 Kubernetes 集群提交了資源描述文件后,會啟動 Master 和 Worker 的 container。
- Master Container 中會啟動 Flink Master Process,包含 Flink-Container ResourceManager、JobManager 和 Program Runner。
- Worker Container 會啟動 TaskManager,並向負責資源管理的 ResourceManager 進行注冊,注冊完成之后,由 JobManager 將具體的任務分給 Container,再由 Container 去執行。
- 需要說明的是,Master Container 與Worker Container是用一個鏡像啟動的,只是啟動參數不一樣,如下圖所示,兩個deployment文件的image都是flink:latest。
計算任務可以以Session模式與Per-Job模式運行提交:
- Session模式:先啟動一個Flink集群,然后向該集群提交任務,所有任務共用JobManager。任務提交速度快,適合頻繁提交運行的短時間任務。
- Per-Job模式:每提交一個任務,單獨啟動一個集群運行該任務,運行結束集群被刪除,資源也被釋放。任務啟動較慢,適合於長時間運行的大型任務。
Session 模式
在Session模式下,需要先啟動一個Flink集群,然后向該集群提交任務,主要步驟為:先將集群配置定義為ConfigMap、然后通過官方資源描述文件分別啟動JobManager與一定數量的TaskManager,最后在flink客戶端向這個啟動的Flink集群中提交任務。
定義ConfigMap
對於 JobManager 和 TaskManager 運行過程中需要的一些配置文件,如:flink-conf.yaml、hdfs-site.xml、core-site.xml,可以通過flink-configuration-configmap.yaml文件將它們定義為 ConfigMap 來實現配置的傳遞和讀取。如果使用默認配置,這一步則不需要。
kubectl create -f flink-configuration-configmap.yaml
啟動JobManager
JobManager 的執行過程分為兩步:
- 首先,JobManager 通過 Deployment 進行描述,保證 1 個副本的 Container 運行 JobManager,可以定義一個標簽,例如 flink-jobmanager。
kubectl create -f jobmanager-deployment.yaml
- 其次,還需要定義一個JobManager Service,通過 service name 和 port 暴露 JobManager 服務,通過標簽選擇對應的 pods。
kubectl create -f jobmanager-service.yaml
啟動TaskManager
TaskManager 也是通過 Deployment 來進行描述,保證 n 個副本的 Container 運行 TaskManager,同時也需要定義一個標簽,例如 flink-taskmanager。
kubectl create -f taskmanager-deployment.yaml
提交任務
提交服務是通過請求JobManager Service實現的,如果從K8S集群外部請求該Service,需要對外暴露端口
kubectl port-forward service/flink-jobmanager 8081:8081
然后通過flink命令的m參數,指定服務的地址,即可向剛創建的集群中提交任務了。
./bin/flink run -d -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
刪除集群
直接利用K8S的命令行工具或者API刪除前面創建的資源對象即可
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f flink-configuration-configmap.yaml
Flink on Kubernetes–交互原理
整個交互的流程比較簡單,用戶往 Kubernetes 集群提交定義好的資源描述文件即可,例如 deployment、configmap、service 等描述。二手游戲賣號地圖后續的事情就交給 Kubernetes 集群自動完成。Kubernetes 集群會按照定義好的描述來啟動 pod,運行用戶程序。各個組件的具體工作如下:
- Service: 通過標簽(label selector)找到 job manager 的 pod 暴露服務。
- Deployment:保證 n 個副本的 container 運行 JM/TM,應用升級策略。
- ConfigMap:在每個 pod 上通過掛載 /etc/flink 目錄,包含 flink-conf.yaml 內容。
Per-Job模式
在官方的Per Job模式下,需要先將用戶代碼都打到鏡像里面,然后根據該鏡像來部署一個flink集群運行用戶代碼,即Flink job cluster。所以主要分為兩步:創建鏡像與部署Flink job cluster。
創建鏡像
在flink/flink-container/docker目錄下有一個build.sh腳本,可以根據指定版本的基礎鏡像去構建你的job鏡像,成功后會輸出 “Successfully tagged topspeed:latest” 的提示。
sh build.sh --from-release --flink-version 1.7.0 --hadoop-version 2.8 --scala-version 2.11 --job-jar ~/flink/flink-1.7.1/examples/streaming/TopSpeedWindowing.jar --image-name topspeed
鏡像構建完成后,可以上傳到 hub.docker.com 上,也可以上傳到你們項目組的內部Registry。
docker tag topspeed zkb555/topspeedwindowing
docker push zkb555/topspeedwindowing
部署Flink job cluster
在鏡像上傳之后,可以根據該鏡像部署Flink job cluster。
# 啟動Servive
kubectl create -f job-cluster-service.yaml
# 啟動JobManager
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing FLINK_JOB_PARALLELISM=3 envsubst < job-cluster-job.yaml.template | kubectl create -f –
# 啟動TaskManager
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB_PARALLELISM=4 envsubst < task-manager-deployment.yaml.template | kubectl create -f -
參數說明:
FLINK_DOCKER_IMAGE_NAME
- 鏡像名稱(默認:flink-job:latest
)FLINK_JOB
- 要執行的Flink任務名稱(默認:none)DEFAULT_PARALLELISM
- Flink任務的默認並行度 (默認: 1)FLINK_JOB_ARGUMENTS
- 其他任務參數;SAVEPOINT_OPTIONS
- Savepoint選項 (default: none)
這種方式比較笨重,如果業務邏輯的變動涉及代碼的修改,都需要重新生成鏡像,非常麻煩,在生產環境提交一個新任務重新打鏡像是不切實際的。一種更好的替代方案是將你的業務代碼放到NFS或者HDFS上,然后在啟動容器時通過掛載或者將jar包下載到容器內的方式執行你的Flink代碼,代碼位置通過啟動參數傳入。
需要注意的是Standalone模式需要在任務啟動時就確定TaskManager的數量,暫且不能像Yarn一樣,可以在任務啟動時申請動態資源。然而很多時候任務需要多少個TaskManager事先並不知道,TaskManager設置少了,任務可能跑不起來,多了又會造成資源浪費,需要在任務啟動時才能確定需要多少個TaskMananger,為了支持任務啟動時實時動態申請資源的功能,就有了下面介紹的原生模式, 這意味着Flink任務可以直接向K8s集群申請資源。
原生模式
原生模式提供了與K8S更好的集成,在Flink 1.9以上版本內置了K8S的客戶端,Flink的可以直接向K8S申請計算資源,集群資源得到了更高效的利用。這點與同Flink on Yarn/Mesos一樣。
做好以下准備工作就可以從你的flink客戶端直接提交flink任務到K8S集群。
- KubeConfig, 位於
~/.kube/config
,需要具備查看、創建與刪除pod與service對象的權限,可以在K8S客戶端通過kubectl auth can-i <list|create|edit|delete> pods
來驗證; - Kubernetes開啟DNS服務;
- 一個Kubernetes賬戶,需要具備創建與刪除pod的權限。
原生模式同樣支持Session模式玉Per-job兩種方式提交任務。
原生Session模式
與Standalone模式中的Session模式類似,還是分為兩步,先啟動一個集群,然后向集群提交任務。可以通過運行kubernetes-session.sh文件來啟動一個集群
./bin/kubernetes-session.sh
或者通過一些超參數來對集群進行設置
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000
然后在flink客戶端,通過flink命令提交任務
./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
原生Session cluster的創建流程為:
- Flink客戶端先通過K8S的ApiServer提交cluster的描述信息,包括ConfigMap spec, Job Manager Service spec, Job Manager Deployment spec等;
- K8S接收到這些信息后就會拉取鏡像、掛載卷軸來啟動Flink master,這時候Dispatcher 與KubernetesResourceManager也會被啟動,從而可以接受Flink job;
- 當用戶通過Flink客戶端提交一個job時,客戶端就會生成這個job的job graph,並與這個job的jar包一起提交到Dispatcher,然后就會生成這個job的JobMaster;
- 最后JobMaster會向KubernetesResourceManager申請slot來執行這個job graph,如果集群中slot數量不夠,KubernetesResourceManager會啟動新的TaskManager pod並將它注冊到集群中。
原生Per-Job模式
目前尚處於實驗階段,在Flink 1.11版本中才支持。
官方的使用方式也是與前面Standalone-Per-Cluster模式類似,先創建一個包含用戶jar的用於啟動Flink Master的docker image,然后在客戶端通過flink命令根據該image提交任務,從而創建一個運行該任務的獨立集群。
./bin/flink run -d -e kubernetes-per-job
-Dkubernetes.cluster-id=<ClusterId>
-Dtaskmanager.container.image=<your image>
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
Per-Job模式的運行過程與Session模式的不同點在於Flink Master的啟動,其他步驟都一樣。Flink Master Deployment里面已經有Flink任務的jar包,在啟動Flink Master時Cluster Entrypoint就會運行該jar包的main函數產生job graph,並將該job graph與jar包提交給Dispatcher。
當然這種方式的缺點與Standalone-Per-Cluster一樣,每個用戶jar都需要一個單獨的鏡像,實際還是建議將用戶jar放在外部,在運行時掛載或者下載到容器中。
總結
本文介紹了Flink on K8S的各種方案,鑒於不需要事先指定taskmanager數量,原生模式相對於Standalone模式更有優勢,但目前尚處於實驗階段。他們兩者都支持Session模式與Per-Job模式,至於選擇哪種,看你實際的任務類型。如果式以頻繁提交的短期任務,如批處理為主,則適合Session模式,如果以長期運行的流式任務為主,則適合用Per-Job模式。