Spark+Kafka的Direct方式將偏移量發送到Zookeeper實現(轉)


原文鏈接:Spark+Kafka的Direct方式將偏移量發送到Zookeeper實現

 Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka集群中讀取數據,並且在Spark Streaming系統里面維護偏移量相關的信息,並且通過這種方式去實現零數據丟失(zero data loss)相比使用基於Receiver的方法要高效。但是因為是Spark Streaming系統自己維護Kafka的讀偏移量,而Spark Streaming系統並沒有將這個消費的偏移量發送到Zookeeper中,這將導致那些基於偏移量的Kafka集群監控軟件(比如:Apache Kafka監控之Kafka Web ConsoleApache Kafka監控之KafkaOffsetMonitor等)失效。本文就是基於為了解決這個問題,使得我們編寫的Spark Streaming程序能夠在每次接收到數據之后自動地更新Zookeeper中Kafka的偏移量。

  我們從Spark的官方文檔可以知道,維護Spark內部維護Kafka便宜了信息是存儲在HasOffsetRanges類的offsetRanges中,我們可以在Spark Streaming程序里面獲取這些信息:

1 val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

這樣我們就可以獲取所以分區消費信息,只需要遍歷offsetsList,然后將這些信息發送到Zookeeper即可更新Kafka消費的偏移量。完整的代碼片段如下:

01 val messages =KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
02       messages.foreachRDD(rdd => {
03         val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
04         val kc = new KafkaCluster(kafkaParams)
05         for (offsets < - offsetsList) {
06           val topicAndPartition = TopicAndPartition("iteblog", offsets.partition)
07           val =kc.setConsumerOffsets(args(0), Map((topicAndPartition, offsets.untilOffset)))
08           if (o.isLeft) {
09             println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
10           }
11         }
12 })

  KafkaCluster類用於建立和Kafka集群的鏈接相關的操作工具類,我們可以對Kafka中Topic的每個分區設置其相應的偏移量Map((topicAndPartition, offsets.untilOffset)),然后調用KafkaCluster類的setConsumerOffsets方法去更新Zookeeper里面的信息,這樣我們就可以更新Kafka的偏移量,最后我們就可以通過KafkaOffsetMonitor之類軟件去監控Kafka中相應Topic的消費信息,下圖是KafkaOffsetMonitor的監控情況:



  從圖中我們可以看到KafkaOffsetMonitor監控軟件已經可以監控到Kafka相關分區的消費情況,這對監控我們整個Spark Streaming程序來非常重要,因為我們可以任意時刻了解Spark讀取速度。另外,KafkaCluster工具類的完整代碼如下:

01 package org.apache.spark.streaming.kafka
02  
03 import kafka.api.OffsetCommitRequest
04 import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
05 import kafka.consumer.SimpleConsumer
06 import org.apache.spark.SparkException
07 import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig
08  
09 import scala.collection.mutable.ArrayBuffer
10 import scala.util.Random
11 import scala.util.control.NonFatal
12  
13 /**
14  * User: 過往記憶
15  * Date: 2015-06-02
16  * Time: 下午23:46
17  * bolg: http://www.iteblog.com
18  * 本文地址:http://www.iteblog.com/archives/1381
19  * 過往記憶博客,專注於hadoop、hive、spark、shark、flume的技術博客,大量的干貨
20  * 過往記憶博客微信公共帳號:iteblog_hadoop
21  */
22  
23 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
24   type Err = ArrayBuffer[Throwable]
25  
26   @transient private var _config: SimpleConsumerConfig = null
27  
28   def config: SimpleConsumerConfig = this.synchronized {
29     if (_config == null) {
30       _config = SimpleConsumerConfig(kafkaParams)
31     }
32     _config
33   }
34  
35   def setConsumerOffsets(groupId: String,
36                          offsets: Map[TopicAndPartition, Long]
37                           ): Either[Err, Map[TopicAndPartition, Short]] = {
38     setConsumerOffsetMetadata(groupId, offsets.map { kv =>
39       kv._1 -> OffsetMetadataAndError(kv._2)
40     })
41   }
42  
43   def setConsumerOffsetMetadata(groupId: String,
44                                 metadata: Map[TopicAndPartition, OffsetMetadataAndError]
45                                  ): Either[Err, Map[TopicAndPartition, Short]] = {
46     var result = Map[TopicAndPartition, Short]()
47     val req = OffsetCommitRequest(groupId, metadata)
48     val errs = new Err
49     val topicAndPartitions = metadata.keySet
50     withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
51       val resp = consumer.commitOffsets(req)
52       val respMap = resp.requestInfo
53       val needed = topicAndPartitions.diff(result.keySet)
54       needed.foreach { tp: TopicAndPartition =>
55         respMap.get(tp).foreach { err: Short =>
56           if (err == ErrorMapping.NoError) {
57             result += tp -> err
58           else {
59             errs.append(ErrorMapping.exceptionFor(err))
60           }
61         }
62       }
63       if (result.keys.size == topicAndPartitions.size) {
64         return Right(result)
65       }
66     }
67     val missing = topicAndPartitions.diff(result.keySet)
68     errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
69     Left(errs)
70   }
71  
72   private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
73                          (fn: SimpleConsumer => Any): Unit = {
74     brokers.foreach { hp =>
75       var consumer: SimpleConsumer = null
76       try {
77         consumer = connect(hp._1, hp._2)
78         fn(consumer)
79       catch {
80         case NonFatal(e) =>
81           errs.append(e)
82       finally {
83         if (consumer != null) {
84           consumer.close()
85         }
86       }
87     }
88   }
89  
90   def connect(host: String, port: Int): SimpleConsumer =
91     new SimpleConsumer(host, port, config.socketTimeoutMs,
92       config.socketReceiveBufferBytes, config.clientId)
93 }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM