Kakfa揭秘 Day8
DirectKafkaStream代碼解析
今天讓我們進入SparkStreaming,看一下其中重要的Kafka模塊DirectStream的具體實現。
構造Stream
首先,從工廠方法開始,在工廠方法createDirectStream中,有兩類關鍵內容:
- fromOffset:是一個HashMap,指定我們要讀取的topic和Partition。
- Decoder:為什么需要Decoder,是因為Kafka並不對數據有任何的處理,在發送中並不進行解碼,需要在接收端才進行解碼。
Decoder共包含了三個輸入參數,包括KeyDecoderClass,valueDecoderClass和messageHandler。
messageHandler會調用兩個Decoder基於raw message讀出數據。
Decoder的實現可以自行擴展,可以參照下面這個StringDecoder。需要注意的這里並不設置要解碼的內容,所以完全可以應用在圖片處理等領域。
與Kafka集群的交互
讓我們進入DirectKafkaInputDStream,在這里主要構建了KafkaRDD。
其中有一個關鍵點,這里直接構建了KafkaCluster對象,主要向Kafka集群獲取一些元數據。
讓我們進入KafkaRDD,在compute方法中,主要是返回 kafkaRDDIterator。
其中有一次出現了一個kc,這個和前面的不同,這里真正要進行數據處理。會調用SimpleComsumer一次性獲取一批數據。
分區方法
下面也是最為關鍵的部分:
我們可以看到,spark中的分區,是基於offsetRanges來決定的,offsetRanges的實現邏輯如下:是一個集合,讀取過去一段時間產生的新的內容。
我可以看到,就是基於kafka數據來源決定的,也就是說partition是由kafka中的partition決定的,一個kafka的partition 加offset,就對應了RDD中的partition。在實際生產環境中,這段代碼可以優化,更大的利用機器資源提高並行度。
欲知后事如何,且聽下回分解!
DT大數據每天晚上20:00YY頻道現場授課頻道68917580