spark Accumulator累加器使用示例


官網

http://spark.apache.org/docs/2.3.1/rdd-programming-guide.html#accumulators

http://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.util.AccumulatorV2

Accumulator是Spark提供的累加器。累加器的一個常用用途是在調試時對作業執行過程中的事件進行計數。只有driver能獲取Accumulator的值(調用value方法),Task只能對其做增加操作。也可以給Accumulator命名(不支持Python),這樣就可以在Spark web UI中查看,幫助了解程序運行的情況。

數值累加器可通過調用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()創建,分別用於累加Long或Double類型的值。之後,運行在集群上的任務可使用add方法進行累加操作。但是,這些任務並不能讀取累加器的值,只有驅動程序能使用value方法讀取累加器的值。

 

spark內置累加器使用示例

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;
import java.util.HashSet;

/**
 * Demonstrates Spark's built-in numeric accumulators, such as LongAccumulator and
 * DoubleAccumulator.
 *
 * <p>Each demo method below illustrates one aspect of accumulator semantics; uncomment the
 * corresponding call in {@link #main(String[])} to run it. All demos share one accumulator,
 * so running several of them in the same run makes their printed values add up.
 */
public class TestAccumulator {


    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession
                .builder()
                .appName("gxl")
                .master("local")
                .config(conf)
                .enableHiveSupport()
                .getOrCreate();

        try {
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

            // Unnamed accumulator: tasks may only add to it; only the driver reads value().
            LongAccumulator accum = jsc.sc().longAccumulator();
            JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1, 2, 3));

//            onlyMapOperator(accum, javaRDD);
//            accumInActionOperator(accum, javaRDD);
//            accumExecuteRepeatedly(accum, javaRDD);
        } finally {
            // Stop the session so the local SparkContext is released even if a demo throws.
            spark.stop();
        }
    }


    /**
     * Adds every element to the accumulator inside an action ({@code foreach}) and prints the
     * accumulated total on the driver.
     *
     * <p>For the input 1, 2, 3 and a fresh accumulator this prints 6.
     *
     * @param accum   accumulator to add to
     * @param javaRDD source elements
     */
    private static void accumInActionOperator(LongAccumulator accum, JavaRDD<Integer> javaRDD) {
        javaRDD.foreach(x -> accum.add(x));
        System.out.println(accum.value());
    }

    /**
     * Shows that accumulator updates inside a transformation are lazy.
     *
     * <p>The {@code map} result is never used by an action, so the lambda never runs and the
     * printed value is unchanged (0 for a fresh accumulator).
     *
     * @param accum   accumulator that the (never executed) map would add to
     * @param javaRDD source elements
     */
    private static void onlyMapOperator(LongAccumulator accum, JavaRDD<Integer> javaRDD) {
        javaRDD.map((Function<Integer, Integer>) v1 -> {
            accum.add(v1);
            return v1;
        });
        System.out.println(accum.value());
    }

    /**
     * Shows that a transformation's accumulator updates are repeated each time the RDD is
     * recomputed, and how caching avoids that.
     *
     * <p>Without caching (the commented-out variant), {@code count} and then {@code reduce}
     * each re-run {@code map}. For a fresh accumulator the printed values would then be 6
     * and 12. With the mapped RDD cached, {@code reduce} reads the cached partitions and both
     * prints should show 6. This assumes the cached blocks are not evicted; evicted blocks
     * would be recomputed and added again.
     *
     * @param accum   accumulator updated inside the map transformation
     * @param javaRDD source elements
     */
    private static void accumExecuteRepeatedly(LongAccumulator accum, JavaRDD<Integer> javaRDD) {
        JavaRDD<Integer> map = javaRDD.map((Function<Integer, Integer>) v1 -> {
            accum.add(v1);
            return v1;
        });
//        map.count();
//        System.out.println(accum.value());
//        map.reduce((Function2<Integer, Integer, Integer>) (v1, v2) -> v1+v2);
//        System.out.println(accum.value());

        // Cache the mapped RDD so the second action does not re-run map (and re-add).
        JavaRDD<Integer> cache = map.cache();
        cache.count();
        System.out.println(accum.value());
        cache.reduce((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
        System.out.println(accum.value());
    }

}

 

自定義spark累加器使用示例

import org.apache.spark.util.AccumulatorV2;

/**
 * A custom accumulator (written against Spark 2.3) that concatenates every added string.
 *
 * <p>Each added string is followed by a comma, so adding "a" and then "b" yields "a,b,".
 * Custom accumulators extend {@link AccumulatorV2} and override isZero, copy, reset, add,
 * merge and value.
 */
public class MyAccumulator extends AccumulatorV2<String, String> {

    /** Accumulated text; every added element is terminated by ",". */
    private final StringBuffer stringBuffer = new StringBuffer();

    /**
     * Returns whether this accumulator holds its zero value, i.e. nothing has been accumulated.
     *
     * @return true if the accumulated text is empty
     */
    @Override
    public boolean isZero() {
        return stringBuffer.length() == 0;
    }

    /**
     * Creates a new copy of this accumulator holding the same accumulated text.
     *
     * @return an independent copy of this accumulator
     */
    @Override
    public AccumulatorV2<String, String> copy() {
        MyAccumulator newMyAccu = new MyAccumulator();
        newMyAccu.stringBuffer.append(stringBuffer);
        return newMyAccu;
    }

    /**
     * Resets this accumulator to its zero value (empty text).
     */
    @Override
    public void reset() {
        stringBuffer.setLength(0);
    }

    /**
     * Appends {@code input} followed by a comma.
     *
     * @param input the string to accumulate; {@code null} is appended as "null"
     */
    @Override
    public void add(String input) {
        stringBuffer.append(input).append(",");
    }

    /**
     * Merges another accumulator of the same type into this one by appending its value.
     *
     * @param other the accumulator whose text is appended to this one
     */
    @Override
    public void merge(AccumulatorV2<String, String> other) {
        stringBuffer.append(other.value());
    }

    /**
     * Returns the current accumulated text.
     *
     * @return all accumulated strings, each followed by a comma
     */
    @Override
    public String value() {
        return stringBuffer.toString();
    }
}

 

測試示例

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;
import java.util.HashSet;

/**
 * Demonstrates the custom {@link MyAccumulator}. While an RDD of names is filtered against a
 * blacklist, the names that are not blacklisted are concatenated into the accumulator.
 */
public class TestAccumulator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession
                .builder()
                .appName("gxl")
                .master("local")
                .config(conf)
                .enableHiveSupport()
                .getOrCreate();

        try {
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
            testMyAccumulator(jsc);
        } finally {
            // Stop the session so the local SparkContext is released even if the demo throws.
            spark.stop();
        }
    }

    /**
     * Registers a {@link MyAccumulator} named "myAccumulator", filters a small RDD of names,
     * and prints what was accumulated.
     *
     * <p>The filter keeps only blacklisted names and adds every other name to the accumulator.
     * The printed value is therefore the three non-blacklisted names, e.g. "kevin,wade,james,".
     * NOTE(review): the filter keeps the blacklisted names and accumulates the rest; confirm
     * this is the intended direction.
     *
     * @param jsc the Java Spark context used to create the RDD and register the accumulator
     */
    private static void testMyAccumulator(JavaSparkContext jsc) {
        MyAccumulator myAccumulator = new MyAccumulator();
        jsc.sc().register(myAccumulator, "myAccumulator");

        HashSet<String> blacklist = new HashSet<>();
        blacklist.add("jack");

        JavaRDD<String> stringJavaRDD = jsc.parallelize(Arrays.asList("jack", "kevin", "wade", "james"));
        JavaRDD<String> filter = stringJavaRDD.filter((Function<String, Boolean>) v1 -> {
            if (blacklist.contains(v1)) {
                return true;
            }
            myAccumulator.add(v1);
            return false;
        });
        // count() is the action that actually runs the filter and hence the accumulator updates.
        filter.count();
        System.out.println(myAccumulator.value());
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM