使用場景
Spark Streaming實時消費kafka數據的時候,程序停止或者Kafka節點掛掉會導致數據丟失,Spark Streaming也沒有設置CheckPoint(據說比較雞肋,雖然可以保存Direct方式的offset,但是可能會導致頻繁寫HDFS占用IO),所以每次出現問題的時候,重啟程序,而程序的消費方式是Direct,所以在程序down掉的這段時間Kafka上的數據是消費不到的,雖然可以設置offset為smallest,但是會導致重復消費,重新overwrite hive上的數據,但是不允許重復消費的場景就不能這樣做。
原理闡述
在Spark Streaming中消費 Kafka 數據的時候,有兩種方式分別是 :
1.基於 Receiver-based 的 createStream 方法。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job會去處理那些數據。然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。本文對此方式不研究,有興趣的可以自己實現,個人不喜歡這個方式。KafkaUtils.createStream
2.Direct Approach (No Receivers) 方式的 createDirectStream 方法,但是第二種使用方式中 kafka 的 offset 是保存在 checkpoint 中的,如果程序重啟的話,會丟失一部分數據,我使用的是這種方式。KafkaUtils.createDirectStream。本文將用代碼說明如何將 kafka 中的 offset 保存到 zookeeper 中,以及如何從 zookeeper 中讀取已存在的 offset。
代碼
廢話不說,直接貼代碼。
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
val conf: Conf = new config.Conf("test-util.conf") val zkHost = conf.getString("kafka.zookeeper.connect") val brokerList=conf.getString("kafka.metadata.broker.list") val zkClient = new ZkClient(zkHost) val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList, "zookeeper.connect" -> zkHost, "group.id" -> "testid") var kafkaStream: InputDStream[(String, String)] = null var offsetRanges = Array[OffsetRange]() val sc=SparkUtil.createSparkContext("test") val ssc=new StreamingContext(sc,Seconds(5)) val topic="TEST_TOPIC" val topicDirs = new ZKGroupTopicDirs("TEST_TOPIC_spark_streaming_testid", topic) //創建一個 ZKGroupTopicDirs 對象,對保存 val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}") //查詢該路徑下是否字節點(默認有字節點為我們自己保存不同 partition 時生成的) var fromOffsets: Map[TopicAndPartition, Long] = Map() //如果 zookeeper 中有保存 offset,我們會利用這個 offset 作為 kafkaStream 的起始位置 if (children > 0) { //如果保存過 offset,這里更好的做法,還應該和 kafka 上最小的 offset 做對比,不然會報 OutOfRange 的錯誤 for (i <- 0 until children) { val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}") val tp = TopicAndPartition(topic, i) fromOffsets += (tp -> partitionOffset.toLong) //將不同 partition 對應的 offset 增加到 fromOffsets 中 } val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()) //這個會將 kafka 的消息進行 transform,最終 kafak 的數據都會變成 (topic_name, message) 這樣的 tuple kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) } else { kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("TEST_TOPIC")) //如果未保存,根據 kafkaParam 的配置使用最新或者最舊的 offset } kafkaStream.transform{rdd=> offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到該 rdd 對應 kafka 的消息的 offset rdd }.map(_._2).foreachRDD(rdd=>{ for (o <- offsetRanges) { val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString) //將該 partition 的 offset 保存到 zookeeper } rdd.foreach(s=>println(s)) }) ssc.start() ssc.awaitTermination()
總結
樓主實現了保存一個topic的offset到zk,但是如果Spark Streaming同時消費多個topic的方式及topicSet里有多個topic,樓主還沒有想到解決辦法,歡迎指正。
轉:http://www.cnblogs.com/ChouYarn/p/6235823.html