(轉)Spark JAVA RDD API


對API的解釋:

1.1 transform

l  map(func):對調用map的RDD數據集中的每個element都使用func,然后返回一個新的RDD,這個返回的數據集是分布式的數據集

l  filter(func) : 對調用filter的RDD數據集中的每個元素都使用func,然后返回一個包含使func為true的元素構成的RDD

l  flatMap(func):和map差不多,但是flatMap生成的是多個結果

l  mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition

l  mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index

l  sample(withReplacement,faction,seed):抽樣

l  union(otherDataset):返回一個新的dataset,包含源dataset和給定dataset的元素的集合

l  distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element

l  groupByKey(numTasks):返回(K,Seq[V]),也就是Hadoopreduce函數接受的key-valuelist

l  reduceByKey(func,[numTasks]):就是用一個給定的reduce func再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數

l  sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean類型

1.2 action

l  reduce(func):說白了就是聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律的

l  collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組

l  count():返回的是dataset中的element的個數

l  first():返回的是dataset中的第一個元素

l  take(n):返回前n個elements

l  takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed

l  saveAsTextFile(path):把dataset寫到一個text file中,或者hdfs,或者hdfs支持的文件系統中,spark把每條記錄都轉換為一行記錄,然后寫到file中

l  saveAsSequenceFile(path):只能用在key-value對上,然后生成SequenceFile寫到本地或者hadoop文件系統

l  countByKey():返回的是key對應的個數的一個map,作用於一個RDD

l  foreach(func):對dataset中的每個元素都使用func

 

以下是案例:



/*數據情況
 a 1
b 2
c 3
d 4

e 5*/

 

主函數:

public class SparkCoreTest 
{

    public static void main( String[] args )
    {
    if(args.length<1){
System.out.println("請輸入參數!");
}

 String filepath=args[0];
        JavaSparkContext sc =JavaSparkContextFactory.getJavaSparkContext("sparkCoreTest");

        JavaRDD<String> rdd=sc.textFile(filepath);  

--transform
        //testSparkCoreApiMap(logData);
        //testSparkCoreApiFilter(rdd);
        //testSparkCoreApiFlatMap(rdd);
        
        //testSparkCoreApiUnion(rdd);
       // testSparkCoreApiDistinct(rdd);
       // testSparkCoreApiMaptoPair(rdd);
       //testSparkCoreApiGroupByKey(rdd);
        //testSparkCoreApiReduceByKey(rdd);

--action
        testSparkCoreApiReduce(rdd);
         

        
    }
    
    
    
    /**
     * Map主要是對數據進行處理,不進行數據集的增減
     * 
     * 本案例實現,打印所有數據
     * 
     * @param rdd
     */
    
    private static void testSparkCoreApiMap(JavaRDD<String> rdd){
    JavaRDD<String> logData1=rdd.map(new Function<String,String>(){
          public String call(String s){ 
          return s;
          }
         });
         List list = logData1.collect();
          for (int i = 0; i < list.size(); i++) {
          System.out.println(list.get(i));


  }
    
    }
   
    
    
   /*
    * 
    * 
    * filter主要是過濾數據的功能
    * 本案例實現:過濾含有a的那行數據
    * 
    * 
    */


    private static void testSparkCoreApiFilter(JavaRDD<String> rdd){
    JavaRDD<String> logData1=rdd.filter(new Function<String,Boolean>(){
          public Boolean call(String s){ 
         
        return (s.split(" "))[0].equals("a");
          }
         
         });
         List list = logData1.collect();
          for (int i = 0; i < list.size(); i++) {
          System.out.println(list.get(i));


  }
    
    }
    
    
    /*
     * 
     * 
     * flatMap  用戶行轉列
     * 本案例實現:打印所有的字符
     * 
     * 
     */


     private static void testSparkCoreApiFlatMap(JavaRDD<String> rdd){
    JavaRDD<String> words=rdd.flatMap(
           new FlatMapFunction<String, String>() {
               public Iterable<String> call(String s) throws Exception {
                   return Arrays.asList(s.split(" "));
               }
           }
    );
          List list = words.collect();
           for (int i = 0; i < list.size(); i++) {
          System.out.println(list.get(i));


  }
     
     }
    
     
    
     /**
      * testSparkCoreApiUnion 
      * 合並兩個RDD
      * @param rdd
      */
     private static void testSparkCoreApiUnion(JavaRDD<String> rdd){
    JavaRDD<String> unionRdd=rdd.union(rdd);
    unionRdd.foreach(new VoidFunction<String>(){
    public void call(String lines){
    System.out.println(lines);
    }
    });
     }
     
     
     /**
      * testSparkCoreApiDistinct Test
      * 對RDD去重
      * @param rdd
      */
     private static void testSparkCoreApiDistinct(JavaRDD<String> rdd){
    JavaRDD<String> unionRdd=rdd.union(rdd).distinct();
    unionRdd.foreach(new VoidFunction<String>(){
    public void call(String lines){
    System.out.println(lines);
    }
    });
     }
     
     
     /**
      * testSparkCoreApiMaptoPair Test
      * 把RDD映射為鍵值對類型的數據
      * @param rdd
      */
     private static void testSparkCoreApiMaptoPair(JavaRDD<String> rdd){
     
    JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){


@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], st[1]);
}
     
    });
     
    pairRdd.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._2());

}
});
     
     }
     
     
     
     /**
      * testSparkCoreApiGroupByKey Test
      * 對鍵值對類型的數據進行按鍵值合並
      * @param rdd
      */
     
     private static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd){
     
    JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){


@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
     
    });
     
    JavaPairRDD<String, Iterable<Integer>> pairrdd2= pairRdd.union(pairRdd).groupByKey();
    pairrdd2.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){
  @Override
  public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
  Iterable<Integer> iter = t._2();  
                for (Integer integer : iter) {  
                System.out.println(integer);
                }  
   
 
  }
  });
    }
     
  
     /**
      * testSparkCoreApiReduceByKey 
      * 對鍵值對進行按鍵相同的對值進行操作
      * @param rdd
      */
     private static void testSparkCoreApiReduceByKey(JavaRDD<String> rdd){
     
    JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){


  @Override
  public Tuple2<String, Integer> call(String t) throws Exception {
  String[] st=t.split(" ");
  return new Tuple2(st[0], Integer.valueOf(st[1]));
  }
       
      });
     
    JavaPairRDD<String, Integer> pairrdd2 =pairRdd.union(pairRdd).reduceByKey(
           new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
 
return v1+v2;
}
           } 
       ).sortByKey() ;
    pairrdd2.foreach(new VoidFunction<Tuple2<String, Integer>>(){
  @Override
  public void call(Tuple2<String, Integer> t) throws Exception { 
                  System.out.println(t._2());
  
  }
  });
     }
     
     
     /**
      * testSparkCoreApiReduce
      * 對RDD進行遞歸調用
      * @param rdd
      */
     private static void testSparkCoreApiReduce(JavaRDD<String> rdd){
    //由於原數據是String,需要轉為Integer才能進行reduce遞歸
    JavaRDD<Integer> rdd1=rdd.map(new Function<String,Integer>(){


@Override
public Integer call(String v1) throws Exception {
// TODO Auto-generated method stub
return Integer.valueOf(v1.split(" ")[1]);
}  
    });
     
    Integer a= rdd1.reduce(new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1,Integer v2) throws Exception {
return v1+v2;
}  
    });
    System.out.println(a);
     
     }
     
     
    
}
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM