本文主要是講解Spark Streaming與kafka結合的新增分區檢測的問題。讀本文前關於kafka與Spark Streaming結合問題請參考下面兩篇文章:
1,必讀:再講Spark與kafka 0.8.2.1+整合
2,必讀:Spark與kafka010整合
讀本文前是需要了解Spark Streaming的原理和源碼結構基礎。
Spark Streaming源碼系列視頻教程請點閱讀原文進入浪尖的知識星球:Spark技術學院。
kafka 0.8版本
進入正題,之所以會有今天題目的疑惑,是由於在08版本kafka和Spark Streaming結合的DirectStream這種形式的API里面,是不支持kafka新增分區或者topic檢測的。而這個問題,對於很多業務增長比較明顯的公司都是會有碰到相應的問題。
比如,原來的公司業務增長比較明顯,那么kafka吞吐量,剛開始創建的topic數目和分區數目可能滿足不了並發需求,需要增加分區。新增加的分區會有生產者往里面寫數據,而Spark Streaming跟kafka 0.8版本結合的API是滿足不了動態發現kafka新增topic或者分區的需求的。
這么說有什么依據嗎?我們做項目不能人雲亦雲,所以我們可以從源碼入手驗證我們的想法。
我們在這里不會詳細講Spark Streaming源碼,但是我們可以在這里思考一下,Spark Streaming分區檢測是在哪做的?
很明顯對於批處理的Spark Streaming任務來說,分區檢測應該在每次job生成獲取kafkaRDD,來給kafkaRDD確定分區數並且每個分區賦值offset范圍的時候有牽扯,而這段代碼就在DirectKafkaInputDStream#compute方法中。(看過浪尖Spark Streaming源碼視頻教程的肯定會知道)
那么我們就貼出這塊源碼去驗證我們的想法,首先compute方法的第一行:
val untilOffsets=clamp(latestLeaderOffsets(maxRetries))
這里面獲取的是當前生成KafkaRDD每個分區消費的offset的最大值,那么我們需要進入latestLeaderOffsets進一步去看,可以發現下面一行代碼:
val o=kc.getLatestLeaderOffsets(currentOffsets.keySet)
這個是根據currentOffsets信息來獲取最大的offset,由此此處繼續深入發現,劍橋少兒英語由於它只是根據currentOffsets信息來獲取最大的offset,沒有去感知新增的分區,所以Spark Streaming與kafka 0.8結合是不能動態感知分區的。
kafka 0.10版本
相似的我們也可以直接去看kafka 0.10這塊的源碼去檢查,他是否會動態生成kafka分區。
進入DirectKafkaInputDStream的compute,看到的第一行代碼也是:
val untilOffsets=clamp(latestOffsets())
在latestOffsets里面,有了新的大陸:
到這里本文就算結束了,kafka 0.10版本與SparkStreaming結合支持新增分區檢測,這件事告訴我們沒事要多看源碼,增長見識。
有收獲就點個贊吧。
文章來源:https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81117367