Summary of Common Flume Exceptions and Their Solutions
Author: 尹正杰
Copyright notice: this is an original work; reposting is not permitted, and violations will be pursued legally.
In production I use Flume to periodically upload data from Kafka into an HDFS cluster, and I have stepped into a number of pitfalls along the way. I record them here; if you run into the same errors, you can use my solutions as a reference.
1>.The server disconnected before a response was received.
The error messages are as follows:
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

2018-11-13 06:16:59,149 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:542)] Offset commit for group flume-consumer-against_cheating_02 failed due to REQUEST_TIMED_OUT, will find new coordinator and retry
2018-11-13 06:16:59,150 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:529)] Marking the coordinator 2147483529 dead.
2018-11-13 06:17:04,256 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:542)] Offset commit for group flume-consumer-against_cheating_02 failed due to REQUEST_TIMED_OUT, will find new coordinator and retry
2018-11-13 06:17:04,257 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:529)] Marking the coordinator 2147483529 dead.
2018-11-13 06:17:29,376 (PollableSourceRunner-KafkaSource-kafkaSource) [WARN - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:560)] Sending events to Kafka failed
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
2018-11-13 06:17:29,376 (PollableSourceRunner-KafkaSource-kafkaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:314)] KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Commit failed as send to Kafka failed
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:561)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        ... 6 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
2018-11-13 06:17:30,378 (PollableSourceRunner-KafkaSource-kafkaSource) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:529)] Marking the coordinator 2147483531 dead.
Judging from the errors, the consumer group was being rebalanced because operations were taking too long. Referring to https://kafka.apache.org/090/documentation/#configuration, the following parameters should be increased:
Note: the * below stands for the channel name. These parameters can be set not only on the Kafka channel, but also on the Kafka source.

# The maximum amount of time the server will wait for acknowledgments from followers to satisfy the acknowledgment requirement the producer specified with the acks setting. If the requested number of acknowledgments is not met when the timeout elapses, an error is returned. This timeout is measured on the server side and does not include the network latency of the request.
agent.channels.*.kafka.consumer.timeout.ms = 70000
# The maximum amount of time the client will wait for the response to a request. If the response is not received before the timeout elapses, the client will resend the request if necessary, or fail the request once retries are exhausted.
agent.channels.*.kafka.consumer.request.timeout.ms = 80000
# The maximum amount of time the server will block before answering a fetch request if there is not enough data to immediately satisfy the requirement given by fetch.min.bytes.
agent.channels.*.kafka.consumer.fetch.max.wait.ms = 7000
# The maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed.
agent.channels.*.kafka.consumer.offset.flush.interval.ms = 50000
# The timeout used to detect failures when using Kafka's group management facilities.
agent.channels.*.kafka.consumer.session.timeout.ms = 70000
# The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. This value must be lower than session.timeout.ms, and typically should be no higher than 1/3 of it; it can be set even lower to control the expected time for normal rebalances.
agent.channels.*.kafka.consumer.heartbeat.interval.ms = 60000
# If true, the consumer's offsets are committed periodically in the background. The danger: with auto-commit enabled, the consumer may have fetched data that is not yet fully processed when the commit interval fires and the offsets are committed; if the consumer then crashes, the fetched-but-unprocessed data has already been committed and will never be processed again, i.e. data is lost.
agent.channels.*.kafka.consumer.enable.auto.commit = false
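For context, here is a minimal sketch of where these overrides sit in a complete Kafka channel definition. The agent name (agent), channel name (kafkaChannel), broker list and topic are assumptions for illustration only; substitute your own. Properties with the kafka.consumer. / kafka.producer. prefixes are passed straight through to the underlying Kafka client by Flume.

agent.channels = kafkaChannel
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
# Assumed broker list and topic -- replace with your own.
agent.channels.kafkaChannel.kafka.bootstrap.servers = kafka01:9092,kafka02:9092,kafka03:9092
agent.channels.kafkaChannel.kafka.topic = against_cheating
agent.channels.kafkaChannel.kafka.consumer.group.id = flume-consumer-against_cheating_02
# Consumer overrides discussed above.
agent.channels.kafkaChannel.kafka.consumer.request.timeout.ms = 80000
agent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 70000
agent.channels.kafkaChannel.kafka.consumer.heartbeat.interval.ms = 60000
agent.channels.kafkaChannel.kafka.consumer.enable.auto.commit = false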
2>.The producer hit a leader election while writing to a Kafka broker: it had been writing to broker0, but after the election broker1 became the leader, so the write failed and an exception was thrown.
The error message is as follows:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
I ran into this error while restarting the Kafka cluster. Searching around confirmed the cause described above: the producer was writing to a broker just as a leader election took place; it had been writing to broker0, and once broker1 became the leader the write could no longer succeed, so the exception was thrown.

2018-11-15 16:41:13,916 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:231)] Creating hdfs://hdfs-ha/user/against_cheating/20181115/10-1-2-120_02_20181115_16.1542271273895.txt.tmp
2018-11-15 16:42:51,605 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:542)] Offset commit for group flume-consumer-against_cheating_02 failed due to REQUEST_TIMED_OUT, will find new coordinator and retry
2018-11-15 16:42:51,606 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:529)] Marking the coordinator 2147483529 dead.
2018-11-15 16:43:58,386 (PollableSourceRunner-KafkaSource-kafkaSource) [WARN - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:560)] Sending events to Kafka failed
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2018-11-15 16:43:58,387 (PollableSourceRunner-KafkaSource-kafkaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:314)] KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Commit failed as send to Kafka failed
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:561)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        ... 6 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2018-11-15 16:44:13,867 (PollableSourceRunner-KafkaSource-kafkaSource) [WARN - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:560)] Sending events to Kafka failed
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2018-11-15 16:44:13,868 (PollableSourceRunner-KafkaSource-kafkaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:314)] KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Commit failed as send to Kafka failed
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:561)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        ... 6 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2018-11-15 16:44:13,944 (hdfs-hdfsSink-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:357)] Closing hdfs://hdfs-ha/user/against_cheating/20181115/10-1-2-120_02_20181115_16.1542271273895.txt.tmp
The solution:
1>.First confirm whether the Kafka cluster is running stably; if the cluster is unhealthy, this error will keep being emitted continuously.
2>.If you have just restarted the cluster, you can ignore the error for the moment, because Flume will retry automatically. Don't sit idle either: watch the Kafka monitoring dashboards for anything abnormal, and if things have not recovered after about 2 minutes, start suspecting a real problem with your Kafka cluster. (One way to make Flume more tolerant of the election window is sketched right after this list.)
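As a hedge against these transient election windows, one option (my own suggestion, not from the original troubleshooting) is to let the Kafka producer used by the channel retry failed sends a few times before Flume gives up on the transaction. The channel name kafkaChannel and the numeric values are assumptions; the kafka.producer. prefix is passed straight through to the Kafka producer client.

# Retry a failed send several times before surfacing the error to Flume (assumed value).
agent.channels.kafkaChannel.kafka.producer.retries = 10
# Wait between retries so a new partition leader has time to be elected (assumed value).
agent.channels.kafkaChannel.kafka.producer.retry.backoff.ms = 1000
# Require acknowledgment from all in-sync replicas so a leader change does not lose data.
agent.channels.kafkaChannel.kafka.producer.acks = all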
3>.The configured maximum number of threads for transferring data in and out of a DataNode is too small.
The error message is as follows:
java.io.IOException: Bad connect ack with firstBadLink as 10.1.1.120:50010
A quick search turned up the cause:
When a DataNode writes data into HDFS, it actually does so through xcievers, which are the threads on the DataNode responsible for reading and writing block files on the local file system. The more blocks a DataNode holds, the more of these threads it needs. The problem is that this thread count has an upper limit (4096 in the default configuration), so when a DataNode has too many blocks, some block files can no longer get a thread to serve their reads and writes, which leads to the write-block failure above.
Solution:
Increase the maximum number of threads used for transferring data in and out of the DataNode, e.g. to 65535, as in the sketch below.
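For reference, a minimal hdfs-site.xml sketch of this setting; the value 65535 mirrors the suggestion above, and on CDH the same limit is normally changed through the DataNode's "Maximum Number of Transfer Threads" setting in Cloudera Manager rather than by editing the file by hand.

<!-- hdfs-site.xml on each DataNode -->
<property>
  <name>dfs.datanode.max.transfer.threads</name>
  <value>65535</value>
</property>
<!-- Older Hadoop releases use the deprecated name dfs.datanode.max.xcievers for the same setting. -->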
4>.java.io.EOFException: Premature EOF: no length prefix available
From the hint in the screenshot above we can just make out which DataNode is involved, so we went to CDH (if you use HDP, go to the corresponding platform) to look up that DataNode's logs, and indeed found error messages like the following:
After hitting the problem above I took three steps, which finally resolved it:
Step 1: tune the HDFS cluster; for the detailed parameters see my notes: https://www.cnblogs.com/yinzhengjie/p/10006880.html.
Step 2: edit the following two configuration files.

[root@calculation101 ~]# cat /etc/security/limits.d/20-nproc.conf
# Default limit for number of user's processes to prevent
# accidental fork bombs.
# See rhbz #432903 for reasoning.

*          soft    nproc     40960
root       soft    nproc     unlimited
[root@calculation101 ~]#

[root@calculation101 ~]# cat /etc/security/limits.conf | grep -v ^# | grep -v ^$
*    soft    nofile     1000000
*    hard    nofile     1048576
*    soft    nproc      65536
*    hard    nproc      unlimited
*    soft    memlock    unlimited
*    hard    memlock    unlimited
[root@calculation101 ~]#
Step 3: reboot the operating system. Make sure all services are stopped before the reboot, and make sure the entire HDFS cluster starts up successfully afterwards. A 200 GB batch now finishes in about 3 minutes, and two days later the error above has not reappeared. If you run into the same problem, you can try this approach as well. A quick way to confirm the new limits actually took effect is shown below.
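As a sanity check (my own addition, not one of the original three steps), you can verify after the reboot that the raised limits apply both to your shell and to the running DataNode process; the pgrep pattern assumes a DataNode JVM is running on this host.

[root@calculation101 ~]# ulimit -n    # max open files, should show the raised nofile value
[root@calculation101 ~]# ulimit -u    # max user processes, should show the raised nproc value
[root@calculation101 ~]# cat /proc/$(pgrep -f DataNode | head -1)/limits | grep -E 'open files|processes'    # limits of the live DataNode process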
5>.