Spark——傳遞函數與閉包


        在Scala中,你可以在任何作用於內定義函數,在函數體內,可以訪問相應作用域內的任何變量;還不止,你的函數還可以在變量不再處於作用於內的時候被調用,這就是閉包的最基本的理解。

一、transform、action算子的函數參數

        在spark集群中,spark應用由負責運行用戶編寫的main函數,以及在集群上運行的各種並行操作的驅動器程序(Driver)和並行運行在集群各節點的工作進程(Executor)共同組成。action算子會觸發spark提交job,在提交job的過程中,transform算子和action算子中的func會被封裝成閉包,然后發送到各個worker節點上去執行(數據就近原則)。

        顯然,閉包是有狀態的,主要表現為那些自由變量,以及自由變量依賴到的其他變量,所以,在將一個簡單的函數或者一段代碼片段傳遞給算子作為參數前,spark會檢測閉包內所有涉及的變量,然后序列化變量,再傳給worker節點,再反序列化執行。(檢測——序列化——傳遞變量——反序列化)

函數參數表示為:

val f:(Double)=>Double = 2*_

f的類型是(Double)=> Double,傳入一個Double類型參數,返回一個Double類型的值。spark的transform和action算子都用到了函數參數,這其中閉包的運用最頻繁。

val f(x:Int) = (x:Int) => 2*x
val rdd = sc.parallelize(1 to 10)
val rdd1 = rdd.map(x => f(x))

結果rdd1的值為Array(2,4,6,8,10,12,14,16,18,20),這似乎沒有涉及到什么閉包的知識點,不要着急,這里先介紹transform、action算子是怎樣調用函數參數的。

二、閉包的理解

def mulBy (factor : Double) = (x:Double) => factor * x
val triple = mulBy(3)
val half = mulBy(0.5)
println(s"${triple(14)}, ${half(14)}")

         定義了一個函數mulBy,類型是 Double,值為(x:Double) => factor * x;

        首先,mulBy的首次被調用,將參數3傳給(x:Double) => factor * x,factor=3,該變量在mulBy被引用,並將函數參數存入triple。然后參數變量factor從運行時的棧上被彈出;

        然后,mulBy再次被調用,factor的值被設置為0.5,同樣的,新的參數函數存入half中,參數變量factor從運行時的棧上被彈出;

        因為每次調用mulBy函數后,都將其值存入到一個變量中(如上面的triple和half),當使用triple函數或half函數時factor相當於是作用域外,這就是“閉包”,閉包由代碼和代碼用到的任何非局部變量定義構成。因此,輸出結果為:42,7

        雖然表象上triple和half的調用,仍然使用factor變量,但可以理解為,triple和half函數的factor並不是一個變量,而是真實的、不變的一個常量值3和0.5。

三、閉包進一步理解:spark本地模式 VS 集群模式

        通過上面可以理解spark算子怎樣遍歷調用一個函數,函數涉及的變量如何到達worker節點,以及閉包的概念。當一個集群上執行代碼時,變量和方法的范圍以及生命周期,是spark比較難理解的地方。

var counter = 0
var rdd = sc.parallelize(data)
rdd.foreach(x=>counter += x)
println(s"Counter value : $counter")

        對於單純的RDD元素總和,根據是否運行在同一個虛擬機上,他們表現的行為完全不同。

        在本地模式下,在某些情況下,驅動程序會運行在同一個JVM內,即各個程序操作的counter屬於同一個,從而可以得到“預期”的RDD元素總和結果。

        在集群模式下,為了執行作業,spark將RDD分拆成多個task,每個task由一個執行器(Executor,即一個task只能被一個Executor消化,一個Executor可以消化多個task)執行操作。在執行前,spark計算閉包(檢測閉包變量和方法,上述代碼指的是counter和foreach),這個閉包會被序列化,並分發給每一個執行器。換句話說,每個執行器得到各自的counter,對counter進行修改時,也只是修改自己的counter,而驅動器(Driver)上的counter並沒有被修改,所以最終的counter輸出結果沒有達到預期,輸出為0。這個可以理解為Driver的counter變量是全局變量,Executor的counter是局部變量。

        所以Spark為了應對這種由於閉包產生的影響,支持定義使用全局共享變量,廣播(broadcast)變量,用來將一個值緩存到所有節點的內存中。對於累加操作,還可以使用累加器(accumulator)。

var accum = sc.accumulator(0)
val value = sc.parallelize(Array(1,2,3,4)).foreach(x => accum+=x).value
println(s"accum = $accum")
//accum的輸出結果為10

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM