Spark Accumulators



Because tasks run on multiple Executors, each Executor only processes its own partition of the data. When you want a global total, no single task can compute it on its own.

An accumulator provides a way to accumulate a count globally across all tasks.
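To see why a plain driver-side variable does not work, here is a minimal sketch (assuming the Spark 2.x Java API; the class name CounterPitfall and the sample data are made up for illustration). The closure that increments the counter is serialized and shipped to each task, so each task updates its own copy:

package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class CounterPitfall {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CounterPitfall");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // A plain driver-side variable: the closure below captures it,
        // gets serialized, and each task increments its own deserialized copy.
        final int[] counter = {0};

        sc.parallelize(Arrays.asList(1, 2, 3, 4))
          .foreach(x -> counter[0]++);

        // On a cluster this prints 0, because the driver's copy was never
        // touched (local mode may happen to update it, which is exactly why
        // relying on it is unsafe). Accumulators avoid this problem.
        System.out.println("counter = " + counter[0]);

        sc.close();
    }
}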

Note:

An accumulator can only be defined on the driver and read back on the driver; Executors can only add to it, they cannot read its value.

A broadcast variable can only be defined on the driver and is read on the Executors; Executors cannot modify it (see the sketch below).
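For contrast, here is a minimal broadcast-variable sketch (assuming the Spark 2.x Java API; BroadcastDemo, the stop-word set, and the sample words are made up for illustration). The value is created once on the driver and only read inside the executor-side closure:

package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class BroadcastDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("BroadcastDemo");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Defined on the driver, shipped once to each executor.
        Set<String> stopWords = new HashSet<>(Arrays.asList("a", "the", "of"));
        Broadcast<Set<String>> bc = sc.broadcast(stopWords);

        JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "spark", "the", "accumulator"));

        // Executors read bc.value() but never modify it.
        JavaRDD<String> filtered = words.filter(w -> !bc.value().contains(w));
        System.out.println(filtered.collect());  // [spark, accumulator]

        sc.close();
    }
}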

Here is the hands-on code:

package SparkStreaming;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class totalization_device {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("totalization_device");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Define the accumulator on the driver.
        final Accumulator<Integer> accumulator = sc.accumulator(0);

        JavaRDD<String> fileRDD = sc.textFile("E:/2018_cnic/learn/wordcount.txt");

        // Add 1 per input line while splitting each line into words.
        // Note: updates made inside a transformation may be re-applied if a
        // task is retried, so counts here are not guaranteed exactly-once.
        JavaRDD<String> fileRDD1 = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                accumulator.add(1);
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        JavaPairRDD<String, Integer> pairRDD = fileRDD1.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> reducebykeyRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        // collect() is the action that triggers the job; the accumulator
        // is only populated once the tasks have actually run.
        List<Tuple2<String, Integer>> collect = reducebykeyRDD.collect();
        for (Tuple2 tup : collect) {
            System.out.println(tup);
        }

        // Read the accumulated value on the driver, after the action.
        Integer num = accumulator.value();
        System.out.println("Total lines: " + num);

        sc.close();
    }
}

Output:

19/04/30 15:11:49 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 41 ms on localhost (executor driver) (1/2)
19/04/30 15:11:49 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 42 ms on localhost (executor driver) (2/2)
19/04/30 15:11:49 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/04/30 15:11:49 INFO DAGScheduler: ResultStage 1 (collect at totalization_device.java:50) finished in 0.051 s
19/04/30 15:11:49 INFO DAGScheduler: Job 0 finished: collect at totalization_device.java:50, took 0.273877 s
(4,1) (authentication,1) (Registered,3) (is,1) (Found,3) (master.Master:,3) (spark.SecurityManager:,5) (util.log:,1) (19,1) (modify,4) (classes,1) (6,1) ([jar:file:/opt/workspace/hive-3.1.0/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class],1) (8080.,1) (type,1) (with,5) (INFO,17) (permissions:,3) (groups,4) (using,1) (19/04/24,18) (Class,1) (@1326ms,1) (WARN,1) (root,2) (signal,3) ('MasterUI',1) (13,1) (24,1) (Set(root);,2) (version,1) (11,1) (ui,1) (8,1) (load,1) (Set();,1) (20,1) (15,1) (14:23:51,14) (Actual,1) (initialized,1) (server.Server:,2) (master,1) (,34) (multiple,1) (56130C,1) (handler,3) (22,1) (26,1) (TERM,1) (17,1) (daemon,1) (bindings.,1) (builtin-java,1) (server.AbstractConnector:,1) (users,2) ([jar:file:/opt/workspace/hbase-1.4.6/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class],1) (http://www.slf4j.org/codes.html#multiple_bindings,1)
(105L,,1) (Starting,1) (jetty-9.3.z-SNAPSHOT,1) (Spark,2) (14,1) (SLF4J,1) (platform...,1) (7,1) (util.NativeCodeLoader:,1) (Successfully,2) (on,2) ('sparkMaster',1) (library,1) (service,2) (16,1) (at,1) (in,3) (9,1) (443265@master1,1) (See,1) (7077.,1) (Logging,1) (missions:,1) (util.Utils:,2) (spark://master1:7077,1)
(for,5) (Changing,4) (25,1) (native-hadoop,1) (port,2) (Running,1) (explanation.,1) (your,1) (view,4) (acls,5) (10,1) (Unable,1) (binding,4) (to:,4) (disabled;,2) (contains,1) (util.SignalUtils:,3) (process,1) (21,1) (SLF4J:,6) (ServerConnector@1cbf22af{HTTP/1.1,[http/1.1]}{0.0.0.0:8080},1) (23,1) (5,1) (18,1) (SecurityManager:,1) (Started,3) (INT,1) (Set(),1) ("spark-root-org.apache.spark.deploy.master.Master-1-master1.out",1) (to,1) (applicable,1) (HUP,1) (started,2) (of,1) (path,1) (where,1) (12,1) (an,1) ([jar:file:/opt/workspace/hadoop-2.9.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class],1) ([org.slf4j.impl.Log4jLoggerFactory],1) (2.3.0,1) (14:23:50,4) (@1280ms,1) (name:,1) (per,1)
Total lines: 25
19/04/30 15:11:49 INFO SparkUI: Stopped Spark web UI at http://hadoop:4040
19/04/30 15:11:49 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/04/30 15:11:49 INFO MemoryStore: MemoryStore cleared
19/04/30 15:11:49 INFO BlockManager: BlockManager stopped
19/04/30 15:11:49 INFO BlockManagerMaster: BlockManagerMaster stopped
19/04/30 15:11:49 INFO
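One caveat on the API used above: org.apache.spark.Accumulator and sc.accumulator(...) are deprecated as of Spark 2.0 in favor of AccumulatorV2. Here is a minimal sketch of the same line count using the replacement LongAccumulator (the class name and sample data are illustrative):

package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

public class LongAccumulatorDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("LongAccumulatorDemo");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Spark 2.x replacement for the deprecated Accumulator API;
        // creating it through the SparkContext also registers it by name.
        LongAccumulator lineCount = sc.sc().longAccumulator("lineCount");

        sc.parallelize(Arrays.asList("one line", "another line", "third line"))
          .foreach(line -> lineCount.add(1));  // foreach is an action, so updates run immediately

        // Read on the driver only, after the action has completed.
        System.out.println("Total lines: " + lineCount.value());

        sc.close();
    }
}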

 

