Spark消費kafka的直連方式


 spark消費kafka的兩種方式

直連方式的兩種

自動和手動

 自動

自動偏移量維護kafka 0.10 之前的版本是維護在zookeeper中的,kafka0.10以后的版本是維護在kafka中的topic中的

 

查看記錄消費者的偏移量的路徑 _consumer_offsets
 
        

 案例:

注:先啟動zookeeper 再啟動kafka集群

命令:

zkServer.sh start

./kafka-server-start.sh -daemon ../config/server.properties

如下圖:

package com.bw.streaming.day03
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
//直連方式
//自定記錄偏移量
object RedirectWithAutoOffser {
  def main(args: Array[String]): Unit = {
    //入口
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingKafkaWithDirect")
    val ssc = new StreamingContext(conf,Seconds(2))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "linux04:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "gg1803",
      //如果沒有記錄偏移量,就消費最新的數據
      "auto.offset.reset" -> "earliest",
      //spark 消費kafka中的偏移量自動維護: kafka 0.10之前的版本自動維護在zookeeper  kafka 0.10之后偏移量自動維護topic(__consumer_offsets)
     //開啟自己動維護偏移量
    "enable.auto.commit" -> (true: java.lang.Boolean) ) val topics = Array("t1807a1") //直連方式 val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics,kafkaParams)) stream.map(cr => cr.value()).print() //啟動 ssc.start() ssc.awaitTermination() } }

結果: 不僅將原來topic中的數據拉取出來 還將消費的數據也拉取粗來了

這里斷開程序

然后再開始運行程序

結果如下: 證明是自己記錄了偏移量,從上次讀到的數據開始拉取

 

 

 

手動記錄偏移量

案例

 

package com.bw.streaming.day03

import kafka.utils.ZKGroupTopicDirs
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
  * 直連方式手動維護偏移量
  */
object Spa1 {
  def main(args: Array[String]): Unit = {
    //入口
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingKafkaWithDirect")
    val ssc = new StreamingContext(conf,Seconds(2))
    //消費者組的名稱
    val gname = "gg18033";
    //kafka中topic名稱
    val tname = "t1807a1"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "linux04:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> gname,
      //如果沒有記錄偏移量,就消費最新的數據
      "auto.offset.reset" -> "latest",
      //spark 消費kafka中的偏移量自動維護: kafka 0.10之前的版本自動維護在zookeeper  kafka 0.10之后偏移量自動維護topic(__consumer_offsets)
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array(tname)
    //指定zk的地址,后期更新消費的偏移量時使用(以后可以使用Redis、MySQL來記錄偏移量)
    val zkQuorum = "linux04:2181,linux05:2181,linux06:2181"
    //創建一個 ZKGroupTopicDirs 對象,其實是指定往zk中寫入數據的目錄,用於保存偏移量 /gg1803/offsets/test/1
    val topicDirs = new ZKGroupTopicDirs(gname,tname)
    //獲取 zookeeper 中的路徑 "/gg1803/offsets/test/"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
    //是zookeeper的客戶端,可以從zk中讀取偏移量數據,並更新偏移量
    val zkClient = new ZkClient(zkQuorum)
    //查詢該路徑下是否字節點(默認有字節點為我們自己保存不同 partition 時生成的)
    // /gg1803/offsets/test/0/10001"
    // /gg1803/offsets/test/1/30001"
    // /gg1803/offsets/test/2/10001"
    //讀取 "/gg1803/offsets/test/"有沒有子節點,返回的子節點的個數
    val children = zkClient.countChildren(zkTopicPath)
    //直連方式
    var stream: InputDStream[ConsumerRecord[String, String]] = null
    if(children == 0){
      //程序第一次啟動
      stream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))
    }else{
      //手動維護過偏移量
      //1.先將維護的偏移量讀取出來(zookeeper redis mysql)
      var offsets: collection.mutable.Map[TopicPartition, Long] = collection.mutable.Map[TopicPartition, Long]()
      for (i <- 0 until children) {
        //                                               path = "/gg1803/offsets/test/0"
        val partitionOffset = zkClient.readData[Long](s"$zkTopicPath/${i}")
        // wordcount/0
        val tp = new TopicPartition(tname, i)
        //將不同 partition 對應的 offset 增加到 fromOffsets 中
        // wordcount/0 -> 10001
        offsets.put(tp,partitionOffset.toLong)
      }
      stream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams,offsets))
    }
    //記錄偏移量
    stream.foreachRDD(rdd =>{
      //轉換rdd為帶偏移量的rdd
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //業務處理
      rdd.foreach(println(_))
      //記錄偏移量
      for(osr <- ranges){
        //println(osr.topic +" " + osr.partition +" " + osr.fromOffset +" " + osr.untilOffset )
        //  /g001/offsets/wordcount/0
        val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
        //將該 partition 的 offset 保存到 zookeeper
        //  /g001/offsets/test/0/20000
        //如果目錄不存在先創建
        //println(zkPath)
        if(!zkClient.exists(zkPath)){
          zkClient.createPersistent(zkPath,true)
        }
        //寫入數據
        zkClient.writeData(zkPath,osr.untilOffset)
      }
    })
    //啟動
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 

 

 

結果如下:

 這個是正確的講解:截圖上面的筆記不要看

1. 之前在zk節點中是沒有消費者組,

2.程序運行一次 將消費者組 消費記錄放入zk節點中

3.將程序關閉,

4.然后再將程序運行 

5.在kafka生產數據

6.查看控制台  打印的消費數據

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM