先來一段到處都有的原理(出處到處都有,就不注明了)
Streaming和Kafka整合有兩種方式--Receiver和Direct,簡單理解為:Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節點上獲取數據
Receiver:
1、Kafka中topic的partition與Spark中RDD的partition是沒有關系的,因此,在KafkaUtils.createStream()中,提高partition的數量,只會增加Receiver的數量,也就是讀取Kafka中topic partition的線程數量,不會增加Spark處理數據的並行度。
2、可以創建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver並行接收數據。
3、如果基於容錯的文件系統,比如HDFS,啟用了預寫日志機制,接收到的數據都會被復制一份到預寫日志中。因此,在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。
Direct:
1、簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream,然后對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
2、高性能:如果要保證零數據丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka自己本身就有高可靠的機制會對數據復制一份,而這里又會復制一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數據的復制,那么就可以通過Kafka的副本進行恢復。
3、一次且僅一次的事務機制:基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。由於數據消費偏移量是保存在checkpoint中,因此,如果后續想使用kafka高級API消費數據,需要手動的更新zookeeper中的偏移量
本來說網上搜一搜就解決了代碼問題,但是大部分都是Spark1.X 的,對應的Kafka的 createDirectStream 的傳參方式不一樣(也可能是我學的太淺),所以仿照spark1.X的寫了個。
接下來直接上代碼
package com.kafka import scala.collection.JavaConversions._ import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable object KafkaZookeeperCheckPoint { // ZK client val client = { val client = CuratorFrameworkFactory .builder .connectString("bigdata:2181,bigdata:2182,bigdata:2183") .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .namespace("mykafka") .build() client.start() client } // offset 路徑起始位置 val Globe_kafkaOffsetPath = "/kafka/offsets" // 路徑確認函數 確認ZK中路徑存在,不存在則創建該路徑 def ensureZKPathExists(path: String)={ if (client.checkExists().forPath(path) == null) { client.create().creatingParentsIfNeeded().forPath(path) } } // 保存 新的 offset def storeOffsets(offsetRange: Array[OffsetRange], groupName:String) = { for (o <- offsetRange){ val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}" // 向對應分區第一次寫入或者更新Offset 信息 println("---Offset寫入ZK------\nTopic:" + o.topic +", Partition:" + o.partition + ", Offset:" + o.untilOffset) client.setData().forPath(zkPath, o.untilOffset.toString.getBytes()) } } def getFromOffset(topic: Array[String], groupName:String):(Map[TopicPartition, Long], Int) = { // Kafka 0.8和0.10的版本差別,0.10 為 TopicPartition 0.8 TopicAndPartition var fromOffset: Map[TopicPartition, Long] = Map() val topic1 = topic(0).toString // 讀取ZK中保存的Offset,作為Dstrem的起始位置。如果沒有則創建該路徑,並從 0 開始Dstream val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}" // 檢查路徑是否存在 ensureZKPathExists(zkTopicPath) // 獲取topic的子節點,即 分區 val childrens = client.getChildren().forPath(zkTopicPath) // 遍歷分區 val offSets: mutable.Buffer[(TopicPartition, Long)] = for { p <- childrens } yield { // 遍歷讀取子節點中的數據:即 offset val offsetData = client.getData().forPath(s"$zkTopicPath/$p") // 將offset轉為Long val offSet = java.lang.Long.valueOf(new String(offsetData)).toLong // 返回 (TopicPartition, Long) (new TopicPartition(topic1, Integer.parseInt(p)), offSet) } println(offSets.toMap) if(offSets.isEmpty){ (offSets.toMap, 0) } else { (offSets.toMap, 1) } } // if (client.checkExists().forPath(zkTopicPath) == null){ // // (null, 0) // } // else { // val data = client.getData.forPath(zkTopicPath) // println("----------offset info") // println(data) // println(data(0)) // println(data(1)) // val offSets = Map(new TopicPartition(topic1, 0) -> 7332.toLong) // println(offSets) // (offSets, 1) // } // // } def createMyZookeeperDirectKafkaStream(ssc:StreamingContext, kafkaParams:Map[String, Object], topic:Array[String], groupName:String ):InputDStream[ConsumerRecord[String, String]] = { // get offset flag = 1 表示基於已有的offset計算 flag = 表示從頭開始(最早或者最新,根據Kafka配置) val (fromOffsets, flag) = getFromOffset(topic, groupName) var kafkaStream:InputDStream[ConsumerRecord[String, String]] = null if (flag == 1){ // 加上消息頭 //val messageHandler = (mmd:MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()) println(fromOffsets) kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffsets)) println(fromOffsets) println("中斷后 Streaming 成功!") } else { kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe(topic, kafkaParams)) println("首次 Streaming 成功!") } kafkaStream } def main(args: Array[String]): Unit = { val processInterval = 5 val brokers = "bigdata:9092,bigdata:9093,bigdata:9094" val topics = Array("zkKafka") val conf = new SparkConf().setMaster("local[2]").setAppName("kafka checkpoint zookeeper") // kafka params val kafkaParams = Map[String, Object]( "bootstrap.servers" -> brokers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "zk_group", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val ssc = new StreamingContext(conf, Seconds(processInterval)) val messages = createMyZookeeperDirectKafkaStream(ssc, kafkaParams, topics, "zk_group") messages.foreachRDD((rdd) => { if (!rdd.isEmpty()){ println("###################:"+rdd.count()) } // 存儲新的offset storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "zk_group") }) ssc.start() ssc.awaitTermination() } }
遇到的幾個坑:
一、怎么傳入 Offset,這個糾結了好久。后面是看 createDirectStream的源代碼看到 consumerStrategy.Subscribe 這個方法還有第三個參數,就是Offset
二、無法遍歷topic的子節點(分區)
解決方案:
import scala.collection.JavaConversions._
這個是因為個人水平問題,不多說
三、TopicPartition和TopicAndPartition
spark2.X貌似只有TopicPartition了,這個方法也是看源碼找到的,搜一下就有,就不截圖了。
然后,實例化的時候,因為我的Topic是Array,在實例化TopicPartition的時候,需要先toString。不能TopicPartition(topic.toString, .... ,在里面toString會造成,實例化后的topic顯示為亂碼(這塊不熟,諒解)。
如有問題,歡迎指正~
下次將Offset寫入到Hbase的代碼和遇到的坑寫出來
最后,源碼真的是個好東西~