官網:http://spark.apache.org/docs/2.3.0/rdd-programming-guide.html#understanding-closures-
Spark中一個非常難以理解的概念,就是在集群中分布式並行運行時操作的算子外部的變量的生命周期
通常來說,這個問題跟在RDD的算子中操作作用域外部的變量有關
所謂RDD算子中,操作作用域外部的變量,指的是,類似下面的語句:
int counter = 0; JavaRDD<Integer> rdd = sc.parallelize(data); // Wrong: Don't do this!! rdd.foreach(x -> counter += x); println("Counter value: " + counter);
此時,對rdd執行的foreach算子的作用域,其實僅僅是它的內部代碼,但是這里卻操作了作用域外部的a變量
根據不同的編程語言的語法,這種功能是可以做到的,而這種現象就叫做閉包
閉包簡單來說,就是操作的不屬於一個作用域范圍的變量
如果使用local模式運行spark作業,那么實際只有一個jvm進程在執行這個作業
此時,你所有的RDD算子的代碼執行以及它們操作的外部變量,都是在一個進程的內存中,這個進程就是driver進程
此時是沒有任何問題的
但是在作業提交到集群執行的模式下(無論是client或cluster模式,作業都是在集群中運行的)
為了分布式並行執行你的作業,spark會將你的RDD算子操作,分散成多個task,放到集群中的多個節點上的executor進程中去執行
每個task執行的是相同的代碼,但是卻是處理不同的數據
在提交作業的task到集群去執行之前,spark會先在driver端處理閉包
spark中的閉包,特指那些,不在算子的作用域內部,但是在作用域外部卻被算子處理和操作了的變量
而算子代碼的執行也需要這些變量才能順利執行
此時,這些閉包變量會被序列化成多個副本,然后每個副本都發送到各個executor進程中,供那個executor進程運行的task執行代碼時使用
對於上面說的閉包變量處理機制
對於local模式,沒有任何特別的影響,畢竟都在一個jvm進程中,變量發送到executor,也不過就是進程中的一個線程而已
但是對於集群運行模式來說,每個executor進程,都會得到一個閉包變量的副本,這個時候,就會出問題
因此閉包變量發送到executor進程中之后,就變成了一個一個獨立的變量副本了,這就是最關鍵的一點
此時在executor進程中,執行task和算子代碼時,訪問的閉包變量,也僅僅只是當前executor進程中的一個變量副本而已了
此時雖然在driver進程中,也有一個變量副本,但是卻完全跟各個executor進程中的變量副本不是一個東西
此時,各個executor進程對於自己內存中的變量副本進行操作,即使改變了變量副本的值,但是對於driver端的程序,是完全感知不到的
driver端的變量沒有被進行任何操作
因此綜上所述,在你使用集群模式運行作業的時候,切忌不要在算子內部,對作用域外面的閉包變量進行改變其值的操作
因為那沒有任何意義,算子僅僅會在executor進程中,改變變量副本的值
對於driver端的變量沒有任何影響,我們也獲取不到executor端的變量副本的值
如果希望在集群模式下,對某個driver端的變量,進行分布式並行地全局性的修改
可以使用Spark提供的Accumulator,全局累加器