In my view, an action operator in Spark programming works like a trigger: it fires all of the transformation operators defined before it. Transformations are lazily evaluated; defining one does not execute anything, and only when some action operator runs do all of the preceding transformations actually execute, as the short sketch below shows.
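As a minimal illustration of that laziness (this sketch is not from the original post; it assumes a JavaSparkContext named sc, set up with setMaster("local") exactly as in the listing that follows), the side-effecting print inside map runs only once the count action fires:

// Minimal lazy-evaluation sketch (assumption: sc is a local JavaSparkContext)
JavaRDD<Integer> mapped = sc.parallelize(Arrays.asList(1, 2, 3))
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v) throws Exception {
                // This line runs only when an action fires, never at definition time
                System.out.println("mapping " + v);
                return v * 10;
            }
        });
// Nothing has executed yet: no "mapping ..." output has appeared.
long n = mapped.count();   // count is the action; only now does the map above run
System.out.println(n);     // prints 3

The commonly used action operators are listed below (Java version):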
package cn.spark.study.core;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
/**
 * Action operations in practice
* @author dd
*
*/
public class ActionOperation {
    public static void main(String[] args) {
        //reduceTest();
        //collectTest();
        //countTest();
        //takeTest();
        //saveAsTextFileTest();  // writes to HDFS, so it is not run locally
        countByKeyTest();
    }
    /**
     * reduce operator
     * Example: compute a running sum
     */
    private static void reduceTest() {
        SparkConf conf = new SparkConf()
                .setAppName("reduce")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

        // Use reduce to sum the numbers in the collection
        int sum = numbersRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer arg0, Integer arg1) throws Exception {
                return arg0 + arg1;
            }
        });

        System.out.println(sum);

        sc.close();
    }

    /**
     * collect operator
     * Pulls the dataset from the cluster to the driver for local traversal (not recommended)
     */
    private static void collectTest() {
        SparkConf conf = new SparkConf()
                .setAppName("collect")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

        JavaRDD<Integer> doubleNumbers = numbersRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer arg0) throws Exception {
                return arg0 * 2;
            }
        });

        // The foreach action traverses the RDD's elements on the remote cluster, whereas
        // collect pulls the distributed RDD's data back to the driver. collect is generally
        // discouraged: if the RDD holds a lot of data (say, more than ten thousand records),
        // performance suffers because everything must cross the network to reach the driver,
        // and an OOM (out-of-memory) error may occur.
        // Prefer foreach for processing the final RDD.
        List<Integer> doubleNumList = doubleNumbers.collect();
        for (Integer num : doubleNumList) {
            System.out.println(num);
        }
        sc.close();
    }

    /**
     * count operator
     * Counts the number of elements in an RDD
     */
    private static void countTest() {
        SparkConf conf = new SparkConf()
                .setAppName("count")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

        // Use count to get the number of elements in the RDD
        long count = numbersRDD.count();
        System.out.println(count);

        sc.close();
    }

    /**
     * take operator
     * Pulls the first n elements of a remote RDD to the driver
     */
    private static void takeTest() {
        SparkConf conf = new SparkConf()
                .setAppName("take")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

        // take is similar to collect in that it also fetches RDD data from the remote
        // cluster, but collect fetches all of the RDD's data while take fetches only
        // the first n elements
        List<Integer> top3number = numbersRDD.take(3);
        for (Integer num : top3number) {
            System.out.println(num);
        }
        sc.close();
    }

    /**
     * saveAsTextFile operator
     */
    private static void saveAsTextFileTest() {
        SparkConf conf = new SparkConf()
                .setAppName("saveAsTextFile");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

        JavaRDD<Integer> doubleNumbers = numbersRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer arg0) throws Exception {
                return arg0 * 2;
            }
        });

        // saveAsTextFile writes the RDD's data directly to HDFS. Note that we can only
        // specify the output directory; the data is actually saved as part files under
        // it, e.g. /double_number.txt/part-00000
        doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");

        sc.close();
    }

    /**
     * countByKey operator
     */
    private static void countByKeyTest() {
        SparkConf conf = new SparkConf()
                .setAppName("countByKey")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, String>> studentsList = Arrays.asList(
                new Tuple2<String, String>("class1", "leo"),
                new Tuple2<String, String>("class2", "jack"),
                new Tuple2<String, String>("class1", "marry"),
                new Tuple2<String, String>("class2", "tom"),
                new Tuple2<String, String>("class2", "david"));

        JavaPairRDD<String, String> studentsRDD = sc.parallelizePairs(studentsList);

        // countByKey counts the number of elements per key.
        // In the Spark 1.x Java API it returns Map<String, Object> directly
        // (newer Spark versions return Map<String, Long> instead)
        Map<String, Object> studentsCounts = studentsRDD.countByKey();

        for (Map.Entry<String, Object> studentsCount : studentsCounts.entrySet()) {
            System.out.println(studentsCount.getKey() + " : " + studentsCount.getValue());
        }
        sc.close();
    }
}
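The collect comments above recommend foreach for processing a final RDD without pulling it back to the driver, but the listing never shows it. As a hedged sketch (not in the original post), a foreach version of the doubled-numbers traversal from collectTest could look like the following; it uses the VoidFunction interface, which would need an extra import of org.apache.spark.api.java.function.VoidFunction:

// Sketch only: traverse the final RDD on the executors instead of collecting it
doubleNumbers.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer num) throws Exception {
        // In local mode this prints to the driver console; on a real cluster each
        // element is printed in the stdout log of the executor that holds it
        System.out.println(num);
    }
});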
Original source: http://blog.csdn.net/kongshuchen/article/details/51344124