理解Spark里的閉包


閉包的概念如下圖:

640?wx_fmt=png

在spark應用里,變量及函數的作用范圍和聲明周期在spark的集群運行模式下是比較難理解的,尤其是對初學者來說。RDD的操作,要修改其作用范圍的變量,經常會出點叉子。下面,可以舉個用foreach,修改一個計數器的例子。

例子

求和RDD元素的例子,該例子會根據該段代碼是否執行在同一個jvm里面有不同的輸出結果,比如local模式,運行於同一個jvm,輸出是15;cluster模式運行於不同jvm輸出是0。

val data=Array(1, 2, 3, 4, 5)

var counter = 0

var rdd = sc.parallelize(data)

 

// Wrong: Don't do this!!

rdd.foreach(x => counter += x)

 

println("Counter value: " + counter)

本地或集群模式

上述代碼的行為是未定義的,並且不同模式下運行情況不同。為了執行作業,Spark將RDD操作的處理分解為tasks,每個task由Executor執行。在執行之前,Spark會計算task的閉包。閉包是Executor在RDD上進行計算的時候必須可見的那些變量和方法(在這種情況下是foreach())。閉包會被序列化並發送給每個Executor。

 

發送給每個Executor的閉包中的變量是副本,因此,當foreach函數內引用計數器時,它不再是driver節點上的計數器。driver節點的內存中仍有一個計數器,但該變量是Executor不可見的!執行者只能看到序列化閉包的副本。因此,計數器的最終值仍然為零,因為計數器上的所有操作都引用了序列化閉包內的值。

 

在本地模式下,在某些情況下,該foreach函數實際上將在與driver相同的JVM內執行,並且會引用相同的原始計數器,並可能實際更新它。

為了確保在這些場景中明確定義的行為,應該使用一個Accumulator。Spark中的累加器專門用於提供一種機制,用於在集群中的工作節點之間執行拆分時安全地更新變量。

一般來說,closures - constructs像循環或本地定義的方法,不應該被用來改變一些全局狀態。Spark並沒有定義或保證從閉包外引用的對象的改變行為。這樣做的一些代碼可以在本地模式下工作,但這只是偶然,並且這種代碼在分布式模式下的行為不會像你想的那樣。如果需要某些全局聚合,請改用累加器。

打印RDD的元素

另一個常見的習慣用法是嘗試使用rdd.foreach(println)或rdd.map(println)打印出RDD的元素。在單台機器上,這將產生預期的輸出並打印所有RDD的元素。但是,在cluster模式下,由Executor執行輸出寫入的是Executor的stdout,而不是driver上的那個stdout,所以driver的stdout不會顯示這些!要在driver中打印所有元素,可以使用該collect()方法首先將RDD數據帶到driver節點:rdd.collect().foreach(println)。但這可能會導致driver程序內存不足,因為collect()會將整個RDD數據提取到driver端; 如果您只需要打印RDD的一些元素,則更安全的方法是使用take():張曉麗rdd.take(100).foreach(println)。

推薦閱讀:

1,必讀:Spark與kafka010整合

2,論Spark Streaming的數據可靠性和一致性

3,Spark技術學院-進去能學到啥?

與bat大牛一起學習進步,請關注知識星球:spark技術學院

640?wx_fmt=jpeg


文章來源:https://blog.csdn.net/rlnLo2pNEfx9c/article/details/80650309


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM