Spark Streaming揭秘 Day18
空RDD判斷及程序中止機制
空RDD的處理
從API我們可以知道在SparkStreaming中,對於RDD的操作一般都是在foreachRDD和Transform方法里。
在使用foreachRDD時,有一個風險,就是如果RDD為空可能會導致計算失敗,那么應用如何來判斷為空呢?
方法1:使用RDD.count
count方法會直接觸發一個Job,代價有些大

方法2:調用RDD.paritions.isEmpty
我們可以看到partitions是一個方法,返回的是一個數組,那么isEmpty實際上就僅僅是對數組長度的判斷,非常的高效。

這個判斷是否有效?我們進入BlockGenerator,發現如果數據為空時,默認會不生成Block,也就是不會生成partition。所以說,從數據生成機制看,方法2是有效的。

方法3:調用RDD.isEmpty

方法2比較高效,但是也有一個隱患,就是進行shuffle操作,當並行度不變時,可能出現有partition沒數據的情況,會導致方法2判斷失效。
方法3,這個方法是目前來看最完備的方式。首先,對於數組為空的情況,在第一個條件處進行了短路,否則會執行take操作,來實際判斷是否有數,但是take()方法內部還是會有Job生成,所以也會產生一定的效率影響。
Spark Streaming程序的停止
因為SparkStreaming可能由於各種異常情況等發生終止,所以其在設計時,對停止操作提供了一個通用的解決方法。
首先是在啟動時,就注冊了關閉的方法。

從注冊方法的內部,我們看到,實際上是注冊了一個jvm的hook方法,來確保一旦發生異常,關閉方法一定會被調用,從而確保了這個方法的強制執行。

下面讓我們進入關閉方法內部:
這里出現了一個非常重要的參數,就是stopGracefullyOnShutdown,一旦使用stopGracefully,會將所有接收的數據處理完再執行關閉,建議打開,從而確保數據安全性。

最后,在關閉方法的內部,實際上是調用了jobScheduler來完成。

欲知后事如何,且聽下回分解
DT大數據每天晚上20:00YY頻道現場授課頻道68917580
