Flume Common Exceptions and Solutions



Author: 尹正傑

Copyright notice: this is an original work. Reproduction without permission is prohibited; violators will be held legally responsible.

 

 

  In production I use Flume to periodically upload data from Kafka to an HDFS cluster, and I have stepped into a number of pitfalls along the way. I am recording them here; if you run into the same errors, you can refer to my solutions.

 

1>. The server disconnected before a response was received.

The error message is as follows:

  Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

2018-11-13 06:16:59,149 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:542)] Offset commit for group flume-consumer-against_cheating_02 failed due to REQUEST_TIMED_OUT, will find new coordinator and retry
2018-11-13 06:16:59,150 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:529)] Marking the coordinator 2147483529 dead.
2018-11-13 06:17:04,256 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:542)] Offset commit for group flume-consumer-against_cheating_02 failed due to REQUEST_TIMED_OUT, will find new coordinator and retry
2018-11-13 06:17:04,257 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:529)] Marking the coordinator 2147483529 dead.
2018-11-13 06:17:29,376 (PollableSourceRunner-KafkaSource-kafkaSource) [WARN - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:560)] Sending events to Kafka failed
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
2018-11-13 06:17:29,376 (PollableSourceRunner-KafkaSource-kafkaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:314)] KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Commit failed as send to Kafka failed
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:561)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        ... 6 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
2018-11-13 06:17:30,378 (PollableSourceRunner-KafkaSource-kafkaSource) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:529)] Marking the coordinator 2147483531 dead.

   Analyzing the error, requests were taking too long and triggered a consumer rebalance. Referring to https://kafka.apache.org/090/documentation/#configuration, the following parameters should be increased:

Note: the * here stands for the channel name. These parameters can be set not only on a Kafka channel but also on a Kafka source.

# Controls the maximum amount of time the server waits for acknowledgments from followers to meet the acknowledgment requirement the producer specified via the acks configuration. If the requested number of acknowledgments is not met when the timeout elapses, an error is returned. This timeout is measured on the server side and does not include the network latency of the request.
agent.channels.*.kafka.consumer.timeout.ms = 70000

# Controls the maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if retries are exhausted.
agent.channels.*.kafka.consumer.request.timeout.ms = 80000

# The maximum amount of time the server blocks before answering a fetch request when there is not enough data to immediately satisfy the requirement given by fetch.min.bytes.
agent.channels.*.kafka.consumer.fetch.max.wait.ms=7000

# The maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling and restoring the offset data to be committed.
agent.channels.*.kafka.consumer.offset.flush.interval.ms = 50000

# The timeout used to detect failures when using Kafka's group management facilities.
agent.channels.*.kafka.consumer.session.timeout.ms = 70000

# The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats ensure that the consumer's session stays active and facilitate rebalancing when consumers join or leave the group. The value must be set lower than session.timeout.ms, and typically should be no higher than 1/3 of that value. It can be set even lower to control the expected time of a normal rebalance.
agent.channels.*.kafka.consumer.heartbeat.interval.ms = 60000

# If true, the consumer's offsets are committed periodically in the background. The risk with auto.commit.enable=true: the consumer fetches a batch and, before it finishes processing it, the commit interval fires and the offsets are committed; if the consumer then crashes, the fetched-but-unprocessed data has already been committed, will never be redelivered, and is effectively lost. So we disable it.
agent.channels.*.kafka.consumer.enable.auto.commit = false
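Put together, a channel definition carrying these overrides might look like the sketch below. This is only an illustrative fragment: the agent name `agent`, channel name `kafkaChannel`, broker list, topic, and group id are placeholder assumptions, not values from my actual setup.

```properties
# Hypothetical wiring; only the kafka.consumer.* lines mirror the fixes above.
agent.channels = kafkaChannel
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.kafkaChannel.kafka.bootstrap.servers = broker1:9092,broker2:9092
agent.channels.kafkaChannel.kafka.topic = example_topic
agent.channels.kafkaChannel.kafka.consumer.group.id = flume-consumer-example

# Timeout/rebalance tuning from this section:
agent.channels.kafkaChannel.kafka.consumer.request.timeout.ms = 80000
agent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 70000
agent.channels.kafkaChannel.kafka.consumer.heartbeat.interval.ms = 60000
agent.channels.kafkaChannel.kafka.consumer.enable.auto.commit = false
```

The same `kafka.consumer.*` keys can be attached to a Kafka source's prefix instead of a channel's, as noted above.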

 

2>. A leader election happened while the producer was writing to a Kafka broker: the write was targeted at broker0, but after the election broker1 became the leader, so the write failed and an exception was thrown.

The error message is as follows:

  java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 

  I saw this error while restarting the Kafka cluster. A quick search suggested the cause: the producer was writing to a broker exactly when a leader election occurred; the write was going to broker0, but after the election broker1 became the leader, so the write could not succeed and an exception was thrown.

2018-11-15 16:41:13,916 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:231)] Creating hdfs://hdfs-ha/user/against_cheating/20181115/10-1-2-120_02_20181115_16.1542271273895.txt.tmp
2018-11-15 16:42:51,605 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:542)] Offset commit for group flume-consumer-against_cheating_02 failed due to REQUEST_TIMED_OUT, will find new coordinator and retry
2018-11-15 16:42:51,606 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:529)] Marking the coordinator 2147483529 dead.
2018-11-15 16:43:58,386 (PollableSourceRunner-KafkaSource-kafkaSource) [WARN - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:560)] Sending events to Kafka failed
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2018-11-15 16:43:58,387 (PollableSourceRunner-KafkaSource-kafkaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:314)] KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Commit failed as send to Kafka failed
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:561)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        ... 6 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2018-11-15 16:44:13,867 (PollableSourceRunner-KafkaSource-kafkaSource) [WARN - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:560)] Sending events to Kafka failed
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2018-11-15 16:44:13,868 (PollableSourceRunner-KafkaSource-kafkaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:314)] KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Commit failed as send to Kafka failed
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:561)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
        at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:295)
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
        ... 6 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2018-11-15 16:44:13,944 (hdfs-hdfsSink-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:357)] Closing hdfs://hdfs-ha/user/against_cheating/20181115/10-1-2-120_02_20181115_16.1542271273895.txt.tmp

  The solution:

    1>. First confirm whether the Kafka cluster is running stably; if the cluster is unhealthy, this error will keep being emitted continuously;

    2>. If you have just restarted the cluster, you can leave it alone for the moment, since Flume retries automatically. But don't just sit there: watch the Kafka monitoring UI for anything abnormal. If things have not recovered after about 2 minutes, you should start investigating whether something is actually wrong with your Kafka cluster.
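Flume's automatic retry is essentially the pattern below. This is a plain-Python sketch with a simulated send function, not Flume's or the Kafka client's actual code; it only illustrates why a transient not-leader error resolves on its own once the election finishes.

```python
import time

class NotLeaderForPartitionError(Exception):
    """Stand-in for Kafka's NotLeaderForPartitionException."""

def send_with_retry(send, retries=5, backoff_s=0.01):
    """Retry a send while the partition has no stable leader.

    `send` is any callable that raises NotLeaderForPartitionError
    while an election is in progress and returns normally afterwards.
    """
    for attempt in range(1, retries + 1):
        try:
            return send()
        except NotLeaderForPartitionError:
            if attempt == retries:
                raise  # election took too long -> investigate the cluster
            time.sleep(backoff_s * attempt)  # back off between retries

# Simulate an election that finishes after two failed attempts.
state = {"failures_left": 2}

def fake_send():
    if state["failures_left"] > 0:
        state["failures_left"] -= 1
        raise NotLeaderForPartitionError()
    return "ok"

result = send_with_retry(fake_send)  # succeeds on the third attempt
```

If the retries are exhausted (the equivalent of the error persisting past a couple of minutes), the exception propagates, which is exactly the point at which you should look at the cluster itself.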

 

3>. The maximum number of threads used for transferring data in and out of the DataNode is too small.

The error message is as follows:

  java.io.IOException: Bad connect ack with firstBadLink as 10.1.1.120:50010

  A quick search turned up the cause:

    When writing to HDFS, the DataNode actually writes to the local Linux filesystem through an intermediate layer called "xcievers" — these are simply the threads responsible for reading and writing block files on the DataNode's local disks. The more blocks a DataNode holds, the more of these threads it needs. The problem is that this thread count has an upper limit (4096 by default), so when a DataNode has too many blocks, some block files can no longer get a thread to handle their reads and writes, which produces the error above (a failed block write).

  The solution:

    Increase the maximum number of threads the DataNode uses for transferring data in and out, for example to 65535.
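In hdfs-site.xml this limit corresponds to dfs.datanode.max.transfer.threads (known as dfs.datanode.max.xcievers in older releases); a sketch of the change, assuming you edit the site file directly rather than through CDH/HDP's UI:

```xml
<!-- hdfs-site.xml: raise the DataNode data-transfer thread cap (default 4096). -->
<property>
  <name>dfs.datanode.max.transfer.threads</name>
  <value>65535</value>
</property>
```

Restart the DataNodes after the change so the new limit takes effect.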

 

 

4>. java.io.EOFException: Premature EOF: no length prefix available

 

  From the hint in the error we could roughly identify the DataNode involved, so we went to CDH (if you use HDP, check the corresponding page on that platform) to look at that node's logs, and indeed found the same error reported there.

 After running into the problem above, I took 3 steps, and the issue was finally resolved:

  Step 1: Tune the HDFS cluster; for the detailed parameters, see my notes: https://www.cnblogs.com/yinzhengjie/p/10006880.html.

  Step 2: Edit the following 2 configuration files.

[root@calculation101 ~]# cat /etc/security/limits.d/20-nproc.conf 
# Default limit for number of user's processes to prevent
# accidental fork bombs.
# See rhbz #432903 for reasoning.

*          soft    nproc     40960
root       soft    nproc     unlimited
[root@calculation101 ~]# 
[root@calculation101 ~]# cat /etc/security/limits.conf | grep -v ^# | grep -v ^$
*        soft    nofile        1000000
*        hard    nofile        1048576
*        soft    nproc         65536
*        hard    nproc         unlimited
*        soft    memlock       unlimited
*        hard    memlock       unlimited
[root@calculation101 ~]# 

  Step 3: Reboot the operating system. Before rebooting, make sure all services are shut down; after the reboot, make sure the whole HDFS cluster starts successfully. A 200 GB job now finishes in roughly 3 minutes, and after 2 days the error above has not recurred. If you hit the same problem, you can give this approach a try.
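After the reboot, it is worth verifying that the raised limits actually apply to your processes. A quick check, sketched here with Python's standard resource module (the printed values are simply whatever the current session allows, not the numbers from my servers):

```python
import resource

# Soft/hard limits for open files (nofile) and processes (nproc),
# as seen by the current process after the limits.conf change.
nofile_soft, nofile_hard = resource.getrlimit(resource.RLIMIT_NOFILE)
nproc_soft, nproc_hard = resource.getrlimit(resource.RLIMIT_NPROC)

print("nofile soft/hard:", nofile_soft, nofile_hard)
print("nproc  soft/hard:", nproc_soft, nproc_hard)
```

The equivalent shell check is `ulimit -n` and `ulimit -u`; note that limits.conf only affects new login sessions, so already-running daemons must be restarted to pick up the change.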


