spark消費kafka的兩種方式
直連方式的兩種
自動和手動
自動

自動偏移量維護kafka 0.10 之前的版本是維護在zookeeper中的,kafka0.10以后的版本是維護在kafka中的topic中的
查看記錄消費者的偏移量的路徑 _consumer_offsets
案例:
注:先啟動zookeeper 再啟動kafka集群
命令:
zkServer.sh start
./kafka-server-start.sh -daemon ../config/server.properties
如下圖:

package com.bw.streaming.day03 import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} //直連方式 //自定記錄偏移量 object RedirectWithAutoOffser { def main(args: Array[String]): Unit = { //入口 val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingKafkaWithDirect") val ssc = new StreamingContext(conf,Seconds(2)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "linux04:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "gg1803", //如果沒有記錄偏移量,就消費最新的數據 "auto.offset.reset" -> "earliest", //spark 消費kafka中的偏移量自動維護: kafka 0.10之前的版本自動維護在zookeeper kafka 0.10之后偏移量自動維護topic(__consumer_offsets) //開啟自己動維護偏移量
"enable.auto.commit" -> (true: java.lang.Boolean) ) val topics = Array("t1807a1") //直連方式 val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics,kafkaParams)) stream.map(cr => cr.value()).print() //啟動 ssc.start() ssc.awaitTermination() } }
結果: 不僅將原來topic中的數據拉取出來 還將消費的數據也拉取粗來了

這里斷開程序
然后再開始運行程序
結果如下: 證明是自己記錄了偏移量,從上次讀到的數據開始拉取


手動記錄偏移量
案例
package com.bw.streaming.day03 import kafka.utils.ZKGroupTopicDirs import org.I0Itec.zkclient.ZkClient import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 直連方式手動維護偏移量 */ object Spa1 { def main(args: Array[String]): Unit = { //入口 val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingKafkaWithDirect") val ssc = new StreamingContext(conf,Seconds(2)) //消費者組的名稱 val gname = "gg18033"; //kafka中topic名稱 val tname = "t1807a1" val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "linux04:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> gname, //如果沒有記錄偏移量,就消費最新的數據 "auto.offset.reset" -> "latest", //spark 消費kafka中的偏移量自動維護: kafka 0.10之前的版本自動維護在zookeeper kafka 0.10之后偏移量自動維護topic(__consumer_offsets) "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array(tname) //指定zk的地址,后期更新消費的偏移量時使用(以后可以使用Redis、MySQL來記錄偏移量) val zkQuorum = "linux04:2181,linux05:2181,linux06:2181" //創建一個 ZKGroupTopicDirs 對象,其實是指定往zk中寫入數據的目錄,用於保存偏移量 /gg1803/offsets/test/1 val topicDirs = new ZKGroupTopicDirs(gname,tname) //獲取 zookeeper 中的路徑 "/gg1803/offsets/test/" val zkTopicPath = s"${topicDirs.consumerOffsetDir}" //是zookeeper的客戶端,可以從zk中讀取偏移量數據,並更新偏移量 val zkClient = new ZkClient(zkQuorum) //查詢該路徑下是否字節點(默認有字節點為我們自己保存不同 partition 時生成的) // /gg1803/offsets/test/0/10001" // /gg1803/offsets/test/1/30001" // /gg1803/offsets/test/2/10001" //讀取 "/gg1803/offsets/test/"有沒有子節點,返回的子節點的個數 val children = zkClient.countChildren(zkTopicPath) //直連方式 var stream: InputDStream[ConsumerRecord[String, String]] = null if(children == 0){ //程序第一次啟動 stream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams)) }else{ //手動維護過偏移量 //1.先將維護的偏移量讀取出來(zookeeper redis mysql) var offsets: collection.mutable.Map[TopicPartition, Long] = collection.mutable.Map[TopicPartition, Long]() for (i <- 0 until children) { // path = "/gg1803/offsets/test/0" val partitionOffset = zkClient.readData[Long](s"$zkTopicPath/${i}") // wordcount/0 val tp = new TopicPartition(tname, i) //將不同 partition 對應的 offset 增加到 fromOffsets 中 // wordcount/0 -> 10001 offsets.put(tp,partitionOffset.toLong) } stream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams,offsets)) } //記錄偏移量 stream.foreachRDD(rdd =>{ //轉換rdd為帶偏移量的rdd val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //業務處理 rdd.foreach(println(_)) //記錄偏移量 for(osr <- ranges){ //println(osr.topic +" " + osr.partition +" " + osr.fromOffset +" " + osr.untilOffset ) // /g001/offsets/wordcount/0 val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}" //將該 partition 的 offset 保存到 zookeeper // /g001/offsets/test/0/20000 //如果目錄不存在先創建 //println(zkPath) if(!zkClient.exists(zkPath)){ zkClient.createPersistent(zkPath,true) } //寫入數據 zkClient.writeData(zkPath,osr.untilOffset) } }) //啟動 ssc.start() ssc.awaitTermination() } }
結果如下:

這個是正確的講解:截圖上面的筆記不要看
1. 之前在zk節點中是沒有消費者組,
2.程序運行一次 將消費者組 消費記錄放入zk節點中
3.將程序關閉,
4.然后再將程序運行
5.在kafka生產數據
6.查看控制台 打印的消費數據
