Spark Accumulators



Tasks execute on multiple Executors, so when computing a total each Executor only sees its own partition of the data; no single task can produce a global count.

An accumulator solves this: values added by every task are aggregated into one counter that spans the whole job.
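To see the pitfall an accumulator avoids, here is a minimal sketch (the class name, counter variable, and reuse of the file path are illustrative assumptions, not part of the original article): a plain variable referenced inside a task closure is copied to each Executor, so on a real cluster the driver's copy never changes.

package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ClosureCounterPitfall {
    // Lives on the driver; each Executor JVM gets its own copy of this class state.
    private static int counter = 0;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("ClosureCounterPitfall");
        JavaSparkContext sc = new JavaSparkContext(conf);

        sc.textFile("E:/2018_cnic/learn/wordcount.txt")
          .foreach(line -> counter++);   // increments the Executor-side copy only

        // On a cluster this prints 0; in local mode the result is unreliable.
        System.out.println("counter = " + counter);
        sc.close();
    }
}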

Note:

An accumulator can only be defined on the driver and read on the driver; it cannot be read on the Executor side (tasks may only add to it).

A broadcast variable can only be defined on the driver and is read on the Executor side; Executors cannot modify it.
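For contrast, a minimal broadcast-variable sketch (the stop-word set and reuse of the file path are illustrative assumptions): the value is defined once on the driver, shipped to each Executor, and read there without modification.

package SparkStreaming;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("BroadcastExample");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Defined on the driver; shipped once per Executor instead of once per task.
        Set<String> stop = new HashSet<>(Arrays.asList("a", "the", "of"));
        Broadcast<Set<String>> stopWords = sc.broadcast(stop);

        long kept = sc.textFile("E:/2018_cnic/learn/wordcount.txt")
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                // Executors read the broadcast value; they must not modify it.
                .filter(w -> !stopWords.value().contains(w))
                .count();

        System.out.println("non-stop-words: " + kept);
        sc.close();
    }
}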

A hands-on example follows:

package SparkStreaming;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class totalization_device {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("totalization_device");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Define the accumulator on the driver.
        Accumulator<Integer> accumulator = sc.accumulator(0);

        JavaRDD<String> fileRDD = sc.textFile("E:/2018_cnic/learn/wordcount.txt");

        // Split each line into words; add 1 to the accumulator per line processed.
        JavaRDD<String> fileRDD1 = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                accumulator.add(1);
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> pairRDD = fileRDD1.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts per word.
        JavaPairRDD<String, Integer> reducebykeyRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        // collect() is an action: it triggers the job, which populates the accumulator.
        List<Tuple2<String, Integer>> collect = reducebykeyRDD.collect();
        for (Tuple2 tup : collect) {
            System.out.println(tup);
        }

        // Read the accumulator value back on the driver.
        Integer num = accumulator.value();
        System.out.println("Total: " + num + " lines");

        sc.close();
    }
}

Output:

19/04/30 15:11:49 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 41 ms on localhost (executor driver) (1/2)
19/04/30 15:11:49 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 42 ms on localhost (executor driver) (2/2)
19/04/30 15:11:49 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/04/30 15:11:49 INFO DAGScheduler: ResultStage 1 (collect at totalization_device.java:50) finished in 0.051 s
19/04/30 15:11:49 INFO DAGScheduler: Job 0 finished: collect at totalization_device.java:50, took 0.273877 s
(4,1) (authentication,1) (Registered,3) (is,1) (Found,3) (master.Master:,3) (spark.SecurityManager:,5) (util.log:,1) (19,1) (modify,4) (classes,1) (6,1) ([jar:file:/opt/workspace/hive-3.1.0/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class],1) (8080.,1) (type,1) (with,5) (INFO,17) (permissions:,3) (groups,4) (using,1) (19/04/24,18) (Class,1) (@1326ms,1) (WARN,1) (root,2) (signal,3) ('MasterUI',1) (13,1) (24,1) (Set(root);,2) (version,1) (11,1) (ui,1) (8,1) (load,1) (Set();,1) (20,1) (15,1) (14:23:51,14) (Actual,1) (initialized,1) (server.Server:,2) (master,1) (,34) (multiple,1) (56130C,1) (handler,3) (22,1) (26,1) (TERM,1) (17,1) (daemon,1) (bindings.,1) (builtin-java,1) (server.AbstractConnector:,1) (users,2) ([jar:file:/opt/workspace/hbase-1.4.6/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class],1) (http://www.slf4j.org/codes.html#multiple_bindings,1)
(105L,,1) (Starting,1) (jetty-9.3.z-SNAPSHOT,1) (Spark,2) (14,1) (SLF4J,1) (platform...,1) (7,1) (util.NativeCodeLoader:,1) (Successfully,2) (on,2) ('sparkMaster',1) (library,1) (service,2) (16,1) (at,1) (in,3) (9,1) (443265@master1,1) (See,1) (7077.,1) (Logging,1) (missions:,1) (util.Utils:,2) (spark://master1:7077,1)
(for,5) (Changing,4) (25,1) (native-hadoop,1) (port,2) (Running,1) (explanation.,1) (your,1) (view,4) (acls,5) (10,1) (Unable,1) (binding,4) (to:,4) (disabled;,2) (contains,1) (util.SignalUtils:,3) (process,1) (21,1) (SLF4J:,6) (ServerConnector@1cbf22af{HTTP/1.1,[http/1.1]}{0.0.0.0:8080},1) (23,1) (5,1) (18,1) (SecurityManager:,1) (Started,3) (INT,1) (Set(),1) ("spark-root-org.apache.spark.deploy.master.Master-1-master1.out",1) (to,1) (applicable,1) (HUP,1) (started,2) (of,1) (path,1) (where,1) (12,1) (an,1) ([jar:file:/opt/workspace/hadoop-2.9.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class],1) ([org.slf4j.impl.Log4jLoggerFactory],1) (2.3.0,1) (14:23:50,4) (@1280ms,1) (name:,1) (per,1)
Total: 25 lines
19/04/30 15:11:49 INFO SparkUI: Stopped Spark web UI at http://hadoop:4040
19/04/30 15:11:49 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/04/30 15:11:49 INFO MemoryStore: MemoryStore cleared
19/04/30 15:11:49 INFO BlockManager: BlockManager stopped
19/04/30 15:11:49 INFO BlockManagerMaster: BlockManagerMaster stopped
19/04/30 15:11:49 INFO
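One side note: the Accumulator class used above is the Spark 1.x API and has been deprecated since Spark 2.0. Below is a hedged sketch of the same line count using the AccumulatorV2-based LongAccumulator instead; only the changed fragment is shown, and the accumulator name "lineCount" is an illustrative choice, not from the original article.

import org.apache.spark.util.LongAccumulator;

// Spark 2.x style: obtain a named LongAccumulator from the underlying SparkContext.
LongAccumulator lineCount = sc.sc().longAccumulator("lineCount");

JavaRDD<String> fileRDD1 = fileRDD.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
        lineCount.add(1L);   // tasks may only add; only the driver reads the value
        return Arrays.asList(s.split(" ")).iterator();
    }
});

// After an action such as collect() has run:
System.out.println("Total: " + lineCount.value() + " lines");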

 

