轉發請注明原創地址:https://www.cnblogs.com/dongxiao-yang/p/10602799.html
某日晚高峰忽然集群某個大流量業務收到lag報警,查看客戶端日志發現reblance一直無法成功,日志如下
根據客戶端日志顯示consumer在嘗試joingroup的過程中收到了服務端COORDINATOR狀態不正常的信息,懷疑是服務端負責這個consumer-group的broker在coordinator元信息管理上出現了問題。
於是跑到對應的節點上看一下server日志,發現在一台剛才有過重啟的服務節點上產生如下日志
Failed to append 363 tombstones to __consumer_offsets-38 for expired/deleted offsets and/or metadata for group consumer-group. (kafka.coordinator.GroupMetadataManager)
org.apache.kafka.common.errors.NotLeaderForPartitionException: Leader not local for partition __consumer_offsets-38 on broker 。
懷疑是這個服務重啟的過程中__consumer_offset分區有部分數據或者文件有異常導致coordinator無法提供服務導致,停掉有問題節點后發現客戶端reblance很快就成功了,於是懷疑問題節點產生了壞文件,后續刪除對應分區可以重啟成功服務,但是對應group的業務又開始報錯
20 Mar 2019 15:31:32,000 INFO [PollableSourceRunner-KafkaSource-bl_app_event_detail_source] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle:542) - Offset commit for group consumer-group failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry
20 Mar 2019 15:31:32,001 INFO [PollableSourceRunner-KafkaSource-bl_app_event_detail_source] (org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead:529) - Marking the coordinator 2147483543 dead.
kafka 自從0.9以來摒棄了consumer把offset存在zk的做法而是都存到了__consumer_offsets這個系統topic里面,同時consumer端的reblance都是依靠server端的coordinator負責調度協調。至於每個group怎么選擇對應broker節點是根據下面這個簡單的hashcode對__consumer_offsets分區數取模的算法得出來的,
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
所以看上去是重啟節點拉起來后客戶端發現對應的offset分區leader又活了,但是活過來的leader卻告知客戶端NOT_COORDINATOR_FOR_GROUP這個矛盾。但是明明有問題的offset文件已經被手動刪除掉了,重新拉副本也成功了,為什么還是會有join group不成功的現象呢。
繼續查看問題節點,發現問題節點在Loading group metadata for之類的日志的時候一直沒有輸出對應的問題group相關日志,初步判斷broker重啟過程中load group信息的時效出了問題。
def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) { // for each new leader or follower, call coordinator to handle consumer group migration. // this callback is invoked under the replica state change lock to ensure proper order of // leadership changes updatedLeaders.foreach { partition => if (partition.topic == GROUP_METADATA_TOPIC_NAME) groupCoordinator.handleGroupImmigration(partition.partitionId) else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch) }
如上述代碼所示,kafka在offset分區重新被選舉為leader的時候才會去加載對應的group信息,而且所有新leader是foreach單線程循環,如果其中有一個慢的剩下的group都會受到影響。查看問題節點果然除了被刪掉的offset分區還有一個分區offset歷史文件很多,多到500G的體量,這和offset這種只保存最新數據的場景明顯是不符合的,這個大小會導致服務端加載offset信息長到無法接受的程度。
為了盡快回復offset元信息,把問題節點的offset partition全都重新分配到其他節點,在重分配的過程中發現新的副本會不斷的刪除同步過來的過期數據最后結束后整個分區的大小只有幾十M,於是堅定了原來分區大小不正常的判斷 。對於__consumer_offsets這種compact策略的topic,kafka內部是有一個專門的logcleaner線程負責日志的合並,但是剛開始出問題的節點經過了幾次重啟,原始的現場早已不存在,於是把整個集群每個服務挨個查了一遍,果然在另一台看似正常的機器上同樣發現了一個很大的offset分區,jstack了一下,發現kafka-log-cleaner-thread這個線程已經沒了!重啟該服務后發現問題分區的日志也開始正常刪除。可惜的是由於服務日志只保留了最近7天的,kafka-log-cleaner-thread的錯誤日志已經找不到了,這個有待后續復現確認。
回顧了一下處理問題過程中出現的其他現象,其實都是有提示的,像是關掉問題節點的時候server日志會報
WARN Map failed (kafka.utils.CoreUtils$)
java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:111)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:101)
以及kafka jvm第一次崩掉的hs_err_pid日志會提示內存不足
Native memory allocation (mmap) failed to map 65536 bytes for committing reserved memory
由於kafka使用的mmap方式映射了數據文件以及索引,這個mmap failed就已經提示了文件過多。
結論:kafka的offset數據每個group會根據hash取模的方式發到一個固定的_consumer_offsets分區中,_consumer_offsets分區的leader負責對應groupid的coordinator服務,_consumer_offsets
的刪除是由kafka-log-cleaner-thread執行的,這個線程個數默認是1,如果線程崩掉了offset歷史分區文件會一直無法刪除,導致jvm崩掉並且服務恢復的時候group元信息長時間的無法加載導致reblacne報錯。