Flink整合oozie shell Action 提交任務 帶kerberos認證


最近這段時間一直在忙新集群遷移,上了最新的cdh6.3.0 於是Flink 提交遇到了許多的問題

還好有cloudera License 有了原廠的幫助和社區的伙伴,問題解決起來快了不少,手動滑稽

集群具體情況是,cdh6.3.0+Flink1.8.1,整個數據平台全部組件都上了kerberos和ldap因為要過認證,所以任務提交方法我們選擇統一oozie提交任務

並且因為kerberos認證,還需要Flink perjob 需要單獨的keytab,才能細膩度的控制權限,因為我們現在部門之間計算資源的划分是通過yarn資源隊列

但是現在Flink支持的不是很好,目前只能在配置文件中配置一個keytab,job啟動都去這個拉這個keytab復制到自己的contain里面

但是Flink第一提交方式還是希望能夠通過oozie提交job

由於oozie沒有天生支持Flink提交,所以只能選擇oozie shell action 的方式提交job

在Flink搭建好以后開始提交任務,用oozie shell提交

#!/bin/bash

flink run -m yarn-cluster flinktest.jar

馬上  Duang

flink command not find

改成命令絕對路徑以后! 還是 Duang

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster

at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:387)

at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:259) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)

at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)

at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)

at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

調度不了yarn ,這是因為oozie會覆蓋掉HADOOP_CONF_DIR

於是在shell里面手動export HADOOP_CONF_DIR = xxxxx

發現!!!

可以提交了

但是!!!

有時候能成功有時候失敗????黑人問號

org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://flink@xxxxx:36166/user/resourcemanager

at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:202)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:539)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:164)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)

at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at akka.actor.ActorCell.invoke(ActorCell.scala:495)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start resource manager client.

at org.apache.flink.yarn.YarnResourceManager.initialize(YarnResourceManager.java:250)

at org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:212)

at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:200)

... 16 more Caused by: org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException: Application Master is already regist

resourcemanager注冊 Application Master的時候已經被注冊了?然后發生了一些異常

但是有時候又可以提交成功,這個就讓我有點困惑

最后發現是因為oozie覆蓋了很多集群上的環境變量導致

解決辦法 在oozie 腳本的flink命令前加env -i

這樣會清除所有的環境變量,oozie就會使用登陸yarn用戶的環境變量來運行shell了

終於

#!/bin/bash

env -i /flink run -m yarn-cluster flinktest.jar

shell action成功提交flink任務

但是kerberos現在還沒有解決,因為這樣提交job會去服務器上讀flink-conf.yaml文件里的kerberos認證,然后復制對應的keytab到所有容器,所有任務都是公用的一個

這樣的話不能實現每個job單獨使用一個keytab,每個job使用自己對應的kerberos認證

於是在社區群上取了下經,大家實現的方法也是千奇百怪

有全部任務公用一個認證的,有用CICD在容器每次提交的鏡像中在flink-conf.yaml中修改為指定的kerberos的

但是 我們不一樣~~

因為我們是oozie提交任務,有點頭大,還好最后還是解決了

因為Flink是通過去FLINK_CONF_DIR路徑下去讀取默認的flink-conf.yaml文件中的kerberos認證

那我們就需要在oozie shell 腳本中指定我們自己修改的flink-conf.yaml文件路徑通過手動指定FLINK_CONF_DIR去覆蓋Flink默認的

這個路徑我們填寫相對路徑,因為oozie運行時會將提交的文件復制到運行時的相對路徑下面

也就是說,我們可以oozie中把我們的keytab文件以及整個conf文件夾都上傳上去,修改conf/flink-conf.yaml文件中的kerberos選項

security.kerberos.login.keytab = . 

security.kerberos.login.principal = xxx

這里的keytab路徑就填寫相對路徑./因為oozie會把你上傳的keytab拷貝過去

最后運行oozie shell 腳本

#!/bin/bash

env -i FLINK_CONF_DIR=./conf   /flink run -m yarn-cluster  ./flinktest.jar

成功使用自己指定的keytab用戶運行job

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM