最近公司在做實時的東西,我們是采用的是消費kafka的數據 然后將kafka的數據接入到MySQL 數據庫然后做一些簡單的統計計算,偽實時的展示數據(10分鍾進行一次)
(情景描述 我們公司用的是aws的服務器。所以flink 集群當中的服務有很多台)
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
問題(1)通過命令行去查看flink作業的id 使用命令 flink list 他就報下面的錯誤。
org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve standalone cluster at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:51) at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:31) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:942) at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:413) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1013) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) Caused by: org.apache.flink.util.ConfigurationException: cuoConfig parameter 'Key: 'jobmanager.rpc.address' , default: null (fallback keys: [])' is missing (hostname/address of JobManager to connect to). at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getJobManagerAddress(HighAvailabilityServicesUtils.java:150) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:86) at org.apache.flink.client.program.ClusterClient.<init>(ClusterClient.java:118) at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:179) at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:152) at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:49) ... 10 more
這個錯 感覺是沒有指定flink 的jobmanager 他沒有辦法知道他提交在哪台服務器上面。於是乎就就找這個jobmanager的地址在哪里。 這個怎么找那 我們通過命令行去找 ,因為程序是提交在yarn 上面
所以我們通過下面的命令
yarn application -list
從上面的圖中 我們能看到提交的flink的jobmanager 然后采用使用下面的命令 然后展示你想要的flink 的id
flink list -m 你的flink提交的jobmanager:32981 (這里需要需要機上-m )
問題 2 上面的步驟是獲取到了 flink運行的id 接下來 我們要做的就是要對代碼進行升級,這里我們要做的是手工保存一份savapoint 但是貌似有很多坑(采坑開始)
flink savepoint 9a8d0b58d0fe2fd7b9c01081c8f1ca1d -m 自己的IP:32981 hdfs:///tmp/tmp
這個是保存程序的savepoint
這樣數據就保存到hdfs 上面去了。
除了這個方式之外 我們也可以通過在停止程序的時候將程序的savepoint 寫入到hdfs 當中去。
停止程序的時候的命令:
flink cancel -s hdfs:///tmp/tmp 088d62e015adcb92e1b92171c438bc18 -m 自己的IP:36305
這個是在程序停止的時候就將savepoint 保存到hdfs的目錄當中去了。
我們將程序停止掉之后,在程序再次啟動的時候 我們要將我們保存的savepoint通知程序 讓程序知道。啟動的命令如下:
flink run -m yarn-cluster -s hdfs:///tmp/tmp/savepoint-088d62-0d3a2d53d0e9 -c WordCount /mnt/flinkjar/flinkTestConsumeKafka.jar
問題三:
我們的程序提交到集群當中,然后發現程序跑起來了,但是貌似在界面沒有數據顯示
然后在網上找到一片帖子 ,我擦30大洋才能看 真是卧槽。 最后沒辦法還是看了一眼。好在問題得到解決(為什么我的Flink任務正常運行,UI上卻不顯示接收和發送的數據條數呢?)
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val ds = CommonUtils.getDataStream(env = env) .name("kafka-source") .filter(_.nonEmpty) .print() env.execute() }
這個是他的原代碼。
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val ds = CommonUtils.getDataStream(env = env) .name("kafka-source") .filter(_.nonEmpty) .startNewChain() .disableChaining() .setParallelism(2) .print() env.execute() }
這是修改后的代碼。這個時候就需要打斷operator chain,具體有三種寫法,如下代碼所示 這里的操作就是將操作連打斷,然后就能看到數據了。因為在一個DAG當中。
這三種寫法都可以達到打斷chain的目的,有什么區別呢?startNewChain和disableChaining沒有實質性的區別,他倆都會打斷chain,但是不會改變算子的並發度,setParallelism和前面的算子並發度,設置的不一致自然就打斷chain了
當然這是別人的寫法,在Java當中也有類似的寫法:
keyedStream.addSink(new MysqlSink()).disableChaining().setParallelism(1);
我們是將數據sink到MySQL 當中。所以我這里這么寫

到這里我們的數據順利的寫入到數據庫 在界面上 也能看到數據。這樣問題就得到了解決。
問題 四 關於kafka的 問題。我們通過讀取kafka 里面的數據 然后將數據寫入到mysql 我只用用的私網地址 然后就各種獲取不到kafka 的元數據信息 導致一直讀取不到數據。
所以 大家在做kafka讀數據的時候,先ping 一下地址看通不通。真是蛋疼