關於flink 程序提交到yarn上面運行的相關操作和創建checkpoint和savepoint的相關操作


 最近公司在做實時的東西,我們是采用的是消費kafka的數據 然后將kafka的數據接入到MySQL 數據庫然后做一些簡單的統計計算,偽實時的展示數據(10分鍾進行一次)

(情景描述 我們公司用的是aws的服務器。所以flink 集群當中的服務有很多台)

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

問題(1)通過命令行去查看flink作業的id 使用命令 flink list 他就報下面的錯誤。

org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve standalone cluster
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:51)
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:31)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:942)
    at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:413)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1013)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.util.ConfigurationException: cuoConfig parameter 'Key: 'jobmanager.rpc.address' , default: null (fallback keys: [])' is missing (hostname/address of JobManager to connect to).
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getJobManagerAddress(HighAvailabilityServicesUtils.java:150)
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:86)
    at org.apache.flink.client.program.ClusterClient.<init>(ClusterClient.java:118)
    at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:179)
    at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:152)
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:49)
    ... 10 more

這個錯 感覺是沒有指定flink 的jobmanager  他沒有辦法知道他提交在哪台服務器上面。於是乎就就找這個jobmanager的地址在哪里。 這個怎么找那 我們通過命令行去找 ,因為程序是提交在yarn 上面

所以我們通過下面的命令

yarn application -list

 

 從上面的圖中 我們能看到提交的flink的jobmanager 然后采用使用下面的命令 然后展示你想要的flink 的id 

 

flink list -m 你的flink提交的jobmanager:32981  (這里需要需要機上-m )

 

 

 問題 2 上面的步驟是獲取到了 flink運行的id 接下來 我們要做的就是要對代碼進行升級,這里我們要做的是手工保存一份savapoint 但是貌似有很多坑(采坑開始)

flink savepoint 9a8d0b58d0fe2fd7b9c01081c8f1ca1d -m 自己的IP:32981 hdfs:///tmp/tmp 

這個是保存程序的savepoint

 

 

 這樣數據就保存到hdfs 上面去了。

除了這個方式之外 我們也可以通過在停止程序的時候將程序的savepoint 寫入到hdfs 當中去。

停止程序的時候的命令:

flink cancel -s hdfs:///tmp/tmp 088d62e015adcb92e1b92171c438bc18 -m 自己的IP:36305

這個是在程序停止的時候就將savepoint 保存到hdfs的目錄當中去了。

 

我們將程序停止掉之后,在程序再次啟動的時候 我們要將我們保存的savepoint通知程序 讓程序知道。啟動的命令如下:

flink run -m yarn-cluster -s hdfs:///tmp/tmp/savepoint-088d62-0d3a2d53d0e9 -c WordCount /mnt/flinkjar/flinkTestConsumeKafka.jar

 

問題三:

我們的程序提交到集群當中,然后發現程序跑起來了,但是貌似在界面沒有數據顯示

 

 

然后在網上找到一片帖子 ,我擦30大洋才能看 真是卧槽。 最后沒辦法還是看了一眼。好在問題得到解決(為什么我的Flink任務正常運行,UI上卻不顯示接收和發送的數據條數呢?)

 

 

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)
  val ds = CommonUtils.getDataStream(env = env)
    .name("kafka-source")
    .filter(_.nonEmpty)
    .print()
  env.execute()
}

這個是他的原代碼。

 

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)
  val ds = CommonUtils.getDataStream(env = env)
    .name("kafka-source")
    .filter(_.nonEmpty)
    .startNewChain()
    .disableChaining()
    .setParallelism(2)
    .print()
  env.execute()
}

這是修改后的代碼。這個時候就需要打斷operator chain,具體有三種寫法,如下代碼所示 這里的操作就是將操作連打斷,然后就能看到數據了。因為在一個DAG當中。

 這三種寫法都可以達到打斷chain的目的,有什么區別呢?startNewChain和disableChaining沒有實質性的區別,他倆都會打斷chain,但是不會改變算子的並發度,setParallelism和前面的算子並發度,設置的不一致自然就打斷chain了


當然這是別人的寫法,在Java當中也有類似的寫法:

keyedStream.addSink(new MysqlSink()).disableChaining().setParallelism(1);

我們是將數據sink到MySQL 當中。所以我這里這么寫

 

 

 

 到這里我們的數據順利的寫入到數據庫 在界面上 也能看到數據。這樣問題就得到了解決。

 

問題 四 關於kafka的 問題。我們通過讀取kafka 里面的數據 然后將數據寫入到mysql 我只用用的私網地址 然后就各種獲取不到kafka 的元數據信息 導致一直讀取不到數據。

所以 大家在做kafka讀數據的時候,先ping 一下地址看通不通。真是蛋疼

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM