Spark 自定義累加變量(Accmulator)AccumulatorParam


1.創建一個累加變量

public <T> Accumulator<T> accumulator(T initialValue,
                             AccumulatorParam<T> param)
Create an Accumulator variable of a given type, which tasks can "add" values to using the += method. Only the driver can access the accumulator's value.
Parameters:
initialValue - (undocumented)
param - (undocumented)
Returns:
(undocumented)

使用SparkContext的如上方法,可以創建一個累加變量。默認情況下,這里的T是int或者double,因此如果想要創建T為long的累加變量是不行的。

 

2.AccumulatorParam介紹

  概念:

   initialValue:Accumulator的初始值,也就是調用SparkContext.accululator時傳遞的initialValue

   zeroValue:AccumulatorParam的初始值,也就是zero方法的返回值。

 

假設樣本數據集合為simple={1,2,3,4}

執行順序:

1.調用zero(initialValue),返回zeroValue

2.調用addAccumulator(zeroValue,1) 返回v1.

   調用addAccumulator(v1,2)返回v2.

   調用addAccumulator(v2,3)返回v3.

   調用addAccumulator(v3,4)返回v4.

3.調用addInPlace(initialValue,v4)

 

因此最終結果是zeroValue+1+2+3+4+initialValue.

 

3.實現AccumulatorParam

import org.apache.spark.AccumulatorParam;

public class LongAccumulator implements AccumulatorParam<Long>{

        //執行完addAccumulator方法之后,最后會執行這個方法,將value加到init。
        @Override
        public Long addInPlace(Long init, Long value) {
            // TODO Auto-generated method stub
            // return arg0+arg1;
            System.out.println(init+":"+value);
            return init+value;
        }

        
        /*
         * init 就是SparkContext.accumulator(init)參數init。 
         * 這里的返回值是累計的起始值。注意哦,他可以不等於init。
         *
         * 如果init=10,zero(init)=0,那么運算過程如下:
         * v1:=0+step
         * v1:=v1+step
         * ...
         * ...
         * 最后v1:=v1+init
         **/
        @Override
        public Long zero(Long init) {
            // TODO Auto-generated method stub
            System.out.println(init);
            return 0l;
        }

        @Override
        public Long addAccumulator(Long value, Long step) {
            // TODO Auto-generated method stub
            System.out.println(value+","+step);
            return value+step;
        }

        
    }

 

接下來使用它。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class AccumulatorDemo {
    public static void main(String[]args){
        SparkConf conf=new SparkConf().setAppName("AccumulatorDemo").setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);
        
        Accumulator<Long> acc=sc.accumulator(0L,new LongAccumulator());

        List<Long> seq=Arrays.asList(1L,2L,3L,4L);
        JavaRDD<Long> rdd=sc.parallelize(seq);
        
        
        
        rdd.foreach(new VoidFunction<Long>(){

            @Override
            public void call(Long arg0) throws Exception {
                acc.add(arg0);
            }
            
        });
        
        System.out.println(acc.value());;
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM