Spark Learning (6): Basic RDD Operations in Java


1. The map operator

private static void map() {
    // Create SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("map")
            .setMaster("local");
    // Create JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build a local collection
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    // Parallelize the collection to create the initial RDD
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
    // Use the map operator to multiply every element in the collection by 2
    JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer v1) throws Exception {
            return v1 * 2;
        }
    });
    // Print the new RDD
    multipleNumberRDD.foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    // Close the JavaSparkContext
    sc.close();
}
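For reference, the same transformation can be written far more compactly with Java 8 lambdas, since Spark's Java function types are functional interfaces. A minimal sketch, assuming Java 8 and reusing the numberRDD from the method above:

    // Java 8 lambda equivalent of the anonymous inner classes above
    JavaRDD<Integer> multipleNumberRDD = numberRDD.map(v1 -> v1 * 2);
    multipleNumberRDD.foreach(t -> System.out.println(t)); // prints 2, 4, 6, 8, 10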

2. The filter operator

private static void filter() {
    // Create SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("filter")
            .setMaster("local");
    // Create JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build a sample collection
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    // Parallelize the collection to create the initial RDD
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
    // Apply the filter operator to keep only the even numbers in the collection
    JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer v1) throws Exception {
            return v1 % 2 == 0;
        }
    });
    evenNumberRDD.foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    sc.close();
}
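Because every transformation returns a new RDD, operators chain naturally. A minimal sketch, continuing from the numberRDD above and again assuming Java 8 lambdas:

    // Keep the even numbers, then halve them; prints 1, 2, 3, 4, 5
    numberRDD.filter(v -> v % 2 == 0)
             .map(v -> v / 2)
             .foreach(t -> System.out.println(t));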

3. The flatMap operator

In Spark, the map function applies a specified operation to each input element and returns exactly one object per input.

The flatMap function, by contrast, is the combination of two operations, hence "map first, then flatten":

Operation 1: the same as map: apply the specified operation to each input element and return one object per input.

Operation 2: flatten all of the returned collections into a single collection of elements, as the sketch below illustrates.
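To make the difference concrete, here is a minimal sketch of what the two operators return for the same input (assuming the Spark 1.x Java API used throughout this post, with Java 8 lambdas):

    JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello you", "hello me"));
    // map: exactly one output object per input -> an RDD of String arrays
    // [["hello","you"], ["hello","me"]]
    JavaRDD<String[]> mapped = lines.map(line -> line.split(" "));
    // flatMap: map first, then flatten -> an RDD of individual words
    // ["hello", "you", "hello", "me"]
    JavaRDD<String> flattened = lines.flatMap(line -> Arrays.asList(line.split(" ")));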

private static void flatMap() {
    SparkConf conf = new SparkConf()
            .setAppName("flatMap")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
    JavaRDD<String> lines = sc.parallelize(lineList);
    // Apply the flatMap operator to split each line of text into multiple words
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        // Here, the first line "hello you" is passed in,
        // and an Iterable<String> containing (hello, you) is returned
        @Override
        public Iterable<String> call(String t) throws Exception {
            return Arrays.asList(t.split(" "));
        }
    });
    words.foreach(new VoidFunction<String>() {
        @Override
        public void call(String t) throws Exception {
            System.out.println(t);
        }
    });
    sc.close();
}
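Note that the example above targets the Spark 1.x Java API. In Spark 2.x, FlatMapFunction.call was changed to return an Iterator instead of an Iterable, so the same function becomes:

    // Spark 2.x variant: return an Iterator rather than an Iterable
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String t) throws Exception {
            return Arrays.asList(t.split(" ")).iterator();
        }
    });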

4. The groupByKey operator

private static void groupByKey() {
    SparkConf conf = new SparkConf()
            .setAppName("groupByKey")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String, Integer>> scoreList = Arrays.asList(
            new Tuple2<String, Integer>("class1", 80),
            new Tuple2<String, Integer>("class2", 90),
            new Tuple2<String, Integer>("class1", 97),
            new Tuple2<String, Integer>("class2", 89));
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
    // Apply the groupByKey operator to the scores RDD to group the scores by class.
    // In effect, all values sharing a key are collected into one Iterable.
    JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
    groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
        @Override
        public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
            System.out.println("class:" + t._1);
            Iterator<Integer> ite = t._2.iterator();
            while (ite.hasNext()) {
                System.out.println(ite.next());
            }
        }
    });
    sc.close();
}
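Once the values are grouped, any per-key computation can run over the Iterable. A minimal sketch that derives each class's average score from the groupedScores RDD above:

    groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
        @Override
        public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
            int sum = 0, count = 0;
            for (Integer score : t._2) {
                sum += score;
                count++;
            }
            // For the sample data: class1 average:88.5, class2 average:89.5
            System.out.println(t._1 + " average:" + (double) sum / count);
        }
    });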

5. The reduceByKey operator

private static void reduceByKey() {
    SparkConf conf = new SparkConf()
            .setAppName("reduceByKey")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String, Integer>> scoreList = Arrays.asList(
            new Tuple2<String, Integer>("class1", 80),
            new Tuple2<String, Integer>("class2", 90),
            new Tuple2<String, Integer>("class1", 97),
            new Tuple2<String, Integer>("class2", 89));
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
    // The RDD returned by the reduceByKey operator is still a JavaPairRDD<key, value>
    JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> t) throws Exception {
            System.out.println(t._1 + ":" + t._2);
        }
    });
    sc.close();
}
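For a pure aggregation like this sum, reduceByKey is generally preferred over groupByKey, because Spark can combine values on the map side before shuffling them. With Java 8 lambdas the whole reduction is a one-liner (scores is the RDD from the method above):

    // Sum the scores per class; prints class1:177 and class2:179
    JavaPairRDD<String, Integer> totalScores = scores.reduceByKey((v1, v2) -> v1 + v2);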

6. The sortByKey operator

private static void sortByKey() {
    SparkConf conf = new SparkConf()
            .setAppName("sortByKey")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<Integer, String>> scoreList = Arrays.asList(
            new Tuple2<Integer, String>(78, "marry"),
            new Tuple2<Integer, String>(89, "tom"),
            new Tuple2<Integer, String>(72, "jack"),
            new Tuple2<Integer, String>(86, "leo"));
    JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
    // Sort by key (the score), in ascending order by default
    JavaPairRDD<Integer, String> sortedScores = scores.sortByKey();
    sortedScores.foreach(new VoidFunction<Tuple2<Integer, String>>() {
        @Override
        public void call(Tuple2<Integer, String> t) throws Exception {
            System.out.println(t._1 + ":" + t._2);
        }
    });
    sc.close();
}
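sortByKey sorts in ascending key order by default; it also accepts a boolean flag controlling the direction, so a descending sort is simply:

    // Pass false to sort the keys (scores) in descending order: 89, 86, 78, 72
    JavaPairRDD<Integer, String> descSortedScores = scores.sortByKey(false);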

7. The join operator

The join operator joins two RDDs by key and returns a JavaPairRDD. The first generic type of the result is the key type of the two input JavaPairRDDs, since the join is performed on the key. The second generic type is Tuple2<v1, v2>, whose two type parameters are the value types of the original RDDs.

private static void join() {
    SparkConf conf = new SparkConf()
            .setAppName("join")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<Integer, String>> studentList = Arrays.asList(
            new Tuple2<Integer, String>(1, "tom"),
            new Tuple2<Integer, String>(2, "jack"),
            new Tuple2<Integer, String>(3, "marry"),
            new Tuple2<Integer, String>(4, "leo"));
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
            new Tuple2<Integer, Integer>(1, 78),
            new Tuple2<Integer, Integer>(2, 87),
            new Tuple2<Integer, Integer>(3, 89),
            new Tuple2<Integer, Integer>(4, 98));
    // Parallelize the two collections into RDDs
    JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
    // Use the join operator to join the two RDDs.
    // The join is performed on the key and returns a JavaPairRDD.
    // The first generic type is the key type of the two input JavaPairRDDs,
    // since the join goes through the key; the second is Tuple2<v1, v2>,
    // whose type parameters are the value types of the original RDDs.
    JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
    // Print the result
    studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
        @Override
        public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
            System.out.println("student id:" + t._1);
            System.out.println("student name:" + t._2._1);
            System.out.println("student score:" + t._2._2);
            System.out.println("==========================");
        }
    });
    sc.close();
}
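Note that join has inner-join semantics: a key that appears in only one of the two RDDs is dropped from the result. For the sample data every id from 1 to 4 appears in both RDDs, so the joined RDD contains four pairs (output order is not guaranteed):

    // Result of students.join(scores) for the sample data:
    // (1, ("tom",   78))
    // (2, ("jack",  87))
    // (3, ("marry", 89))
    // (4, ("leo",   98))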

For more operators, see:
http://blog.csdn.net/liulingyuan6/article/details/53397780
http://blog.csdn.net/liulingyuan6/article/details/53410832
https://www.2cto.com/net/201608/543044.html

