Kafka Error Troubleshooting
1. A Kafka startup error
After Kafka was stopped, the following error occurred when it was started again:
[2017-10-27 09:43:18,313] INFO Recovering unflushed segment 15000679 in log mytest-0. (kafka.log.Log)
[2017-10-27 09:43:18,972] ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "derby" (kafka.log.LogManager)
[2017-10-27 09:43:18,975] FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NumberFormatException: For input string: "derby"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at kafka.log.Log$.offsetFromFilename(Log.scala:1648)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:284)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:272)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
Looking straight at the error log, there is one obvious error:
ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "derby" (kafka.log.LogManager)
Taken literally, this says that one of the threads failed while loading the logs: a java.lang.NumberFormatException was thrown because the input string "derby" could not be parsed as a number.
Where on earth did "derby" come from??
First, let's go over what Kafka has to do when it restarts:
When a Kafka broker starts, it reloads the existing data of every topic; normally the log reports each topic being recovered successfully:
INFO Recovering unflushed segment 8790240 in log userlog-2. (kafka.log.Log)
INFO Loading producer state from snapshot file 00000000000008790240.snapshot for partition userlog-2 (kafka.log.ProducerStateManager)
INFO Loading producer state from offset 10464422 for partition userlog-2 with message format version 2 (kafka.log.Log)
INFO Loading producer state from snapshot file 00000000000010464422.snapshot for partition userlog-2 (kafka.log.ProducerStateManager)
INFO Completed load of log userlog-2 with 2 log segments, log start offset 6223445 and log end offset 10464422 in 4460 ms (kafka.log.Log)
But when the data under some topic fails to recover, the broker shuts down and you get the error:
ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "derby" (kafka.log.LogManager)
So now it is clear that the problem lies in some topic's data. But what exactly is wrong?
Go and look at the place where Kafka stores topic data; the path is set in server.properties:
log.dirs=/data/kafka/kafka-logs
1) Look at the log line right before the error:
It shows the problem occurred while loading the partition mytest-0. Going into that partition's directory, there is a file named derby.log. It is an illegal file, so delete it and restart the service.
2) Do a complete check to make sure there are no other files like it:
#cd /data/kafka/kafka-logs
#find /data/kafka/kafka-logs/ -name "derby*"
This shows that under the partition directory mytest-0 there is a derby.log file, which is illegal: the Kafka broker requires every data file name to be parseable as a Long (the segment's base offset). Just delete that file and restart Kafka, as sketched below.
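A minimal sketch of the check, cleanup, and restart, assuming the data directory from server.properties above; the grep pattern and the restart command path are illustrative and may need adjusting for your installation:
# find /data/kafka/kafka-logs/ -type f -name "*.log" | grep -vE '/[0-9]+\.log$'    # list .log files whose base name is not purely numeric
# rm /data/kafka/kafka-logs/mytest-0/derby.log    # remove the offending file
# bin/kafka-server-start.sh -daemon config/server.properties    # restart the broker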
2. A Kafka and ZooKeeper error
Both Kafka and ZooKeeper start normally, but the logs show that clients disconnect very soon after connecting. The error messages are as follows:
[2017-10-27 15:06:08,981] INFO Established session 0x15f5c88c014000a with negotiated timeout 240000 for client /127.0.0.1:33494 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-27 15:06:08,982] INFO Processed session termination for sessionid: 0x15f5c88c014000a (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-27 15:06:08,984] WARN caught end of stream exception (org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client sessionid 0x15f5c88c014000a, likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
at java.lang.Thread.run(Thread.java:745)
Reading the log lines literally: the first one says session 0x15f5c88c014000a was established with a negotiated timeout of 240000 ms (240 seconds); the second says that same session was then terminated, i.e. it was closed after timing out, which is clear enough; the third warns that no additional data can be read from session 0x15f5c88c014000a because the client has likely closed the socket (of course nothing can be read, the connection is already gone). So the log analysis points to sessions being dropped because of a timeout, and the fix is simply to increase the session timeouts.
The configured timeout is too short: ZooKeeper has not finished reading the consumer's data before the consumer closes the connection!
Solution:
Edit Kafka's server.properties file:
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=600000
zookeeper.session.timeout.ms=400000
That is usually enough. If you want to be extra safe, also change the ZooKeeper configuration file:
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=1000
tickTime=120000
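For reference, ZooKeeper negotiates the timeout a client asks for into the range [2 * tickTime, 20 * tickTime] unless minSessionTimeout / maxSessionTimeout are set explicitly, which is why the timeout the broker requests and the "negotiated timeout" printed by the server can differ. A hedged alternative to raising tickTime is to widen the bounds directly (the values below are illustrative):
# zoo.cfg - keep the default tick but allow longer sessions
tickTime=2000
minSessionTimeout=4000
maxSessionTimeout=600000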
3. A Kafka consumer error
Resolving kafka.common.ConsumerRebalanceFailedException
While a consumer was consuming Kafka messages, the following error appeared:
kafka.common.ConsumerRebalanceFailedException: migart_nginx-1446432618163-2746a209 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:92)
at com.symboltech.mine.ConsumerUtil.start(ConsumerUtil.java:40)
at com.symboltech.mine.KafkaConsumer.sdf(KafkaConsumer.java:58)
Solution:
This is a problem with the consumer's ZooKeeper-related settings. Edit Kafka's consumer.properties:
zookeeper.session.timeout.ms=10000
zookeeper.connection.timeout.ms=10000
# interval between two rebalance retries
rebalance.backoff.ms=3000
# number of rebalance retries
rebalance.max.retries=10
Note:
Make sure rebalance.backoff.ms * rebalance.max.retries > zookeeper.session.timeout.ms; otherwise new consumers can join the group before the rebalance has finished and before the dead consumer's session has expired.
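A quick check with the values above: 3000 ms * 10 = 30000 ms > 10000 ms, so the rebalance retries outlast the ZooKeeper session of a consumer that has disappeared.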
官方解釋:
consumer rebalancing fails (you will see ConsumerRebalanceFailedException): This is due to conflicts when two consumers are trying to own the same topic partition. The log will show you what caused the conflict (search for "conflict in ").
If your consumer subscribes to many topics and your ZK server is busy, this could be caused by consumers not having enough time to see a consistent view of all consumers in the same group. If this is the case, try increasing rebalance.max.retries and rebalance.backoff.ms.
Another reason could be that one of the consumers is hard killed. Other consumers during rebalancing won't realize that consumer is gone after zookeeper.session.timeout.ms time. In this case, make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.
4. Common Kafka configuration parameters. Be sure to check the official documentation: some parameters may already be deprecated in certain versions, so the values here are for reference only.
Broker configuration
# non-negative integer that uniquely identifies the broker
broker.id 0
# path(s) where Kafka persists its data; multiple paths may be given, comma-separated
log.dirs /tmp/kafka-logs
# port on which the broker accepts connection requests
port 9092
# ZooKeeper connection string, a comma-separated list of hostname:port entries
zookeeper.connect
# maximum size of a single message; the consumer's maximum fetch size must be slightly larger than this
message.max.bytes 1000000
# number of threads that receive network requests
num.network.threads 3
# number of I/O threads that execute requests
num.io.threads 8
# number of threads for background tasks such as file deletion
background.threads 10
# maximum number of pending requests that can be queued
queued.max.requests 500
# hostname or IP address of this machine
host.name
# default number of partitions per topic
num.partitions 1
# segment file size; a new segment is rolled once this is exceeded
log.segment.bytes 1024 * 1024 * 1024
# even if a segment has not reached the size limit, it is rolled after this much time
log.roll.{ms,hours} 168
# log retention time
log.retention.{ms,minutes,hours}
# whether to auto-create a topic that does not yet exist
auto.create.topics.enable true
# default replication factor for partitions
default.replication.factor 1
# if a follower has not issued a fetch request within this time, it is considered dead and removed from the ISR
replica.lag.time.max.ms 10000
# if a follower falls behind the leader by more than this many messages, it is removed from the ISR
replica.lag.max.messages 4000
# maximum size of messages that can be fetched from the leader
replica.fetch.max.bytes 1024 * 1024
# number of fetcher threads used to pull messages from the leader
num.replica.fetchers 1
# ZooKeeper session timeout
zookeeper.session.timeout.ms 6000
# maximum time allowed to establish the ZooKeeper connection
zookeeper.connection.timeout.ms
# how long a ZooKeeper follower may lag behind the leader
zookeeper.sync.time.ms 2000
# whether deleting topics is enabled
delete.topic.enable false
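A minimal server.properties sketch assembled from the broker settings above (the broker id, host name, data directory and ZooKeeper string are illustrative values, not recommendations):
broker.id=0
port=9092
host.name=192.168.1.10
log.dirs=/data/kafka/kafka-logs
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
num.partitions=3
default.replication.factor=2
log.retention.hours=168
delete.topic.enable=true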
Producer configuration
# how many brokers must acknowledge a message: 0 = no acknowledgement needed, 1 = the leader replica must acknowledge, -1 = all replicas in the ISR must acknowledge
request.required.acks 0
# maximum time to wait for the ACK after sending a request (timeout)
request.timeout.ms 10000
# send mode, synchronous by default; in async mode messages accumulate up to a count or for a period of time and are sent in batches by a separate thread, which improves throughput but increases the risk of data loss
producer.type sync
# message serializer class; the default encodes messages as byte[] arrays
serializer.class kafka.serializer.DefaultEncoder
# partitioning strategy; the default hashes the message key
partitioner.class kafka.producer.DefaultPartitioner
# compression codec applied to sent messages: none | gzip | snappy
compression.codec none
# which topics' messages should be compressed
compressed.topics null
# number of retries when a send fails; if a send actually succeeded but the ACK was lost on the network, the retry can cause the message to be sent twice
message.send.max.retries 3
# time to wait before starting a new metadata refresh
retry.backoff.ms 100
# metadata refresh interval: a negative value refreshes only on failure, 0 refreshes after every send, a positive value refreshes periodically
topic.metadata.refresh.interval.ms 600 * 1000
# in async mode, the maximum time data is buffered before being sent to the broker
queue.buffering.max.ms 5000
# maximum number of messages buffered on the producer in async mode
queue.buffering.max.messages 10000
# 0 = enqueue while the queue is not full and drop immediately once it is full; -1 = block unconditionally and never drop
queue.enqueue.timeout.ms -1
# number of messages required to trigger a batch send; messages are also sent when queue.buffering.max.ms is reached
batch.num.messages 200
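A minimal producer.properties sketch for the old async producer, built from the settings above (the metadata.broker.list value is illustrative):
metadata.broker.list=broker1:9092,broker2:9092
producer.type=async
request.required.acks=1
compression.codec=snappy
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
batch.num.messages=200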
Consumer configuration
# consumer group of the current consumer process; within one group, a partition can only be consumed by a single consumer
group.id
# maximum number of message bytes a fetch request against one partition may pull; must be greater than or equal to the broker's maximum message size
fetch.message.max.bytes 1024 * 1024
# whether to automatically and periodically commit the offsets of messages already fetched by the consumer
auto.commit.enable true
# interval at which offsets are auto-committed to ZooKeeper
auto.commit.interval.ms 60 * 1000
# number of rebalance retries
rebalance.max.retries 4
# interval between two rebalance retries
rebalance.backoff.ms 2000
# time to wait before re-fetching a partition's leader
refresh.leader.backoff.ms 200
# where to start consuming when ZooKeeper holds no valid initial offset: smallest | largest
auto.offset.reset largest
# if the ZooKeeper session times out, the consumer may be considered dead and a rebalance may be triggered
zookeeper.session.timeout.ms 6000
# maximum time the client will wait for the ZooKeeper connection to be established
zookeeper.connection.timeout.ms 6000
Note: the configuration parameters above are excerpted from a CSDN post: http://blog.csdn.net/huanggang028/article/details/47830529