Spark Streaming fileStream實現原理


fileStream是Spark Streaming Basic Source的一種,用於“近實時”地分析HDFS(或者與HDFS API兼容的文件系統)指定目錄(假設:dataDirectory)中新近寫入的文件,dataDirectory中的文件需要滿足以下約束條件:
 
(1)這些文件格式必須相同,如:統一為文本文件;
(2)這些文件在目錄dataDirectory中的創建形式比較特殊:必須以原子方式被“移動”或“重命名”至目錄dataDirectory中;
(3)一旦文件被“移動”或“重命名”至目錄dataDirectory中,文件不可以被改變,例如:追加至這些文件的數據可能不會被處理。
 
之所以稱之為“近實時”就是基於約束條件(2),文件的數據必須全部寫入完成,並且被“移動”或“重命名”至目錄dataDirectory中之后,這些文件才可以被處理。
 
調用示例如下:
 
 
directory:指定待分析文件的目錄;
filter:用戶指定的文件過濾器,用於過濾directory中的文件;
newFilesOnly:應用程序啟動時,目錄directory中可能已經存在一些文件,如果newFilesOnly值為true,表示忽略這些文件;如果newFilesOnly值為false,表示需要分析這些文件;
conf:用戶指定的Hadoop相關的配置屬性;
 
注:fileStream有另外兩個重載方法,在此不再贅述。
 
如果分析的文件是文本文件,Spark提供了一個便利的方法:
 
 
fileStream的實現原理是比較簡單的:以固定的時間間隔(duration)不斷地探測目錄(dataDirectory),每次探測時將時間段(now - duration, now]內新寫入的文件(即文件的最近修改時間處於時間區間(now - duration, now])封裝為RDD交由Spark處理。
 
Spark Streaming有一個核心組件:DStream,fileStream的實現依賴於其中的一個實現類:FileInputDStream。
 
 
而FileInputDStream的核心邏輯就是探測文件、封裝RDD,由方法compute(重寫至DStream compute)實現,
 
 
compute方法的注釋引出了一個很重要的問題:我們為什么需要維護一個最近已分析文件的列表?
 
假設探測目錄為dataDirectory,探測時間間隔為duration,當前時間為now,則本次選擇的文件需要滿足條件:文件的最近修改時間需要處於區間(now - duration, now],此時文件可能有以下幾個狀態:
 
(1)文件的最后修改時間小於或等於now - duration;
(2)文件的最后修改時間處於區間(now - duration, now);
(3)文件的最后修改時間等於now;
(4)文件的最后修改時間大於now;
 
考慮第(3)種情況,文件的最后修改時間等於now,它有可能在探測之前已被移動至目錄dataDirectory,或者在探測時或探測完成之后被移動至目錄dataDirectory;如果是后兩者,就可能會出現文件“丟失”的情況(即文件不被處理),因為下次探測的時間點為now + duration,探測的時間范圍為(now, now + duration],最近修改時間等於now的文件已不處於該區間。為了避免或減少文件“丟失”的情況,所以Spark Streaming fileStream允許將探測的時間范圍向“前”擴展為(now - n * duration, now],如下所示:
 
 
ignore threshold:now - n * duration
current batch time:now
remember window:n * duration
 
也就是說,每一次探測時,我們會選擇文件的最后修改時間處於區間(ignore threshold, current batch time]的文件,但其中有些文件可能在前幾次的探測中已經被分析,為了防止出現重復分析的情況,我們需要記錄時間區間(ignore threshold, current batch time](remember window)內已經被分析過的文件有哪些。
 
下面我們來分析compute的處理流程:
 
1. 尋找新的文件;
 
 
 
(1)計算ignore threshold;
 
這一步有兩個重要的變量需要說明:initialModTimeIgnoreThreshold和durationToRemember。
 
initialModTimeIgnoreThreshold
 
 
它的值與newFilesOnly有關,newFilesOnly表示Spark Streaming App剛剛啟動時是否分析目錄dataDirectory中已有的文件:
 
newFilesOnly == true:不需要分析目錄dataDirectory中已有的文件,因此initialModTimeIgnoreThreshold的值被設置為“當前時間”,表示僅僅分析最近修改時間大於“當前時間”的文件;
 
newFilesOnly == false:需要分析目錄dataDirectory中已有的文件,因此initialModTimeIgnoreThreshold的值被設置為0(文件的最近修改時間必大於0)。
 
durationToRemember
 
 
slideDuration:表示探測的時間間隔。
 
 
minRememberDurationS:默認值為60s,可以通過屬性spark.streaming.fileStream.minRememberDuration進行修改。
 
 
 
通過上面的代碼可以看出,durationToRemember = slideDuration * math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt,durationToRemember就是我們前面提到的remember window,也就是說這個時間區間內的已分析文件會被記錄。
 
 
ignore threshold取initialModTimeIgnoreThreshold、currentTime - durationToRemember.milliseconds的最大值,這也意味着即使newFilesOnly值為false,dataDirectory中的文件也不會被全部分析,只有最近修改時間大於currentTime - durationToRemember.milliseconds的文件才會被分析。
 
(2)創建過濾器實例;
 
 
過濾器實例實際就是Hadoop PathFilter實例,依賴於方法isNewFile構建,顧名思義這個過濾器實例是用來選取新文件的,新文件的標准需要滿足以下四個條件:
 
 
a. 文件路徑匹配用戶指定的過濾器實例;
b. 文件的最近修改時間大於modTimeIgnoreThreshold;
c. 文件的最近修改時間小於或等於currentTime;
d. 文件尚沒有被分析過,即文件沒有出現在最近已分析文件的列表recentlySelectedFiles。
 
這里需要額外說明一下c,為什么文件的最近修改時間不能大於currentTime?這主要是為了防止Spark Streaming應用重啟時出現文件被重復分析的情況。
 
假設應用的終止時間為time,重啟時間為time + 5 * duration,recentlySelectedFiles僅保存最近一個duration已經被分析過的文件,即保存的時間窗口為duration;應用重啟之后,第一次探測的時間為time + duration,如果允許文件的最近修改時間大於currentTime(即time + duration),則最近修改時間處於時間區間(time, +∞)的文件將全部被分析,這些文件被記入recentlySelectedFiles;第二次探測的時間為time + 2 * duration,因為recentlySelectedFiles的時間窗口為duration,此時可以認為它的值已經被清空,如果允許文件的最近修改時間大於currentTime(即time + 2 * duration),則最近修改時間處於時間區間(time + 2 * duration, +∞)的文件將全部被分析,這種情況下可以看出最近修改時間處於時間區間(time + 2 * duration, +∞)的文件被重復分析;此外探測時間為time + 3 * duration、time + 4 * duration、time + 5 * duration也將出現類似文件被重復分析的情況。綜上所述,每次探測文件時,文件的最近修改時間不能大於currentTime。
 
(3)獲取滿足過濾器實例條件的文件路徑;
 
 
至此,尋找“新”文件的流程結束。
 
2. 將找到的新文件加入已分析文件列表;
 
 
recentlySelectedFiles中的過期數據是由方法clearMetadata負責清理的。
 
3. 將找到的新文件封裝為RDD;
 
 
 
(1)遍歷新文件(路徑),將每一個新文件(路徑)通過SparkContext newAPIHadoopFile轉換為一個RDD,最后形成一個RDD列表:fileRDDs;
(2)將fileRDDs轉換為一個UnionRDD並返回;
 
至此,compute的整個處理流程結束。可以看出,整個流程中最為復雜的部分就是每次探測新文件的過程,特別是時間區間的選取以及最近已分析文件的緩存。
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM