轉http://hadoop1989.com/2016/03/15/KafkaStreaming/
在Spark1.3之前,默認的Spark接收Kafka數據的方式是基於Receiver的,在這之后的版本里,推出了Direct Approach,現在整理一下兩種方式的異同。
1. Receiver-based Approach
示例代碼:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
2. Direct Approach (No Receivers)
示例代碼:
import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume])
源碼實現
1、 KafkaUtils.createStream
首先從源碼層面來看,其主要調用棧順序:
KafkaUtils.createStream--->createStream--->new KafkaInputDStream--->new KafkaReceiver
KafkaReceiver類繼承了Receiver,當Reciver被調用起來時,執行onStart()方法,MessageHandler負責將收到的數據進行存儲。執行流程如下:
- 創建
createStream,Receiver被調起執行 - 連接
ZooKeeper,讀取相應的Consumer、Topic配置信息等 - 通過
consumerConnector連接到Kafka集群,收取指定topic的數據 - 創建
KafkaMessageHandler線程池來對數據進行處理,通過ReceiverInputDStream中的方法,將數據轉換成BlockRDD,供后續計算
2、 KafkaUtils.createDirectStream
主要調用棧順序:
KafkaUtils.createDirectStream—> new DirectKafkaInputDStream
執行流程如下:
- 實例化
KafkaCluster,根據用戶配置的Kafka參數,連接Kafka集群 - 通過
Kafka API讀取Topic中每個Partition最后一次讀的Offset - 接收成功的數據,直接轉換成
KafkaRDD,供后續計算
架構
通過兩張圖,來看下他們架構。
1、 Receiver-based Approach

2、 Direct Approach (No Receivers)

優缺點
相關的優缺點,在官網上已經說得很清楚了。追求效率、數據准確可以使用Direct方式,但需要自己對Offset進行處理。
參考資料:
Spark Streaming + Kafka Integration Guide
https://github.com/koeninger/kafka-exactly-once
