Spark Streaming消費Kafka Direct方式數據零丟失實現


 

使用場景

Spark Streaming實時消費kafka數據的時候,程序停止或者Kafka節點掛掉會導致數據丟失,Spark Streaming也沒有設置CheckPoint(據說比較雞肋,雖然可以保存Direct方式的offset,但是可能會導致頻繁寫HDFS占用IO),所以每次出現問題的時候,重啟程序,而程序的消費方式是Direct,所以在程序down掉的這段時間Kafka上的數據是消費不到的,雖然可以設置offset為smallest,但是會導致重復消費,重新overwrite hive上的數據,但是不允許重復消費的場景就不能這樣做。

原理闡述

在Spark Streaming中消費 Kafka 數據的時候,有兩種方式分別是 :

1.基於 Receiver-based 的 createStream 方法。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job會去處理那些數據。然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。本文對此方式不研究,有興趣的可以自己實現,個人不喜歡這個方式。KafkaUtils.createStream

2.Direct Approach (No Receivers) 方式的 createDirectStream 方法,但是第二種使用方式中  kafka 的 offset 是保存在 checkpoint 中的,如果程序重啟的話,會丟失一部分數據,我使用的是這種方式。KafkaUtils.createDirectStream。本文將用代碼說明如何將 kafka 中的 offset 保存到 zookeeper 中,以及如何從 zookeeper 中讀取已存在的 offset。

代碼

廢話不說,直接貼代碼。

復制代碼
 
           
  import kafka.common.TopicAndPartition
  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
  import org.I0Itec.zkclient.ZkClient
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.InputDStream
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

   val conf: Conf = new config.Conf("test-util.conf") val zkHost = conf.getString("kafka.zookeeper.connect") val brokerList=conf.getString("kafka.metadata.broker.list") val zkClient = new ZkClient(zkHost) val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList, "zookeeper.connect" -> zkHost, "group.id" -> "testid") var kafkaStream: InputDStream[(String, String)] = null var offsetRanges = Array[OffsetRange]() val sc=SparkUtil.createSparkContext("test") val ssc=new StreamingContext(sc,Seconds(5)) val topic="TEST_TOPIC" val topicDirs = new ZKGroupTopicDirs("TEST_TOPIC_spark_streaming_testid", topic) //創建一個 ZKGroupTopicDirs 對象,對保存 val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}") //查詢該路徑下是否字節點(默認有字節點為我們自己保存不同 partition 時生成的) var fromOffsets: Map[TopicAndPartition, Long] = Map() //如果 zookeeper 中有保存 offset,我們會利用這個 offset 作為 kafkaStream 的起始位置 if (children > 0) { //如果保存過 offset,這里更好的做法,還應該和 kafka 上最小的 offset 做對比,不然會報 OutOfRange 的錯誤 for (i <- 0 until children) { val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}") val tp = TopicAndPartition(topic, i) fromOffsets += (tp -> partitionOffset.toLong) //將不同 partition 對應的 offset 增加到 fromOffsets 中 } val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()) //這個會將 kafka 的消息進行 transform,最終 kafak 的數據都會變成 (topic_name, message) 這樣的 tuple kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) } else { kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("TEST_TOPIC")) //如果未保存,根據 kafkaParam 的配置使用最新或者最舊的 offset } kafkaStream.transform{rdd=> offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到該 rdd 對應 kafka 的消息的 offset rdd }.map(_._2).foreachRDD(rdd=>{ for (o <- offsetRanges) { val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString) //將該 partition 的 offset 保存到 zookeeper } rdd.foreach(s=>println(s)) }) ssc.start() ssc.awaitTermination()
復制代碼

總結

樓主實現了保存一個topic的offset到zk,但是如果Spark Streaming同時消費多個topic的方式及topicSet里有多個topic,樓主還沒有想到解決辦法,歡迎指正。

轉:http://www.cnblogs.com/ChouYarn/p/6235823.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM