Questions to guide your reading:
1. Kafka supports two versions of consumer-group offset storage — which two?
2. What is the ConsumerOffsetChecker class for?
3. How can we build monitoring on top of the Kafka source code?
I. Basic Approach
Kafka, as a capable and widely used message queue, is all but indispensable in big-data processing systems. And since it is a queue that buffers messages, monitoring its traffic and alerting on consumer lag is critically important.
If you have read the two earlier posts, <Kafka源碼系列之源碼解析SimpleConsumer的消費過程> and <Kafka源碼系列之Consumer高級API性能分析>, this article should be easy going. The idea is simply to use SimpleConsumer to fetch each partition's latest offset, use the ZooKeeper utilities to fetch the consumer group's committed offset for each partition, and take the difference: that difference is the lag (lagSize).
In practice, though, Kafka supports two versions of consumer-group offset storage:
1. ZooKeeper-based: OffsetFetchRequest.CurrentVersion is 0.
2. Kafka-based (the default): OffsetFetchRequest.CurrentVersion is 1.
A consumer-lag alerting tool therefore has to handle both, so let's look at each implementation in detail.
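To make the two storage layouts concrete, here is a minimal sketch of where each version keeps a group's committed offset for a partition. `zkOffsetPath` is a hypothetical helper name (not from ConsumerOffsetChecker), but the path layout is the one ZKGroupTopicDirs builds for version-0 (ZooKeeper-based) storage:

```scala
// Sketch: where each offset-storage version keeps a committed offset.
object OffsetStoragePaths {
  // Version 0: offsets live in ZooKeeper under
  // /consumers/<group>/offsets/<topic>/<partition>
  def zkOffsetPath(group: String, topic: String, partition: Int): String =
    s"/consumers/$group/offsets/$topic/$partition"

  // Version 1: offsets live in Kafka's internal offsets topic and are
  // fetched with OffsetFetchRequest (CurrentVersion = 1); no ZK path exists.
  val offsetsTopic = "__consumer_offsets"
}
```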
II. Key Utility Classes
1. ConsumerOffsetChecker
The tool Kafka ships for checking a consumer group's committed offsets, log end offsets (LogEndSize), and lag (lagSize). Our own monitoring can be modeled on this class, and this article restricts itself to walking through its implementation.
2. ZkUtils
Kafka's utility class for operating on ZooKeeper.
3. SimpleConsumer
Kafka's consumer implementation class. Replica synchronization, the low-level consumer, and the high-level consumer all consume messages from Kafka through this class.
4. OffsetRequest
The request a consumer sends to fetch a partition's data offsets; its request key is RequestKeys.OffsetsKey. On the broker side it is handled in KafkaApis by handleOffsetRequest(request).
5. OffsetFetchRequest
The request for a consumer group's committed offsets for a topic; its request key is RequestKeys.OffsetFetchKey. On the broker side it is handled in KafkaApis by handleOffsetFetchRequest(request).
6. OffsetManager
The offset manager. It maintains an internal Scheduler that periodically runs compaction to merge the stored offsets.
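The effect of that compaction is that, per (group, topic, partition) key, only the most recent committed offset survives. A toy model of that merge (function name and shape are assumptions for illustration, not OffsetManager's API):

```scala
// Toy model of offset-topic compaction: for each (group, topic, partition)
// key, a later commit overwrites any earlier one.
def compact(commits: Seq[((String, String, Int), Long)]): Map[(String, String, Int), Long] =
  commits.foldLeft(Map.empty[(String, String, Int), Long]) {
    case (acc, (key, offset)) => acc + (key -> offset) // keep latest per key
  }
```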
III. Source Code Walkthrough
1. Fetching the consumer group's committed offsets
The main method of ConsumerOffsetChecker first builds the topic list:
```scala
val topicList = topics match {
  case Some(x) => x.split(",").view.toList
  case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList
}
```
Next it establishes a channel to the broker and fetches the consumer offsets from Kafka:
```scala
val topicPartitions = topicPidMap.flatMap { case (topic, partitionSeq) =>
  partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
val channel = ClientUtils.channelToOffsetManager(group, zkClient,
  channelSocketTimeoutMs, channelRetryBackoffMs)

debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
channel.send(OffsetFetchRequest(group, topicPartitions))
val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
debug("Received offset fetch response %s.".format(offsetFetchResponse))

offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
  if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
    val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
    // this group may not have migrated off zookeeper for offsets storage
    // (we don't expose the dual-commit option in this tool, meaning the lag
    // may be off until all the consumers in the group have the same setting
    // for offsets storage)
    try {
      val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir +
        "/%d".format(topicAndPartition.partition))._1.toLong
      offsetMap.put(topicAndPartition, offset)
    } catch {
      case z: ZkNoNodeException =>
        if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
          offsetMap.put(topicAndPartition, -1)
        else
          throw z
    }
  }
  else if (offsetAndMetadata.error == ErrorMapping.NoError)
    offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
  else {
    println("Could not fetch offset for %s due to %s."
      .format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
  }
}
```
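The branching here boils down to: use the broker-side offset when one exists, fall back to the ZooKeeper value when the group never migrated off ZooKeeper, record -1 (unknown) when the group's offset dir exists but the partition node does not, and otherwise propagate the error. A simplified pure sketch of that decision (hypothetical function mirroring the control flow, not the tool's actual API):

```scala
// Simplified resolution logic mirroring the NoOffset branch above.
// kafkaOffset:     offset from OffsetFetchRequest, None when NoOffset
// zkOffset:        offset read from ZooKeeper, None when the znode is missing
// groupNodeExists: whether the group's offset dir exists in ZooKeeper at all
def resolveOffset(kafkaOffset: Option[Long],
                  zkOffset: Option[Long],
                  groupNodeExists: Boolean): Option[Long] =
  kafkaOffset.orElse {
    zkOffset.orElse {
      if (groupNodeExists) Some(-1L) // group tracked in ZK, no offset yet
      else None                      // mirrors rethrowing ZkNoNodeException
    }
  }
```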
If the fetched offset information is empty, the tool falls back to reading the consumer offsets from ZooKeeper.
To get each topic partition's latest offset, the tool builds a SimpleConsumer and uses it to request the offset; subtracting the committed consumer offset from it then yields the lag:
```scala
topicList.sorted.foreach {
  topic => processTopic(zkClient, group, topic)
}

topicPidMap.get(topic) match {
  case Some(pids) =>
    pids.sorted.foreach {
      pid => processPartition(zkClient, group, topic, pid)
    }
  case None => // ignore
}
```
Inside processPartition:
```scala
val offsetOpt = offsetMap.get(topicPartition)
val groupDirs = new ZKGroupTopicDirs(group, topic)
val owner = ZkUtils.readDataMaybeNull(zkClient,
  groupDirs.consumerOwnerDir + "/%s".format(pid))._1
ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
  case Some(bid) =>
    val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
    consumerOpt match {
      case Some(consumer) =>
        val topicAndPartition = TopicAndPartition(topic, pid)
        val request =
          OffsetRequest(immutable.Map(topicAndPartition ->
            PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
        val logSize = consumer.getOffsetsBefore(request)
          .partitionErrorAndOffsets(topicAndPartition).offsets.head
```
Then the difference gives the LagSize:
```scala
val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
println("%-15s %-30s %-3s %-15s %-15s %-15s %s"
  .format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize,
    lagString.getOrElse("unknown"),
    owner match {
      case Some(ownerStr) => ownerStr
      case None => "none"
    }))
```
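The subtraction itself is trivial but worth isolating; a minimal sketch (hypothetical helper, keeping the -1 "unknown" convention used above):

```scala
// Lag per partition: logSize (latest offset on the broker) minus the
// group's committed offset; a committed offset of -1 means "unknown".
def lagString(logSize: Long, committed: Long): String =
  if (committed == -1) "unknown" else (logSize - committed).toString
```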
And in the getConsumer method:
```scala
private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
  try {
    ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
      case Some(brokerInfoString) =>
        Json.parseFull(brokerInfoString) match {
          case Some(m) =>
            val brokerInfo = m.asInstanceOf[Map[String, Any]]
            val host = brokerInfo.get("host").get.asInstanceOf[String]
            val port = brokerInfo.get("port").get.asInstanceOf[Int]
            Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker"))
          case None =>
            throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
        }
      case None =>
        throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
    }
  } catch {
    case t: Throwable =>
      println("Could not parse broker info due to " + t.getCause)
      None
  }
}
```
IV. Summary
Using the tool:
```shell
bin/kafka-consumer-offset-checker.sh --group yourgroup --topic yourtopic --zookeeper localhost:2181
```
In the output, Offset is the offset the consumer group has consumed up to, logSize is the latest offset of the partition in Kafka, and Lag is the difference between the two, i.e.
LagSize = LogSize - Offset
Once we know how far behind our consumer group is, we can raise alerts as our requirements dictate (for example, alert once the group falls behind by more than a configured number of messages).
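As a sketch of that alerting step (the threshold semantics and function name are assumptions for illustration, not part of ConsumerOffsetChecker):

```scala
// Hypothetical alert check: fire when the total lag across all partitions
// exceeds a configured threshold; unknown partitions (-1) are ignored.
def shouldAlert(partitionLags: Seq[Long], threshold: Long): Boolean =
  partitionLags.filter(_ >= 0).sum > threshold
```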
Source: http://www.aboutyun.com/forum.php?mod=viewthread&tid=22215&extra=page%3D1&page=1&