關於30大洋看的一篇帖子(為什么我的Flink任務正常運行,UI上卻不顯示接收和發送的數據條數呢?)


最近發現有好幾個同學問我這個問題,為什么我的Flink任務正常運行,數據也可以打印,而且都保存到數據庫了,但是UI上面卻不顯示數據接收和發送的條數,我都快被問瘋了,今天就給大家詳細說一下這個小問題.

首先先來復現一下這個問題,我們先看下面的代碼(只是一部分代碼)

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)
  val ds = CommonUtils.getDataStream(env = env)
    .name("kafka-source")
    .filter(_.nonEmpty)
    .print()
  env.execute()
}

代碼非常的簡單,沒有任何的邏輯,從kafka讀取數據,只是做了一個filter,然后直接print,我這里就不寫sink,但是效果和直接sink是一樣的,把這個任務提交到集群運行,效果如下圖所示:

 

 

 

 我已經向kafka寫入數據了,這個地方怎么不顯示呢?然后我們來看下tm的stdout,因為我代碼里面直接print了,看下面的圖

 

 

 

很明顯數據打印了,說明我們程序是沒有問題的,那問題在哪呢?其實並不是你的程序有問題,也不是Flink的UI有bug,是因為默認情況下Flink開啟了operator chain,所以source filter print chain在了起就是在一個DAG里面,沒有向下游發送數據,所以顯示都為0,關於operator chain前面的文章已經說過了,還不了解的可以去查一下.那怎么能讓他顯示呢?
第一種方法,就是從metric里面看,可以自己添加兩個metric,如下圖所示

 

 

 我添加了兩個metric,一個filter的輸入和filter的輸出,可以看到都是100條數據.那如果想不添加metric,在ui上就能顯示呢?這個時候就需要打斷operator chain,具體有三種寫法,如下代碼所示

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)
  val ds = CommonUtils.getDataStream(env = env)
    .name("kafka-source")
    .filter(_.nonEmpty)
    .startNewChain()
    .disableChaining()
    .setParallelism(2)
    .print()
  env.execute()
}

這三種寫法都可以達到打斷chain的目的,有什么區別呢?startNewChain和disableChaining沒有實質性的區別,他倆都會打斷chain,但是不會改變算子的並發度,setParallelism和前面的算子並發度,設置的不一致自然就打斷chain了.我們就演示一下第一個

 

 

可以看到上面的DAG圖顯示了兩個,並且下面可以看到接收和發送的數據了,剩下的2種方法同樣可以達到這樣的效果,大家可以嘗試一下.

Flink的operator chain是有利於提高程序的性能的,建議使用,如果想要打斷operator chain又不想改變並發,就用前兩種方法,如果想要改變並發就用第三張方法

 

30大洋 我盡力了

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM