關於Spark Streaming感知kafka動態分區的問題


本文主要是講解Spark Streaming與kafka結合的新增分區檢測的問題。讀本文前關於kafka與Spark Streaming結合問題請參考下面兩篇文章:

1,必讀:再講Spark與kafka 0.8.2.1+整合

2,必讀:Spark與kafka010整合

讀本文前是需要了解Spark Streaming的原理和源碼結構基礎。

Spark Streaming源碼系列視頻教程請點閱讀原文進入浪尖的知識星球:Spark技術學院。

kafka 0.8版本

進入正題,之所以會有今天題目的疑惑,是由於在08版本kafka和Spark Streaming結合的DirectStream這種形式的API里面,是不支持kafka新增分區或者topic檢測的。而這個問題,對於很多業務增長比較明顯的公司都是會有碰到相應的問題。

比如,原來的公司業務增長比較明顯,那么kafka吞吐量,剛開始創建的topic數目和分區數目可能滿足不了並發需求,需要增加分區。新增加的分區會有生產者往里面寫數據,而Spark Streaming跟kafka 0.8版本結合的API是滿足不了動態發現kafka新增topic或者分區的需求的。

這么說有什么依據嗎?我們做項目不能人雲亦雲,所以我們可以從源碼入手驗證我們的想法。

我們在這里不會詳細講Spark Streaming源碼,但是我們可以在這里思考一下,Spark Streaming分區檢測是在哪做的?

很明顯對於批處理的Spark Streaming任務來說,分區檢測應該在每次job生成獲取kafkaRDD,來給kafkaRDD確定分區數並且每個分區賦值offset范圍的時候有牽扯,而這段代碼就在DirectKafkaInputDStream#compute方法中。(看過浪尖Spark Streaming源碼視頻教程的肯定會知道)

那么我們就貼出這塊源碼去驗證我們的想法,首先compute方法的第一行:

val untilOffsets=clamp(latestLeaderOffsets(maxRetries))

這里面獲取的是當前生成KafkaRDD每個分區消費的offset的最大值,那么我們需要進入latestLeaderOffsets進一步去看,可以發現下面一行代碼:

val o=kc.getLatestLeaderOffsets(currentOffsets.keySet)

 

這個是根據currentOffsets信息來獲取最大的offset,由此此處繼續深入發現,劍橋少兒英語由於它只是根據currentOffsets信息來獲取最大的offset,沒有去感知新增的分區,所以Spark Streaming與kafka 0.8結合是不能動態感知分區的。

kafka 0.10版本

相似的我們也可以直接去看kafka 0.10這塊的源碼去檢查,他是否會動態生成kafka分區。

進入DirectKafkaInputDStream的compute,看到的第一行代碼也是:

val untilOffsets=clamp(latestOffsets())

在latestOffsets里面,有了新的大陸:

640?wx_fmt=png

到這里本文就算結束了,kafka 0.10版本與SparkStreaming結合支持新增分區檢測,這件事告訴我們沒事要多看源碼,增長見識。

有收獲就點個贊吧。


文章來源:https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81117367


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM