Spark Streaming揭秘 Day18 空RDD判斷及程序中止機制


Spark Streaming揭秘 Day18

空RDD判斷及程序中止機制

空RDD的處理

從API我們可以知道在SparkStreaming中,對於RDD的操作一般都是在foreachRDD和Transform方法里。
在使用foreachRDD時,有一個風險,就是如果RDD為空可能會導致計算失敗,那么應用如何來判斷為空呢?

方法1:使用RDD.count

count方法會直接觸發一個Job,代價有些大
Snip20160601_4

方法2:調用RDD.paritions.isEmpty

我們可以看到partitions是一個方法,返回的是一個數組,那么isEmpty實際上就僅僅是對數組長度的判斷,非常的高效。

Snip20160601_5

這個判斷是否有效?我們進入BlockGenerator,發現如果數據為空時,默認會不生成Block,也就是不會生成partition。所以說,從數據生成機制看,方法2是有效的。
Snip20160602_1

方法3:調用RDD.isEmpty
Snip20160601_1

方法2比較高效,但是也有一個隱患,就是進行shuffle操作,當並行度不變時,可能出現有partition沒數據的情況,會導致方法2判斷失效。
方法3,這個方法是目前來看最完備的方式。首先,對於數組為空的情況,在第一個條件處進行了短路,否則會執行take操作,來實際判斷是否有數,但是take()方法內部還是會有Job生成,所以也會產生一定的效率影響。

Spark Streaming程序的停止

因為SparkStreaming可能由於各種異常情況等發生終止,所以其在設計時,對停止操作提供了一個通用的解決方法。

首先是在啟動時,就注冊了關閉的方法。
Snip20160601_8

從注冊方法的內部,我們看到,實際上是注冊了一個jvm的hook方法,來確保一旦發生異常,關閉方法一定會被調用,從而確保了這個方法的強制執行。
Snip20160601_9

下面讓我們進入關閉方法內部:
這里出現了一個非常重要的參數,就是stopGracefullyOnShutdown,一旦使用stopGracefully,會將所有接收的數據處理完再執行關閉,建議打開,從而確保數據安全性。
Snip20160601_7

最后,在關閉方法的內部,實際上是調用了jobScheduler來完成。
Snip20160601_10

欲知后事如何,且聽下回分解

DT大數據每天晚上20:00YY頻道現場授課頻道68917580


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM