Spark中的各種action算子操作(java版)


在我看來,Spark編程中的action算子的作用就像一個觸發器,用來觸發之前的transformation算子。transformation操作具有懶加載的特性,你定義完操作之后並不會立即加載,只有當某個action的算子執行之后,前面所有的transformation算子才會全部執行。常用的action算子如下代碼所列:(java版) 
package cn.spark.study.core;

import java.util.Arrays; 
import java.util.List; 
import java.util.Map;

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

/** 
* action操作實戰 
* @author dd 

*/ 
public class ActionOperation { 
public static void main(String[] args) { 
//reduceTest(); 
//collectTest(); 
//countTest(); 
//takeTest(); 
countByKeyTest(); 
}

 

  1 /**
  2  * reduce算子
  3  * 案例:求累加和
  4  */
  5 private static void reduceTest(){
  6     SparkConf conf = new SparkConf()
  7                     .setAppName("reduce")
  8                     .setMaster("local");
  9     JavaSparkContext sc = new JavaSparkContext(conf);
 10 
 11     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
 12 
 13     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
 14 
 15     //使用reduce操作對集合中的數字進行累加
 16     int sum = numbersRDD.reduce(new Function2<Integer, Integer, Integer>() {
 17 
 18         @Override
 19         public Integer call(Integer arg0, Integer arg1) throws Exception {
 20             return arg0+arg1;
 21         }
 22     });
 23 
 24     System.out.println(sum);
 25 
 26     sc.close();
 27 }
 28 
 29 /**
 30  * collect算子
 31  * 可以將集群上的數據拉取到本地進行遍歷(不推薦使用)
 32  */
 33 private static void collectTest(){
 34     SparkConf conf = new SparkConf()
 35     .setAppName("collect")
 36     .setMaster("local");
 37     JavaSparkContext sc = new JavaSparkContext(conf);
 38 
 39     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
 40 
 41     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
 42 
 43     JavaRDD<Integer> doubleNumbers = numbersRDD.map(new Function<Integer, Integer>() {
 44 
 45         @Override
 46         public Integer call(Integer arg0) throws Exception {
 47             // TODO Auto-generated method stub
 48             return arg0*2;
 49         }
 50     });
 51 
 52     //foreach的action操作是在遠程集群上遍歷rdd中的元素,而collect操作是將在分布式集群上的rdd
 53     //數據拉取到本地,這種方式一般不建議使用,因為如果rdd中的數據量較大的話,比如超過一萬條,那么性能會
 54     //比較差,因為要從遠程走大量的網絡傳輸,將數據獲取到本地,有時還可能發生oom異常,內存溢出
 55     //所以還是推薦使用foreach操作來對最終的rdd進行處理
 56     List<Integer> doubleNumList = doubleNumbers.collect();
 57     for(Integer num : doubleNumList){
 58         System.out.println(num);
 59     }
 60     sc.close();
 61 }
 62 
 63 /**
 64  * count算子
 65  * 可以統計rdd中的元素個數
 66  */
 67 private static void countTest(){
 68     SparkConf conf = new SparkConf()
 69     .setAppName("count")
 70     .setMaster("local");
 71     JavaSparkContext sc = new JavaSparkContext(conf);
 72 
 73     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
 74 
 75     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
 76 
 77     //對rdd使用count操作統計rdd中元素的個數
 78     long count = numbersRDD.count();
 79     System.out.println(count);
 80 
 81     sc.close();
 82 }
 83 
 84 /**
 85  * take算子
 86  * 將遠程rdd的前n個數據拉取到本地
 87  */
 88 private static void takeTest(){
 89     SparkConf conf = new SparkConf()
 90     .setAppName("take")
 91     .setMaster("local");
 92     JavaSparkContext sc = new JavaSparkContext(conf);
 93 
 94     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
 95 
 96     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
 97 
 98     //take操作與collect操作類似,也是從遠程集群上獲取rdd數據,但是,collect操作獲取的是rdd的
 99     //所有數據,take獲取的只是前n個數據
100     List<Integer> top3number = numbersRDD.take(3);
101     for(Integer num : top3number){
102         System.out.println(num);
103     }
104     sc.close();
105 }
106 
107 /**
108  * saveAsTextFile算子
109  * 
110  */
111 private static void saveAsTExtFileTest(){
112     SparkConf conf = new SparkConf()
113     .setAppName("saveAsTextFile");
114 
115     JavaSparkContext sc = new JavaSparkContext(conf);
116 
117     List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
118 
119     JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);
120 
121     JavaRDD<Integer> doubleNumbers = numbersRDD.map(new Function<Integer, Integer>() {
122 
123         @Override
124         public Integer call(Integer arg0) throws Exception {
125             // TODO Auto-generated method stub
126             return arg0*2;
127         }
128     });
129 
130     //saveAsTextFile算子可以直接將rdd中的數據保存在hdfs中
131     //但是我們在這里只能指定保存的文件夾也就是目錄,那么實際上,會保存為目錄中的
132     //  /double_number.txt/part-00000文件
133     doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");
134 
135     sc.close();
136 }
137 
138 /**
139  * countByKey算子
140  */
141 
142 private static void countByKeyTest(){
143     SparkConf conf = new SparkConf()
144     .setAppName("take")
145     .setMaster("local");
146     JavaSparkContext sc = new JavaSparkContext(conf);
147 
148     List<Tuple2<String, String>> studentsList = Arrays.asList(
149             new Tuple2<String, String>("class1","leo"),
150             new Tuple2<String, String>("class2","jack"),
151             new Tuple2<String, String>("class1","marry"),
152             new Tuple2<String, String>("class2","tom"),
153             new Tuple2<String, String>("class2","david"));
154 
155     JavaPairRDD<String, String> studentsRDD = sc.parallelizePairs(studentsList);
156 
157     //countByKey算子可以統計每個key對應元素的個數
158     //countByKey返回的類型直接就是Map<String,Object>
159 
160     Map<String, Object> studentsCounts = studentsRDD.countByKey();
161 
162     for(Map.Entry<String, Object> studentsCount : studentsCounts.entrySet()){
163         System.out.println(studentsCount.getKey()+" : "+studentsCount.getValue());
164     }
165     sc.close();
166 }

 

原文引自:http://blog.csdn.net/kongshuchen/article/details/51344124


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM