1.map算子
private static void map() { //创建SparkConf SparkConf conf = new SparkConf() .setAppName("map") .setMaster("local"); //创建JavasparkContext JavaSparkContext sc = new JavaSparkContext(conf); //构造集合 List<Integer> numbers = Arrays.asList(1,2,3,4,5); //并行化集合,创建初始RDD JavaRDD<Integer> numberRDD = sc.parallelize(numbers); //使用map算子,将集合中的每个元素都乘以2 JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1 * 2; } }); //打印新的RDD multipleNumberRDD.foreach(new VoidFunction<Integer>() { @Override public void call(Integer t) throws Exception { System.out.println(t); } }); //关闭JavasparkContext sc.close(); }
2.filter算子
private static void filter() { //创建SparkConf SparkConf conf = new SparkConf() .setAppName("filter") .setMaster("local"); //创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); //模拟集合 List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10); //并行化集合,创建初始RDD JavaRDD<Integer> numberRDD = sc.parallelize(numbers); //对集合使用filter算子,过滤出集合中的偶数 JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() { @Override public Boolean call(Integer v1) throws Exception { return v1%2==0; } }); evenNumberRDD.foreach(new VoidFunction<Integer>() { @Override public void call(Integer t) throws Exception { System.out.println(t); } }); sc.close(); }
3.flatMap算子
Spark 中 map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;
而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:
操作1:同map函数一样,对每一条输入进行指定的操作,但每一条输入返回的是一个集合(可包含0个或多个元素)
操作2:最后将所有返回的集合“扁平化”,即把各集合中的元素合并到同一个结果RDD中
private static void flatMap() { SparkConf conf = new SparkConf() .setAppName("flatMap") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<String> lineList = Arrays.asList("hello you","hello me","hello world"); JavaRDD<String> lines = sc.parallelize(lineList); //对RDD执行flatMap算子,将每一行文本,拆分为多个单词 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { //在这里,传入第一行,hello,you //返回的是一个Iterable<String>(hello,you) @Override public Iterable<String> call(String t) throws Exception { return Arrays.asList(t.split(" ")); } }); words.foreach(new VoidFunction<String>() { @Override public void call(String t) throws Exception { System.out.println(t); } }); sc.close(); }
4.groupByKey算子
private static void groupByKey() { SparkConf conf = new SparkConf() .setAppName("groupByKey") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 90), new Tuple2<String, Integer>("class1", 97), new Tuple2<String, Integer>("class2", 89)); JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); //针对scoresRDD,执行groupByKey算子,对每个班级的成绩进行分组 //相当于是,一个key join上的所有value,都放到一个Iterable里面去了 JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey(); groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() { @Override public void call(Tuple2<String, Iterable<Integer>> t) throws Exception { System.out.println("class:" + t._1); Iterator<Integer> ite = t._2.iterator(); while(ite.hasNext()) { System.out.println(ite.next()); } } }); }
5.reduceByKey算子
private static void reduceByKey() { SparkConf conf = new SparkConf() .setAppName("reduceByKey") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<Tuple2<String, Integer>> scoreList = Arrays.asList( new Tuple2<String, Integer>("class1", 80), new Tuple2<String, Integer>("class2", 90), new Tuple2<String, Integer>("class1", 97), new Tuple2<String, Integer>("class2", 89)); JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList); //reduceByKey算法返回的RDD,还是JavaPairRDD<key,value> JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println(t._1 + ":" + t._2); } }); sc.close(); }
6.sortByKey算子
/**
 * Demonstrates the {@code sortByKey} operator: sorts (score, name) pairs by
 * their integer key and prints one "score:name" line per pair.
 */
private static void sortByKey() {
    SparkConf conf = new SparkConf()
            .setAppName("sortByKey")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
        List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                new Tuple2<Integer, String>(78, "marry"),
                new Tuple2<Integer, String>(89, "tom"),
                new Tuple2<Integer, String>(72, "jack"),
                new Tuple2<Integer, String>(86, "leo"));
        JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);

        // Sort the pairs by key using the no-argument overload
        // (ascending order according to the Spark API documentation).
        JavaPairRDD<Integer, String> sortedScores = scores.sortByKey();

        // Print each pair of the sorted RDD.
        sortedScores.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + ":" + t._2);
            }
        });
    } finally {
        // Close the context even when the job above throws.
        sc.close();
    }
}
7.join算子
join算子用于按key关联两个JavaPairRDD,并返回一个新的JavaPairRDD。返回的JavaPairRDD的第一个泛型类型是原来两个JavaPairRDD的key类型,因为是通过key进行join的;第二个泛型类型是Tuple2<V1, V2>,其中V1、V2分别为两个原始RDD的value类型
private static void join() { SparkConf conf = new SparkConf() .setAppName("join") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<Tuple2<Integer, String>> studentList = Arrays.asList( new Tuple2<Integer, String>(1, "tom"), new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "marry"), new Tuple2<Integer, String>(4, "leo")); List<Tuple2<Integer, Integer>> scoreList = Arrays.asList( new Tuple2<Integer, Integer>(1, 78), new Tuple2<Integer, Integer>(2, 87), new Tuple2<Integer, Integer>(3, 89), new Tuple2<Integer, Integer>(4, 98)); //并行化两个RDD JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);; JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList); //使用join算子关联两个RDD //join以后,会根据key进行join,并返回JavaPairRDD //JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key类型,因为通过key进行join的 //第二个泛型类型,是Tuple2<v1, v2>的类型,Tuple2的两个泛型分别为原始RDD的value的类型 JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores); //打印 studentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() { @Override public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception { System.out.println("student id:" + t._1); System.out.println("student name:" + t._2._1); System.out.println("student score:" + t._2._2); System.out.println("=========================="); } }); sc.close(); }
更深入的用法参见:
http://blog.csdn.net/liulingyuan6/article/details/53397780
http://blog.csdn.net/liulingyuan6/article/details/53410832
https://www.2cto.com/net/201608/543044.html