SparkStreaming從Kafka讀取數據兩種方式


參考文章:http://www.jianshu.com/p/60344796f8a5

在結合 Spark Streaming 及 Kafka 的實時應用中,我們通常使用以下兩個 API 來獲取最初的 DStream(這里不關心這兩個 API 的重載):

KafkaUtils#createDirectStream

KafkaUtils#createStream

這兩個 API 除了要傳入的參數不同外,接收 kafka 數據的節點、拉取數據的時機也完全不同。

本文將分別就兩者進行詳細分析。

一、KafkaUtils#createStream

先來分析 createStream,在該函數中,會新建一個 KafkaInputDStream對象,KafkaInputDStream繼承於 ReceiverInputDStream。我們在文章揭開Spark Streaming神秘面紗② - ReceiverTracker 與數據導入分析過

  1. 繼承ReceiverInputDStream的類需要重載 getReceiver 函數以提供用於接收數據的 receiver
  2. recever 會調度到某個 executor 上並啟動,不間斷的接收數據並將收到的數據交由 ReceiverSupervisor 存成 block 作為 RDD 輸入數據

KafkaInputDStream當然也實現了getReceiver方法,如下:

  def getReceiver(): Receiver[(K, V)] = { if (!useReliableReceiver) { //< 不啟用 WAL new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } else { //< 啟用 WAL new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } }

根據是否啟用 WAL,receiver 分為 KafkaReceiver 和 ReliableKafkaReceiver。揭開Spark Streaming神秘面紗②-ReceiverTracker 與數據導入一文中詳細地介紹了

  1. receiver 是如何被分發啟動的
  2. receiver 接受數據后數據的流轉過程
    並在 揭開Spark Streaming神秘面紗③ - 動態生成 job 一文中詳細介紹了
  3. receiver 接受的數據存儲為 block 后,如何將 blocks 作為 RDD 的輸入數據
  4. 動態生成 job

以上兩篇文章並沒有具體介紹 receiver 是如何接收數據的,當然每個重載了 ReceiverInputDStream 的類的 receiver 接收數據方式都不相同。下圖描述了 KafkaReceiver 接收數據的具體流程:

KafkaUtils#createDirectStream

揭開Spark Streaming神秘面紗③ - 動態生成 job中,介紹了在生成每個 batch 的過程中,會去取這個 batch 對應的 RDD,若未生成該 RDD,則會取該 RDD 對應的 blocks 數據來生成 RDD,最終會調用到DStream#compute(validTime: Time)函數,在KafkaUtils#createDirectStream調用中,會新建DirectKafkaInputDStreamDirectKafkaInputDStream#compute(validTime: Time)會從 kafka 拉取數據並生成 RDD,流程如下:

如上圖所示,該函數主要做了以下三個事情:

  1. 確定要接收的 partitions 的 offsetRange,以作為第2步創建的 RDD 的數據來源
  2. 創建 RDD 並執行 count 操作,使 RDD 真實具有數據
  3. 以 streamId、數據條數,offsetRanges 信息初始化 inputInfo 並添加到 JobScheduler 中

進一步看 KafkaRDD 的 getPartitions 實現:

  override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) => val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray }

從上面的代碼可以很明顯看到,KafkaRDD 的 partition 數據與 Kafka topic 的某個 partition 的 o.fromOffset 至 o.untilOffset 數據是相對應的,也就是說 KafkaRDD 的 partition 與 Kafka partition 是一一對應的


通過以上分析,我們可以對這兩種方式的區別做一個總結:

  1. createStream會使用 Receiver;而createDirectStream不會
  2. createStream使用的 Receiver 會分發到某個 executor 上去啟動並接受數據;而createDirectStream直接在 driver 上接收數據
  3. createStream使用 Receiver 源源不斷的接收數據並把數據交給 ReceiverSupervisor 處理最終存儲為 blocks 作為 RDD 的輸入,從 kafka 拉取數據與計算消費數據相互獨立;而createDirectStream會在每個 batch 拉取數據並就地消費,到下個 batch 再次拉取消費,周而復始,從 kafka 拉取數據與計算消費數據是連續的,沒有獨立開
  4. createStream中創建的KafkaInputDStream 每個 batch 所對應的 RDD 的 partition 不與 Kafka partition 一一對應;而createDirectStream中創建的 DirectKafkaInputDStream 每個 batch 所對應的 RDD 的 partition 與 Kafka partition 一一對應


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM