The Spark join operator


Java

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * The join operator performs an inner join of two RDDs by key: for every key
 * present in both RDDs, it emits one (key, (value1, value2)) record per
 * matching combination of values.
 * @author Tele
 */
public class JoinDemo {
    private static SparkConf conf = new SparkConf().setMaster("local").setAppName("joindemo");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);

    public static void main(String[] args) {
        // Student ID -> name. Note that student 1 deliberately has two scores
        // below, so the join will emit two records for that key.
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<Integer, String>(1, "tele"),
                new Tuple2<Integer, String>(2, "yeye"),
                new Tuple2<Integer, String>(3, "wyc")
        );

        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(1, 1100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 70)
        );

        JavaPairRDD<Integer, String> studentRDD = jsc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scoreRDD = jsc.parallelizePairs(scoreList);

        // Note the type parameters of the resulting pair RDD: the first generic
        // parameter is the key type; the String and Integer inside Tuple2 are
        // the value types of the two original RDDs.
        JavaPairRDD<Integer, Tuple2<String, Integer>> result = studentRDD.join(scoreRDD);

        result.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
                System.out.println("Student ID: " + t._1);
                System.out.println("Name: " + t._2._1);
                System.out.println("Score: " + t._2._2);
                System.out.println("=================");
            }
        });

        jsc.close();
    }
}
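Running the Java version shows that join emits one record per matching (student, score) combination, so student 1, who has two scores, appears twice. Record order may vary between runs; the output contains:

Student ID: 1
Name: tele
Score: 100
=================
Student ID: 1
Name: tele
Score: 1100
=================
Student ID: 2
Name: yeye
Score: 90
=================
Student ID: 3
Name: wyc
Score: 70
=================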

Scala

import org.apache.spark.{SparkConf, SparkContext}

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("joindemo")
    val sc = new SparkContext(conf)

    val studentArr = Array((1, "tele"), (2, "yeye"), (3, "wyc"))
    val scoreArr = Array((1, 100), (2, 80), (3, 100))

    val studentRDD = sc.parallelize(studentArr, 1)
    val scoreRDD = sc.parallelize(scoreArr, 1)

    // Inner join by key: the result type is RDD[(Int, (String, Int))].
    val result = studentRDD.join(scoreRDD)

    result.foreach(t => {
      println("Student ID: " + t._1)
      println("Name: " + t._2._1)
      println("Score: " + t._2._2)
      println("============")
    })

    sc.stop()
  }
}
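Because join is an inner join, keys that appear in only one RDD are dropped from the result. When unmatched students should still appear, Spark's leftOuterJoin keeps every key from the left RDD and wraps the right-hand value in an Option. A minimal sketch, assuming the studentRDD and scoreRDD from the Scala example above are in scope:

// leftOuterJoin keeps every student; the score becomes an Option[Int].
val leftJoined = studentRDD.leftOuterJoin(scoreRDD)

leftJoined.foreach { case (id, (name, scoreOpt)) =>
  println(s"$id $name " + scoreOpt.getOrElse("no score"))
}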
