spark-streaming-連接kafka的兩種方式


推薦系統的在線部分往往使用spark-streaming實現,這是一個很重要的環節。

在線流程的實時數據一般是從kafka獲取消息到spark streaming

spark連接kafka兩種方式在面試中會經常被問到,說明這是重點,下面為大家介紹一下這兩種方法:

 

第一種方式:Receiver模式 又稱kafka高級api模式

效果:SparkStreaming中的Receivers,恰好kafka有發布、訂閱,然而:這種方式企業不常用,說明有bug,不符合企業需求。因為:接收到的數據存儲在Executor,會出現數據漏處理或者多處理狀況。

簡單的理解就是kafka把消息全部封裝好,提供給spark去調用,本來kafka的消息分布在不同的partition上面,相當於做了一步數據合並,在發送給spark,故spark可以設置executor個數去消費這部分數據,效率相對慢一些。

代碼實例:

object ReceiverKafkaWordCount {
 Logger.getLogger("org").setLevel(Level.ERROR)
 def main(args: Array[String]): Unit = {
   val Array(brokers, topics) = Array(Conf.KAFKA_BROKER, Conf.TEST_TOPIC)
   // Create context with 2 second batch interval
   val conf = new SparkConf()
     .setMaster("local")
     .setAppName("OnlineStreamHobby") //設置本程序名稱
//      .set("auto.offset.reset","smallest")
   val ssc = new StreamingContext(conf, Seconds(2))
   //    從kafka取數據
   val kafkaParams: Map[String, String] = Map[String, String](
//      "auto.offset.reset" -> "smallest", //自動將偏移重置為最早的偏移
           "zookeeper.connect" -> Conf.ZK_HOST,
//      "bootstrap.servers" -> Common.KAFKA_BROKER_LIST,
     "group.id" -> "test"
   )
   val numThreads = 1
   val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
   val fact_streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2).map(_._2)
//    fact_streaming.print()
   val words = fact_streaming.flatMap(_.split(" "))
   val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
   wordCounts.print()
   ssc.checkpoint(".")
   //啟動spark並設置執行時間
   ssc.start()
   ssc.awaitTermination()
 }
}

第二種方式:Direct模式 又稱kafka低級API模式

效果:每次到topic的每個partition依據偏移量進行獲取數據,拉取數據以后進行處理,可以實現高可用

解釋:在spark 1.3中引入了這種新的無接收器“直接”方法,以確保更強大的端到端保證。這種方法不是使用接收器來接收數據,而是定期查詢kafka在每個topic+分partition中的最新偏移量,並相應地定義要在每個批次中處理的偏移量范圍。當處理數據的作業啟動時,Kafka簡單的客戶API用於讀取Kafka中定義的偏移范圍(類似於從文件系統讀取文件)。請注意,此功能在Spark 1.3中為Scala和Java API引入

簡單的理解就是spark直接從kafka底層中的partition直接獲取消息,相對於Receiver模式少了一步,效率更快。但是這樣一來spark中的executor的工作的個數就為kafka中的partition一致,設置再多的executor都不工作,同時偏移量也需要自己維護。

代碼示例:

object DirectTest {
 def main(args: Array[String]) {
   val conf = new SparkConf().setAppName("kafka direct test").setMaster("local")
   val sc = new SparkContext(conf)
   val ssc = new StreamingContext(sc,Seconds(10))
   //kafka基本參數,yourBrokers你的brokers集群
   val kafkaParams = Map("metadata.broker.list" -> Conf.KAFKA_BROKER)
   val topic = "test"
   val customGroup = "testGroup"
   //新建一個zkClient,zk是你的zk集群,和broker一樣,也是"IP:端口,IP端口..."
   /**
     *如果你使用val zkClient = new ZKClient(zk)新建zk客戶端,
     *在后邊讀取分區信息的文件數據時可能會出現錯誤
     *org.I0Itec.zkclient.exception.ZkMarshallingError:
     *  java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) ..
     *那么使用我的這個新建方法就可以了,指定讀取數據時的序列化方式
     **/
   val zkClient = new ZkClient(Conf.ZK_HOST, Integer.MAX_VALUE, 10000,ZKStringSerializer)
   //獲取zk下該消費者的offset存儲路徑,一般該路徑是/consumers/test_spark_streaming_group/offsets/topic_name
   val topicDirs = new ZKGroupTopicDirs(customGroup, topic)
   val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")
   //設置第一批數據讀取的起始位置
   var fromOffsets: Map[TopicAndPartition, Long] = Map()
   var directKafkaStream : InputDStream[(String,String)] = null
   //如果zk下有該消費者的offset信息,則從zk下保存的offset位置開始讀取,否則從最新的數據開始讀取(受auto.offset.reset設置影響,此處默認)
   if (children > 0) {
     //將zk下保存的該主題該消費者的每個分區的offset值添加到fromOffsets中
     for (i <- 0 until children) {
       val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/$i")
       val tp = TopicAndPartition(topic, i)
       //將不同 partition 對應的 offset 增加到 fromOffsets 中
       fromOffsets += (tp -> partitionOffset.toLong)
       println("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
       val messageHandler = (mmd: MessageAndMetadata[String, String]) =>  (mmd.topic,mmd.message())
       directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String,String)](ssc, kafkaParams, fromOffsets, messageHandler)
     }
   }else{
     directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
   }
   /**
     *上邊已經實現從zk上保存的值開始讀取數據
     *下邊就是數據處理后,再講offset值寫會到zk上
     */
   //用於保存當前offset范圍
   var offsetRanges: Array[OffsetRange]  = Array.empty
   val directKafkaStream1 = directKafkaStream.transform { rdd =>
     //取出該批數據的offset值
     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
     rdd
   }.map(_._2)
   directKafkaStream1.foreachRDD(rdd=>{
     //數據處理完畢后,將offset值更新到zk集群
     for (o <- offsetRanges) {
       val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
       ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)
     }
     rdd.foreach(println)
   })
   ssc.start()
   ssc.awaitTermination()
 }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM