spark 為什么要用broadcast[轉]


為什么要用broadcast?

If you have huge array that is accessed from Spark Closures, for example some reference data, this array will be shipped to each spark node with closure. For example if you have 10 nodes cluster with 100 partitions (10 partitions per node), this Array will be distributed at least 100 times (10 times to each node).

If you use broadcast it will be distributed once per node using efficient p2p protocol.

val array: Array[Int] = ??? // some huge array
val broadcasted = sc.broadcast(array)

And some RDD

val rdd: RDD[Int] = ???

In this case array will be shipped with closure each time

rdd.map(i => array.contains(i))

and with broadcast you'll get huge performance benefit

rdd.map(i => broadcasted.value.contains(i))

變量不broadcast僅僅影響的是效率嗎?

理解閉包
      Spark中理解起來比較困難的一點是當代碼在集群上運行時變量和方法的生命周期和作用域(scope)。當作用於RDD上的操作修改了超出它們作用域范圍的變量時,會引起一些混淆。為了說明這個問題,使用下面的例子。該例中使用foreach(),對counter(計數器)進行增加,相同的問題也會發生在其他操作中。

 

例子

      下面的例子在以本地模式運行(--master = local[n]) 和將它部署到集群中 (例如通過 spark-submit 提交到 YARN)對比發現會產生不同的結果。

1
2
3
4
5
 
var counter = 0
var rdd = sc.parallelize(data)
// 錯誤,請不要這樣做!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)

 

本地模式 vs. 集群模式

      這里主要的挑戰是上面代碼的行為是有歧義的。以本地模式運行在單個JVM上,上面的代碼會將RDD中的值進行累加,並且將它存儲到counter中。這是因為RDD和變量counter在driver節點的相同內存空間中。
      然而,以集群模式運行時,會更加復雜,上面的代碼的結果也許不會如我們預期的那樣。當執行一個作業(job)時,Spark會將RDD分成多個任務(task)--每一個任務都會由一個executor來執行。在執行之前,Spark會計算閉包(closure)。閉包是對executors可見的那部分變量和方法,executors會用閉包來執行RDD上的計算(在這個例子中,閉包是foreach())。這個閉包是被序列化的,並且發送給每個executor。在本地模式中,只有一個executor,所以共享相同的閉包。然而,在集群模式中,就不是這樣了。executors會運行在各自的worker節點中,每個executor都有閉包的一個復本。
      發送給每個executor的閉包中的變量其實也是復本。每個foreach函數中引用的counter不再是driver節點上的counter。當然,在driver節點的內存中仍然存在這一個counter,但是這個counter對於executors來說是不可見的。executors只能看到自己的閉包中的復本。這樣,counter最后的值仍舊是0,因為所有在counter的操作只引用了序列化閉包中的值。
      為了在這樣的場景中,確保這些行為正確,應該使用累加變量(Accumulator)。在集群中跨節點工作時,Spark中的累加變量提供了一種安全的機制來更新變量。所以可變的全局狀態應該使用累加變量來定義。

所以上面的例子可以這樣寫:

1
2
3
4
5
 
// counter現在是累加變量
var counter = sc.accumulator(0)
var rdd = sc.parallelize(data)
rdd.foreach(x => counter += x)
println("Counter value: " + counter)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM