Original link: Sending Kafka offsets to Zookeeper with the Spark + Kafka Direct approach
Apache Spark 1.3.0 introduced the Direct API, which reads data from the Kafka cluster through Kafka's low-level API and tracks the offsets inside Spark Streaming itself. This gives zero data loss and is more efficient than the Receiver-based approach. However, because Spark Streaming maintains the consumed Kafka offsets on its own and never commits them to Zookeeper, offset-based Kafka monitoring tools (for example Kafka Web Console and KafkaOffsetMonitor) stop seeing any consumer progress. This article addresses that problem, so that our Spark Streaming program automatically updates the Kafka offsets in Zookeeper after every batch of data it receives.
From the official Spark documentation we know that the Kafka offset information Spark maintains internally is exposed through the offsetRanges field of HasOffsetRanges, and we can read it inside a Spark Streaming program:
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
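Each element of offsetsList is an OffsetRange describing one topic partition in the current batch. As a quick illustration (a minimal sketch inside foreachRDD, not part of the original article), its fields can be inspected like this:

rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
  // topic, partition, first offset of the batch (inclusive) and last offset (exclusive)
  println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}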
This gives us the consumed offsets for every partition; we only need to iterate over offsetsList and write that information to Zookeeper to update the Kafka consumer offsets. The complete code snippet is as follows:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
messages.foreachRDD(rdd => {
  // Offset ranges consumed by this batch, one per topic partition
  val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val kc = new KafkaCluster(kafkaParams)
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition("iteblog", offsets.partition)
    // Commit the end offset of this batch for the consumer group passed in args(0)
    val o = kc.setConsumerOffsets(args(0), Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
      println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
})
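The snippet assumes that ssc, kafkaParams, topicsSet and the consumer group in args(0) are already defined. A minimal, hypothetical setup (the application name, broker addresses and batch interval below are placeholders, not taken from the original article) could look like this:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("DirectKafkaOffsetsToZookeeper")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// The Direct API connects to the brokers directly, so only the broker list is needed here;
// the consumer group used when committing offsets is passed in separately as args(0).
val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topicsSet = Set("iteblog")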
The KafkaCluster class is a utility class that handles the connection to the Kafka cluster. For each partition of a Kafka topic we build the desired offset with Map((topicAndPartition, offsets.untilOffset)) and then call the setConsumerOffsets method of KafkaCluster to update the information stored in Zookeeper. Once the offsets are updated there, tools such as KafkaOffsetMonitor can monitor the consumption of the corresponding topic again. The figure below shows what KafkaOffsetMonitor reports:
As the figure shows, KafkaOffsetMonitor can now track how far the relevant Kafka partitions have been consumed, which matters for monitoring the whole Spark Streaming application because we can see Spark's read rate at any moment. The complete code of the KafkaCluster utility class is as follows:
package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

/**
 * User: 過往記憶
 * Date: 2015-06-02
 * Time: 23:46
 * Blog: http://www.iteblog.com
 * Post: http://www.iteblog.com/archives/1381
 * 過往記憶's blog focuses on Hadoop, Hive, Spark, Shark and Flume.
 * WeChat public account: iteblog_hadoop
 */
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  // Lazily build the consumer configuration from the Kafka parameters
  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  // Commit the given offsets for a consumer group; wraps each offset in
  // OffsetMetadataAndError as required by the offset commit API
  def setConsumerOffsets(groupId: String,
      offsets: Map[TopicAndPartition, Long]
    ): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
      kv._1 -> OffsetMetadataAndError(kv._2)
    })
  }

  def setConsumerOffsetMetadata(groupId: String,
      metadata: Map[TopicAndPartition, OffsetMetadataAndError]
    ): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  // Try each broker in turn with the supplied function, collecting any errors
  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
      (fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}
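Since setConsumerOffsets accepts a Map with several TopicAndPartition entries, the offsets of a whole batch can also be committed in a single call, taking the topic name from each OffsetRange rather than hard-coding it. A minimal sketch of this variant (it reuses the messages, kafkaParams and args(0) from the earlier snippet, so those are assumptions carried over rather than new definitions):

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  val kc = new KafkaCluster(kafkaParams)
  // Build one TopicAndPartition -> untilOffset entry per partition of this batch
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges.map { o =>
    TopicAndPartition(o.topic, o.partition) -> o.untilOffset
  }.toMap
  kc.setConsumerOffsets(args(0), offsets) match {
    case Left(errs) => println(s"Error updating offsets in Zookeeper: $errs")
    case Right(_)   => // all partitions of this batch were committed successfully
  }
}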