spark streaming集成kafka接收數據的方式


spark streaming是以batch的方式來消費,strom是准實時一條一條的消費。當然也可以使用trident和tick的方式來實現batch消費(官方叫做mini batch)。效率嘛,有待驗證。不過這兩種方式都是先把數據從kafka中讀取出來,然后緩存在內存或者第三方,再定時處理。如果這時候集群退出,而偏移量又沒處理好的話,數據就丟掉了。

而spark streaming提供了兩種獲取方式,一種是同storm一樣,實時讀取緩存到內存中;另一種是定時批量讀取。

這兩種方式分別是:

  Receiver-base

  Direct

 


下面分別介紹兩種方式的實現

Receiver-base

spark streaming啟動過后,會選擇一台excetor作為ReceiverSupervior

1:Reciver的父級ReciverTracker分發多個job(task)到不同的executor,並啟動ReciverSupervisor.

2:ReceiverSupervior會啟動對應的實例reciver(kafkareciver,TwitterReceiver),並調用onstart()

3:kafkareciver在通過onstart()啟動后就開啟線程源源不斷的接收數據,並交給ReceiverSupervior,通過ReceiverSupervior.store函數一條一條接收

4:ReceiverSupervior會調用BlockGenertor.adddata填充數據。

所有的中間數據都緩存在BlockGenertor

1:首先BlockGenertor維護了一個緩沖區,currentbuffer,一個無限長度的arraybuffer。為了防止內存撐爆,這個currentbuffer的大小可以被限制,通過設置參數spark.streaming.reciver.maxRate,以秒為單位。currentbuffer所使用的內存不是storage(負責spark計算過程中的所有存儲,包括磁盤和內存),而是珍貴的計算內存。所以currentbuffer應該被限制,防止占用過多計算內存,拖慢任務計算效率,甚至有可能拖垮Executor甚至集群。

2:維護blockforpushing隊列,它是等待被拉到到BlockManager的中轉站。它是currentbuffer和BlockManager的中間環節。它里面的每一個元素其實就是一個currentbuffer。

3:維護兩個定時器,其實就是一個生產-消費模式。blockintervaltimer定時器,負責生產端,定時將currentbuffer放進blockforpushing隊列。blockforpushingthread負責消費端,定時將blockforpushing里的數據轉移到BlockManager。

 

Direct

首先這種方式是延遲的。也就是說當action真正觸發時才會去kafka里接數據。因此不存在currentbuffer的概念。它把kafka每個分區里的數據,映射為KafkaRdd的概念。題外話,在structured streaming中,也已經向DataFrame和DataSet統一了,弱化了RDD的概念。

真正與kafka打交道的是KafkaCluster,全限定名: org.apache.spark.streaming.kafka.KafkaCluster。包括設備kafka各種參數,連接,獲取分區,以及偏移量,設置偏移量范圍等。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM