轉http://hadoop1989.com/2016/03/15/KafkaStreaming/
在Spark1.3之前,默認的Spark接收Kafka數據的方式是基於Receiver
的,在這之后的版本里,推出了Direct Approach
,現在整理一下兩種方式的異同。
1. Receiver-based Approach
示例代碼:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
2. Direct Approach (No Receivers)
示例代碼:
import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume])
源碼實現
1、 KafkaUtils.createStream
首先從源碼層面來看,其主要調用棧順序:
KafkaUtils.createStream--->createStream--->new KafkaInputDStream--->new KafkaReceiver
KafkaReceiver
類繼承了Receiver
,當Reciver
被調用起來時,執行onStart()
方法,MessageHandler
負責將收到的數據進行存儲。執行流程如下:
- 創建
createStream
,Receiver
被調起執行 - 連接
ZooKeeper
,讀取相應的Consumer
、Topic
配置信息等 - 通過
consumerConnector
連接到Kafka
集群,收取指定topic
的數據 - 創建
KafkaMessageHandler
線程池來對數據進行處理,通過ReceiverInputDStream
中的方法,將數據轉換成BlockRDD
,供后續計算
2、 KafkaUtils.createDirectStream
主要調用棧順序:
KafkaUtils.createDirectStream—> new DirectKafkaInputDStream
執行流程如下:
- 實例化
KafkaCluster
,根據用戶配置的Kafka
參數,連接Kafka
集群 - 通過
Kafka API
讀取Topic
中每個Partition
最后一次讀的Offset
- 接收成功的數據,直接轉換成
KafkaRDD
,供后續計算
架構
通過兩張圖,來看下他們架構。
1、 Receiver-based Approach
2、 Direct Approach (No Receivers)
優缺點
相關的優缺點,在官網上已經說得很清楚了。追求效率、數據准確可以使用Direct
方式,但需要自己對Offset
進行處理。
參考資料:
Spark Streaming + Kafka Integration Guide
https://github.com/koeninger/kafka-exactly-once