(1)安裝standlode集群模式啟動
啟動腳本是 bin/start-cluster.sh 不能用sh start-cluster
flink-1.8.1/bin/flink list
flink-1.8.1/bin/flink cancel 9b99be4eed871c4e62562f9035ebef65
(2)flink任務停不掉
執行取消命令后,查看一直在取消狀態
是由於有台機器掛掉了,沒有響應
提交任務的時候,報錯
flink scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutabl
項目用到了import scala.collection.JavaConverters._
所以需要引入scala的jar包
我本地用的scala2.11,我相信自己線上下的也是2.11,找了半天,在flink安裝目錄opt/jar中看到是2.12的scala,版本不一致導致的
(3)web監控頁面
Bytes received Records received Bytes sent Records sent
如果源和sink在同一個里面是不會有顯示值的
需要額外說明的是,這里的輸入/輸出只是在 flink 的各個節點之間,不包含與外界組件的交互信息。所以,這里的統計里, flink source 的 read-bytes/read-records 都是 0;flink sink 的 write-bytes/write-records 都是 0。在 flink UI 上的顯示也是如此。(yuchuanchen的博客有說明:https://blog.csdn.net/yuchuanchen/article/details/88406438)
通過在寫出前加個keyby就可以了
val source = env
.addSource(kafkaConsumerA)
.map(a =>{
val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val day = simpleDateFormat.format(new Date())
parseToBean(a,day)
})
source.keyBy(a =>{
a.carid
}).map(a =>toClickHouseInsertFormat(a))
.addSink(new ClickhouseSink(props)).name("ck_sink")
也可以自己定義metric,然后提交上去就能看到指定位置的數據信息