broadcast
Official documentation:
Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. The variable will be sent to each cluster only once.
Function prototype:
def broadcast[T](value: T): Broadcast[T]
Broadcast variables allow the programmer to cache a read-only variable on each machine rather than shipping a copy of it with every task. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. Spark actions are executed through a set of stages, separated by distributed "shuffle" operations. Within each stage, Spark automatically broadcasts the common data needed by tasks; data broadcast this way is cached in serialized form and deserialized before each task runs. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.
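To make the cost argument above concrete without a running cluster, here is a plain-Java sketch (not Spark code; the task and executor counts are made-up numbers) that compares the bytes shipped when a large list is captured in every task's closure versus being broadcast once per machine:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Illustrates the cost model behind broadcast variables: capturing a large
// list in every task's closure serializes and ships it once per task, while
// broadcasting ships it roughly once per executor.
public class BroadcastCostSketch {
    static int serializedSize(Serializable obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            return bytes.size();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ArrayList<Integer> bigList = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) bigList.add(i);

        int numTasks = 200;    // hypothetical: tasks that all need bigList
        int numExecutors = 4;  // hypothetical: machines the data lives on

        long closureCapture = (long) serializedSize(bigList) * numTasks;
        long broadcastOnce  = (long) serializedSize(bigList) * numExecutors;

        System.out.println("closure capture ships ~" + closureCapture + " bytes");
        System.out.println("broadcast ships ~" + broadcastOnce + " bytes");
    }
}
```

With 200 tasks and 4 executors, closure capture ships 50x more data; the ratio is exactly numTasks / numExecutors.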
Source analysis:
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  assertNotStopped()
  if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
    // This is a warning instead of an exception in order to avoid breaking user programs that
    // might have created RDD broadcast variables but not used them:
    logWarning("Can not directly broadcast RDDs; instead, call collect() and " +
      "broadcast the result (see SPARK-5063)")
  }
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 5);
final Broadcast<List<Integer>> broadcast = javaSparkContext.broadcast(data);
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer v1) throws Exception {
        // Read the broadcast value inside the task, so only the small
        // Broadcast handle is serialized with the closure, not the list
        // itself (reading it in a field initializer would capture the
        // full list on the driver and defeat the broadcast).
        List<Integer> iList = broadcast.value();
        Integer isum = 0;
        for (Integer i : iList)
            isum += i;
        return v1 + isum;
    }
});
System.out.println(result.collect());
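A more typical use of a broadcast variable is sharing a small lookup table with every task to enrich records (a map-side join). The sketch below simulates that pattern with plain Java collections so it runs without a cluster; in Spark, the hypothetical countryNames map would be wrapped with javaSparkContext.broadcast(...) and read with .value() inside the map function:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Simulates a map-side join: a small, read-only lookup table is shared by
// every "task". In Spark it would be a Broadcast<Map<String, String>>.
public class MapSideJoinSketch {
    // The per-record work a Spark map function would do.
    public static List<String> enrich(List<String> codes, Map<String, String> lookup) {
        return codes.stream()
                .map(code -> code + ":" + lookup.getOrDefault(code, "unknown"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> countryNames = new HashMap<>();  // the broadcast candidate
        countryNames.put("CN", "China");
        countryNames.put("US", "United States");

        List<String> records = Arrays.asList("CN", "US", "FR");
        System.out.println(enrich(records, countryNames));  // [CN:China, US:United States, FR:unknown]
    }
}
```

Broadcasting the small side of a join this way avoids a shuffle of the large side entirely.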
accumulator
Official documentation:
Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
values to using the `add` method. Only the master can access the accumulator's `value`.
Function prototype:
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T]): Accumulator[T]
Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters and sums. Spark natively supports accumulators of numeric types, and developers can add support for new types. If an accumulator is created with a name, it is displayed in the Spark UI, which is useful for understanding the progress of running stages (this is not yet supported in Python).
An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the "+=" operator, but they cannot read its value. Only the driver program can read the accumulator's value, via its value method.
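The "tasks add, only the driver reads" contract can be simulated with plain Java (not Spark code; the partitioning here is hand-rolled for illustration): each "task" only produces a local partial update from its partition, and the "driver" merges the partials and reads the total, which mirrors how Spark merges per-task accumulator updates on the driver:

```java
import java.util.Arrays;
import java.util.List;

// Simulates the accumulator contract: a task only adds to a local partial
// value and never reads the global total; the driver merges the partials.
public class AccumulatorSketch {
    // What a task does with its partition: the moral equivalent of
    // calling accumulator.add(1) once per element.
    static int processPartition(List<Integer> partition) {
        int localCount = 0;
        for (Integer ignored : partition) localCount += 1;
        return localCount;  // shipped back to the driver
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(5, 1, 1), Arrays.asList(4, 4), Arrays.asList(2, 2));

        int total = 0;  // driver-side value, initialValue = 0
        for (List<Integer> partition : partitions)
            total += processPartition(partition);  // driver merges partial updates

        System.out.println("elements counted: " + total);  // prints 7
    }
}
```

Because tasks only ever emit local deltas, no task can observe another task's updates, which is exactly why reading value() inside a task is disallowed.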
Source analysis:
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T] = {
  val acc = new Accumulator(initialValue, param, Some(name))
  cleaner.foreach(_.registerAccumulatorForCleanup(acc))
  acc
}
Example:
class VectorAccumulatorParam implements AccumulatorParam<Vector> {
    // Merge two accumulated value sets.
    // r1 is one set of accumulated data; r2 is another.
    @Override
    public Vector addInPlace(Vector r1, Vector r2) {
        r1.addAll(r2);
        return r1;
    }
    // The zero (initial) value.
    @Override
    public Vector zero(Vector initialValue) {
        return initialValue;
    }
    // Add additional data into the accumulated value.
    // t1 is the current value of the accumulator; t2 is the value being added.
    @Override
    public Vector addAccumulator(Vector t1, Vector t2) {
        t1.addAll(t2);
        return t1;
    }
}
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 5);

final Accumulator<Integer> accumulator = javaSparkContext.accumulator(0);
Vector initialValue = new Vector();
for (int i = 6; i < 9; i++)
    initialValue.add(i);
// Custom accumulator
final Accumulator accumulator1 = javaSparkContext.accumulator(initialValue, new VectorAccumulatorParam());
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer v1) throws Exception {
        accumulator.add(1);
        Vector term = new Vector();
        term.add(v1);
        accumulator1.add(term);
        return v1;
    }
});
System.out.println(result.collect());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator.value());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator1.value());
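A custom AccumulatorParam like the one above must satisfy a simple algebraic contract: addInPlace merges partial results from different partitions, and zero acts as an identity for that merge. The plain-Java sketch below checks that contract for the Vector-based merge logic used above; it needs no Spark on the classpath, since it only reimplements the merge function for illustration:

```java
import java.util.Arrays;
import java.util.Vector;

// Checks the merge contract of the VectorAccumulatorParam logic above:
// addInPlace concatenates partial results, and an empty Vector acts as zero.
public class VectorParamContract {
    // Same logic as VectorAccumulatorParam.addInPlace, typed for clarity.
    static Vector<Integer> addInPlace(Vector<Integer> r1, Vector<Integer> r2) {
        r1.addAll(r2);
        return r1;
    }

    public static void main(String[] args) {
        Vector<Integer> zero = new Vector<>();                      // identity for the merge
        Vector<Integer> part1 = new Vector<>(Arrays.asList(5, 1));  // partial result, partition 1
        Vector<Integer> part2 = new Vector<>(Arrays.asList(4, 2));  // partial result, partition 2

        Vector<Integer> merged = addInPlace(addInPlace(zero, part1), part2);
        System.out.println(merged);  // prints [5, 1, 4, 2]
    }
}
```

If the merge were not associative with a proper zero, the final value would depend on the order in which Spark happened to merge per-partition updates.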
Original article: https://www.jianshu.com/p/082ef79c63c1?utm_campaign=maleskine&utm_content=note&utm_medium=pc_all_hots&utm_source=recommendation