Spark相對於Hadoop MapReduce有一個很顯著的特性就是“迭代計算”(作為一個MapReduce的忠實粉絲,能這樣說,大家都懂了吧),這在我們的業務場景里真的是非常有用。
假設我們有一個文本文件“datas”,每一行有三列數據,以“\t”分隔,模擬生成文件的代碼如下:

執行該代碼之后,文本文件會存儲於本地路徑:/tmp/datas,它包含1000行測試數據,將其上傳至我們的測試Hadoop集群,路徑:/user/yurun/datas,命令如下:

查詢一下它的狀態:

我們通過Spark SQL API將其注冊為一張表,代碼如下:

表的名稱為source,它有三列,列名分別為:col1、col2、col3,類型都為字符串(str),測試打印其前10行數據:

假設我們的分析需求如下:
(1)過濾條件:col1 = ‘col1_50',以col2為分組,求col3的最大值;
(2)過濾條件:col1 = 'col1_50',以col3為分組,求col2的最小值;
注意:需求是不是很變態,再次注意我們只是模擬。
通過情況下我們可以這么做:

每一個collect()(Action)都會產生一個Spark Job,

因為這兩個需求的處理邏輯是類似的,它們都有兩個Stage:


可以看出這兩個Job的數據輸入量是一致的,根據輸入量的具體數值,我們可以推斷出這兩個Job都是直接從原始數據(文本文件)計算的。
這種情況在Hive(MapReduce)的世界里是很難優化的,處理邏輯雖然簡單,卻無法使用一條SQL語句表述(有的是因為分析邏輯復雜,有的則因為各個處理邏輯的結果需要獨立存儲),只能一個需求對應一(多)條SQL語句(如上示例),帶來的問題就是全量原始數據多次被分析,在海量數據的場景下必然帶來集群資源的巨大浪費。
其實這兩個需求有一個共同點:過濾條件相同(col1 = 'col1_50'),一個很自然的想法就是將滿足過濾條件的數據緩存,然后在緩存數據之上執行計算,Spark為我們做到了這一點。

依然是兩個Job,每個Job仍然是兩個Stage,但這兩個Stage的輸入數據量(Input)已發生變化:


Job1的Input(數據輸入量)仍然是63.5KB,是因為“cacheTable”僅僅在RDD(cacheRDD)第一次被觸發計算並執行完成之后才會生效,因此Job1的Input是63.5KB;而Job2執行時“cacheTable”已生效,直接輸入緩存中的數據即可,因此Job2的Input減少為3.4KB,而且因為所需緩存的數據量小,可以完全被緩存於內存中,因此效率極高。
我們也可以從Spark相關頁面中確認“cache”確實生效:

我們也需要注意cacheTable與uncacheTable的使用時機,cacheTable主要用於緩存中間表結果,它的特點是少量數據且被后續計算(SQL)頻繁使用;如果中間表結果使用完畢,我們應該立即使用uncacheTable釋放緩存空間,用於緩存其它數據(示例中注釋uncacheTable操作,是為了頁面中可以清楚看到表被緩存的效果)。