SparkStreaming整合Kafka(Offset保存在zookeeper上,Spark2.X + kafka0.10.X)


先來一段到處都有的原理(出處到處都有,就不注明了)

Streaming和Kafka整合有兩種方式--Receiver和Direct,簡單理解為:Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節點上獲取數據

Receiver:
1、Kafka中topic的partition與Spark中RDD的partition是沒有關系的,因此,在KafkaUtils.createStream()中,提高partition的數量,只會增加Receiver的數量,也就是讀取Kafka中topic partition的線程數量,不會增加Spark處理數據的並行度。
2、可以創建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver並行接收數據。
3、如果基於容錯的文件系統,比如HDFS,啟用了預寫日志機制,接收到的數據都會被復制一份到預寫日志中。因此,在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。
Direct:
1、簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream,然后對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
2、高性能:如果要保證零數據丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka自己本身就有高可靠的機制會對數據復制一份,而這里又會復制一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數據的復制,那么就可以通過Kafka的副本進行恢復。
3、一次且僅一次的事務機制:基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。由於數據消費偏移量是保存在checkpoint中,因此,如果后續想使用kafka高級API消費數據,需要手動的更新zookeeper中的偏移量
 
本來說網上搜一搜就解決了代碼問題,但是大部分都是Spark1.X 的,對應的Kafka的 createDirectStream 的傳參方式不一樣(也可能是我學的太淺),所以仿照spark1.X的寫了個。
 
接下來直接上代碼
package com.kafka

import scala.collection.JavaConversions._
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object KafkaZookeeperCheckPoint {


  // ZK client
  val client = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString("bigdata:2181,bigdata:2182,bigdata:2183")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("mykafka")
      .build()
    client.start()
    client
  }

  // offset 路徑起始位置
  val Globe_kafkaOffsetPath = "/kafka/offsets"

  // 路徑確認函數  確認ZK中路徑存在,不存在則創建該路徑
  def ensureZKPathExists(path: String)={

    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }

  }


  // 保存 新的 offset
  def storeOffsets(offsetRange: Array[OffsetRange], groupName:String) = {

    for (o <- offsetRange){
      val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"

      // 向對應分區第一次寫入或者更新Offset 信息
      println("---Offset寫入ZK------\nTopic:" + o.topic +", Partition:" + o.partition + ", Offset:" + o.untilOffset)
      client.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
    }
  }

  def getFromOffset(topic: Array[String], groupName:String):(Map[TopicPartition, Long], Int) = {

    // Kafka 0.8和0.10的版本差別,0.10 為 TopicPartition   0.8 TopicAndPartition
    var fromOffset: Map[TopicPartition, Long] = Map()

    val topic1 = topic(0).toString

    // 讀取ZK中保存的Offset,作為Dstrem的起始位置。如果沒有則創建該路徑,並從 0 開始Dstream
    val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}"

    // 檢查路徑是否存在
    ensureZKPathExists(zkTopicPath)

    // 獲取topic的子節點,即 分區
    val childrens = client.getChildren().forPath(zkTopicPath)

    // 遍歷分區
    val offSets: mutable.Buffer[(TopicPartition, Long)] = for {
      p <- childrens
    }
      yield {

        // 遍歷讀取子節點中的數據:即 offset
        val offsetData = client.getData().forPath(s"$zkTopicPath/$p")
        // 將offset轉為Long
        val offSet = java.lang.Long.valueOf(new String(offsetData)).toLong
        // 返回  (TopicPartition, Long)
        (new TopicPartition(topic1, Integer.parseInt(p)), offSet)
      }
    println(offSets.toMap)

    if(offSets.isEmpty){
      (offSets.toMap, 0)
    } else {
      (offSets.toMap, 1)
    }


  }

//    if (client.checkExists().forPath(zkTopicPath) == null){
//
//      (null, 0)
//    }
//    else {
//      val data = client.getData.forPath(zkTopicPath)
//      println("----------offset info")
//      println(data)
//      println(data(0))
//      println(data(1))
//      val offSets = Map(new TopicPartition(topic1, 0) -> 7332.toLong)
//      println(offSets)
//      (offSets, 1)
//    }
//
//  }

  def createMyZookeeperDirectKafkaStream(ssc:StreamingContext, kafkaParams:Map[String, Object], topic:Array[String],
                                         groupName:String ):InputDStream[ConsumerRecord[String, String]] = {

    // get offset  flag = 1  表示基於已有的offset計算  flag = 表示從頭開始(最早或者最新,根據Kafka配置)
    val (fromOffsets, flag) = getFromOffset(topic, groupName)
    var kafkaStream:InputDStream[ConsumerRecord[String, String]] = null
    if (flag == 1){
      // 加上消息頭
      //val messageHandler = (mmd:MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
      println(fromOffsets)
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffsets))
println(fromOffsets)
      println("中斷后 Streaming 成功!")

    } else {
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams))

      println("首次 Streaming 成功!")

    }
    kafkaStream
  }

  def main(args: Array[String]): Unit = {

    val processInterval = 5
    val brokers = "bigdata:9092,bigdata:9093,bigdata:9094"
    val topics = Array("zkKafka")
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafka checkpoint zookeeper")
    // kafka params
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "zk_group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val ssc = new StreamingContext(conf, Seconds(processInterval))

    val messages = createMyZookeeperDirectKafkaStream(ssc, kafkaParams, topics, "zk_group")

      messages.foreachRDD((rdd) => {
        if (!rdd.isEmpty()){

          println("###################:"+rdd.count())
        }

        // 存儲新的offset
        storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "zk_group")
      })

    ssc.start()
    ssc.awaitTermination()

  }

}

 

遇到的幾個坑:

一、怎么傳入 Offset,這個糾結了好久。后面是看 createDirectStream的源代碼看到  consumerStrategy.Subscribe  這個方法還有第三個參數,就是Offset

 

二、無法遍歷topic的子節點(分區)

 

解決方案:

import scala.collection.JavaConversions._

這個是因為個人水平問題,不多說

 

三、TopicPartition和TopicAndPartition

spark2.X貌似只有TopicPartition了,這個方法也是看源碼找到的,搜一下就有,就不截圖了。

然后,實例化的時候,因為我的Topic是Array,在實例化TopicPartition的時候,需要先toString。不能TopicPartition(topic.toString, ....  ,在里面toString會造成,實例化后的topic顯示為亂碼(這塊不熟,諒解)。

 

如有問題,歡迎指正~

下次將Offset寫入到Hbase的代碼和遇到的坑寫出來

最后,源碼真的是個好東西~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM