Reading guide:
1. Kafka supports two versions of consumer-group offset storage — what are they?
2. What is the ConsumerOffsetChecker class for?
3. How can monitoring be built on top of Kafka's source code?
一. The Basic Idea
Kafka, as a convenient and widely used message queue, is all but indispensable in big-data processing systems. And precisely because it buffers messages, monitoring its traffic and alerting on consumer lag is critically important.
Readers of the earlier posts <Kafka源碼系列之源碼解析SimpleConsumer的消費過程> and <Kafka源碼系列之Consumer高級API性能分析> will find this article straightforward. The idea is simply to use SimpleConsumer to fetch each partition's latest offset, use the Zookeeper utilities to fetch the consumer group's committed offset for each partition, and take the difference — that is the lagSize.
In practice, however, Kafka supports two versions of consumer-group offset storage:
1. Zookeeper-based storage: OffsetFetchRequest.CurrentVersion is 0.
2. Kafka-based storage (the default): OffsetFetchRequest.CurrentVersion is 1.
A consumer-lag alerting tool therefore has to handle both, so both implementations are covered in detail below.
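As a rough illustration of the two versions, the mapping below mirrors the consumer's `offsets.storage` setting; the helper name itself is invented for this sketch and is not part of Kafka:

```scala
// Hypothetical helper (not in Kafka): map the consumer's
// "offsets.storage" setting to the OffsetFetchRequest versionId.
def offsetFetchVersion(offsetsStorage: String): Short = offsetsStorage match {
  case "zookeeper" => 0 // offsets live under /consumers/<group>/offsets in ZK
  case "kafka"     => 1 // offsets live in the __consumer_offsets topic (default)
  case other       => throw new IllegalArgumentException("unknown offsets.storage: " + other)
}
```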
二. Key Utility Classes
1. ConsumerOffsetChecker
The tool Kafka ships for checking a consumer group's committed offsets, LogEndOffset, and lagSize. Our own monitoring can be modeled on this class, and this article walks through the implementation based on it.
2. ZkUtils
Kafka's utility class for operating on Zookeeper.
3. SimpleConsumer
Kafka's low-level consumer implementation. Replica synchronization, the simple (low-level) consumer, and the high-level consumer all consume messages from Kafka through this class.
4. OffsetRequest
The request a consumer sends to fetch a partition's data offsets. Its request key is RequestKeys.OffsetsKey, and on the broker side KafkaApis handles it with handleOffsetRequest(request).
5. OffsetFetchRequest
The request for a consumer group's committed offsets on a topic. Its request key is RequestKeys.OffsetFetchKey, and on the broker side KafkaApis handles it with handleOffsetFetchRequest(request).
6. OffsetManager
The offset manager. It maintains an internal Scheduler that periodically runs compaction, merging offset commits so that only the latest one per key is retained.
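The compaction idea can be sketched in plain Scala (the types and names below are invented for illustration, not Kafka's actual ones): only the most recent commit per group/topic/partition key survives.

```scala
// Illustration of offset-log compaction: keep only the newest commit per key.
case class OffsetKey(group: String, topic: String, partition: Int)
case class OffsetCommit(key: OffsetKey, offset: Long, timestamp: Long)

def compact(log: Seq[OffsetCommit]): Map[OffsetKey, Long] =
  log.groupBy(_.key)
     .map { case (k, commits) => k -> commits.maxBy(_.timestamp).offset }
```

After compaction the map answers "what is this group's latest committed offset for this partition" in a single lookup.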
三. Source Code Walkthrough
1. Fetching the consumer group's offsets
The main method of ConsumerOffsetChecker first obtains the topic list:
```scala
val topicList = topics match {
  case Some(x) => x.split(",").view.toList
  case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList
}
```
Next it opens a channel to the offset-manager broker and fetches the group's offsets from Kafka:
```scala
val topicPartitions = topicPidMap.flatMap { case (topic, partitionSeq) =>
  partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)

debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
channel.send(OffsetFetchRequest(group, topicPartitions))
val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
debug("Received offset fetch response %s.".format(offsetFetchResponse))

offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
  if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
    val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
    // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
    // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
    try {
      val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir +
        "/%d".format(topicAndPartition.partition))._1.toLong
      offsetMap.put(topicAndPartition, offset)
    } catch {
      case z: ZkNoNodeException =>
        if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
          offsetMap.put(topicAndPartition, -1)
        else
          throw z
    }
  }
  else if (offsetAndMetadata.error == ErrorMapping.NoError)
    offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
  else {
    println("Could not fetch offset for %s due to %s."
      .format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
  }
}
```
If Kafka returns no offset (NoOffset), the consumer offset is read from Zookeeper instead, as the try block above shows.
To obtain each topic partition's latest offset (the LogEndOffset), the tool builds a SimpleConsumer and has it issue an offset request; subtracting the consumer's committed offset from that value yields the lag.
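The subtraction itself can be written as a standalone sketch; the -1 sentinel matches the "unknown" case the tool prints:

```scala
// LagSize = LogSize - Offset; a committed offset of -1 means "unknown"
// (the group has a ZK directory but no offset node for this partition).
def lagSize(logEndOffset: Long, committedOffset: Long): Option[Long] =
  if (committedOffset == -1) None else Some(logEndOffset - committedOffset)
```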
```scala
topicList.sorted.foreach {
  topic => processTopic(zkClient, group, topic)
}
// inside processTopic:
topicPidMap.get(topic) match {
  case Some(pids) =>
    pids.sorted.foreach {
      pid => processPartition(zkClient, group, topic, pid)
    }
  case None => // ignore
}
```
Inside processPartition:
```scala
val offsetOpt = offsetMap.get(topicPartition)
val groupDirs = new ZKGroupTopicDirs(group, topic)
val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1
ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
  case Some(bid) =>
    val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
    consumerOpt match {
      case Some(consumer) =>
        val topicAndPartition = TopicAndPartition(topic, pid)
        val request =
          OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
        val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
```
The difference between the two then gives the LagSize:
```scala
val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
println("%-15s %-30s %-3s %-15s %-15s %-15s %s"
  .format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
    owner match {
      case Some(ownerStr) => ownerStr
      case None => "none"
    }))
```
And in the getConsumer method:
```scala
private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
  try {
    ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
      case Some(brokerInfoString) =>
        Json.parseFull(brokerInfoString) match {
          case Some(m) =>
            val brokerInfo = m.asInstanceOf[Map[String, Any]]
            val host = brokerInfo.get("host").get.asInstanceOf[String]
            val port = brokerInfo.get("port").get.asInstanceOf[Int]
            Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker"))
          case None =>
            throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
        }
      case None =>
        throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
    }
  } catch {
    case t: Throwable =>
      println("Could not parse broker info due to " + t.getCause)
      None
  }
}
```
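The broker znode under /brokers/ids/<bid> holds JSON such as {"host":"broker1","port":9092,...}. The real code parses it with kafka.utils.Json, but the host/port extraction can be mimicked with a toy regex-based parser (illustration only, not robust JSON handling):

```scala
// Toy parser for the broker-registration JSON stored in ZK.
val HostPattern = "\"host\"\\s*:\\s*\"([^\"]+)\"".r
val PortPattern = "\"port\"\\s*:\\s*(\\d+)".r

def parseBrokerInfo(json: String): Option[(String, Int)] =
  for {
    host <- HostPattern.findFirstMatchIn(json).map(_.group(1))
    port <- PortPattern.findFirstMatchIn(json).map(_.group(1).toInt)
  } yield (host, port)
```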
四. Summary
Using the tool:
```bash
bin/kafka-consumer-offset-checker.sh --group yourgroup --topic yourtopic --zookeeper localhost:2181
```
In the output, Offset is how far the consumer group has read, LogSize is the partition's latest offset in Kafka, and Lag is their difference. That is:
LagSize = LogSize - Offset
Once we have the group's lag, we can alert on it as our requirements dictate (for example, raise an alert once the lag exceeds a configured number of messages).
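A minimal alerting rule on top of these lag values might look like the following (the threshold and names are arbitrary examples, not part of the tool):

```scala
// Report every partition whose lag exceeds the configured threshold.
def partitionsToAlert(lags: Map[Int, Long], maxLag: Long): Map[Int, Long] =
  lags.filter { case (_, lag) => lag > maxLag }

val lags = Map(0 -> 120L, 1 -> 50000L, 2 -> 3L)
// With a 10000-message threshold only partition 1 triggers an alert.
println(partitionsToAlert(lags, 10000L))
```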
Reposted from: http://www.aboutyun.com/forum.php?mod=viewthread&tid=22215&extra=page%3D1&page=1&
