【Spark篇】---Spark中廣播變量和累加器


一、前述

Spark中因為算子中的真正邏輯是發送到Executor中去運行的,所以當Executor中需要引用外部變量時,需要使用廣播變量。

累機器相當於統籌大變量,常用於計數,統計。

二、具體原理

1、廣播變量

  • 廣播變量理解圖

 

 

 

  • 注意事項

1、能不能將一個RDD使用廣播變量廣播出去?

       不能,因為RDD是不存儲數據的。可以將RDD的結果廣播出去。

2、 廣播變量只能在Driver端定義不能在Executor端定義。

3、 Driver端可以修改廣播變量的值,在Executor端無法修改廣播變量的值。

4、如果executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。

5、如果Executor端用到了Driver的變量,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本。

 

val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello xasxt")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
sc.stop()

 

 2、累加器

 

  • 累加器理解圖

 

 Scala代碼:

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("accumulator")
    val sc = new SparkContext(conf)
    val accumulator = sc.accumulator(0)
    sc.textFile("./records.txt",2).foreach {//兩個變量
      x =>{accumulator.add(1)
      println(accumulator)}}
    println(accumulator.value)
    sc.stop()
  }
}

 java代碼:

package com.spark.spark.others;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
/**
 * 累加器在Driver端定義賦初始值和讀取,在Executor端累加。
 * @author root
 *
 */
public class AccumulatorOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("accumulator");
        JavaSparkContext sc = new JavaSparkContext(conf);
        final Accumulator<Integer> accumulator = sc.accumulator(0);
//        accumulator.setValue(1000);
        sc.textFile("./words.txt",2).foreach(new VoidFunction<String>() {
            
            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String t) throws Exception {
                accumulator.add(1);
//                System.out.println(accumulator.value());
                System.out.println(accumulator);
            }
        });
        System.out.println(accumulator.value());
        sc.stop();
        
    }
}

 

 結果:

 

  • 注意事項

               累加器在Driver端定義賦初始值,累加器只能在Driver端讀取最后的值,在Excutor端更新。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM