Spark: Converting Between JavaRDD, JavaPairRDD and Dataset, and Printing Them


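Main contents:

1. Convert a List to a JavaRDD and print the JavaRDD

2. Convert a List to a JavaRDD, convert the JavaRDD to a JavaPairRDD, and print the JavaPairRDD

3. Convert a JavaRDD<String> to a JavaRDD<Row>


1. Convert the List to a JavaRDD first, then print the JavaRDD with collect() and with foreach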

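import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * @author Yu Wanlong
 */
public class ReadTextToRDD {

  public static void main(String[] args) {
    // configure spark (spark.executor.memory only matters on a cluster;
    // in local mode everything runs inside the driver JVM)
    SparkConf sparkConf = new SparkConf().setAppName("Read Text to RDD")
        .setMaster("local[2]").set("spark.executor.memory","2g");
    // start a spark context
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // build List<String>
    List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1","d:1");
    // List<String> to JavaRDD<String>
    JavaRDD<String> javaRDD = jsc.parallelize(list);

    // Print the JavaRDD with collect(): all elements are first brought back to the driver
    for (String str : javaRDD.collect()) {
      System.out.println(str);
    }
    // Print the JavaRDD with foreach: call() runs inside the tasks, so this prints the
    // same six elements a second time, and with local[2] the order can vary
    javaRDD.foreach(new VoidFunction<String>() {
      @Override
      public void call(String s) throws Exception {
        System.out.println(s);
      }
    });
    // release the Spark resources
    jsc.stop();
  }
}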

a:1
a:2
b:1
b:1
c:1
d:1
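On Java 8 and later the anonymous classes above can be replaced by lambdas. The sketch below shows the same printing step in that style; the class name `PrintRDDLambda` and the helper `firstN` are hypothetical. It also uses `take(n)`, which returns only the first n elements to the driver and is a safer choice than `collect()` when the RDD is large. Note that `foreach` is given a lambda rather than `System.out::println`: a bound method reference would capture the non-serializable `System.out` stream and the task would fail to serialize. On a real cluster the `foreach` output goes to the executors' stdout, not to the driver console.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PrintRDDLambda {

  // Hypothetical helper: parallelize the data and bring only the first n elements back to the driver
  public static List<String> firstN(JavaSparkContext jsc, List<String> data, int n) {
    JavaRDD<String> rdd = jsc.parallelize(data);
    return rdd.take(n);
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Print RDD with lambdas").setMaster("local[2]");
    // JavaSparkContext is Closeable, so try-with-resources stops it automatically
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1");
      JavaRDD<String> javaRDD = jsc.parallelize(list);

      // collect() returns a java.util.List on the driver; List.forEach runs locally
      javaRDD.collect().forEach(System.out::println);

      // take(3) returns only the first three elements to the driver
      System.out.println(firstN(jsc, list, 3));

      // foreach runs in the tasks: a lambda, not System.out::println,
      // keeps the non-serializable System.out out of the closure
      javaRDD.foreach(s -> System.out.println(s));
    }
  }
}
```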


2. Convert a List to a JavaRDD, convert the JavaRDD to a JavaPairRDD, and print the JavaPairRDD

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * @author Yu Wanlong
 */
public class ReadTextToRDD {

  public static void main(String[] args) {
    // configure spark
    SparkConf sparkConf = new SparkConf().setAppName("Read Text to RDD")
        .setMaster("local[2]").set("spark.executor.memory","2g");
    // start a spark context
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // build List<String>
    List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1","d:1");
    // List<String> to JavaRDD<String>
    JavaRDD<String> javaRDD = jsc.parallelize(list);
    // JavaRDD<String> to JavaPairRDD<String, Integer>: split each "key:value" into a Tuple2
    JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) throws Exception {
            String[] ss = s.split(":");
            return new Tuple2<String, Integer>(ss[0], Integer.parseInt(ss[1]));
          }
        });
    // Print the JavaPairRDD with collect()
    for (Tuple2<String, Integer> str : javaPairRDD.collect()) {
      System.out.println(str.toString());
    }
    // release the Spark resources
    jsc.stop();
  }
}

(a,1)
(a,2)
(b,1)
(b,1)
(c,1)
(d,1)


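When converting a JavaRDD<String> into a JavaPairRDD<String, Integer>, the key points are:

First: in mapToPair, the anonymous class PairFunction<String, String, Integer>() has the type parameters PairFunction<element type of the input JavaRDD, key type of the resulting JavaPairRDD, value type of the resulting JavaPairRDD>.

Second: because a JavaPairRDD stores its data as key-value pairs, Tuple2<String, Integer> is the type of the pair that call() returns, i.e. Tuple2<key type, value type>.

Third: in the parameter String s, String is the element type of JavaRDD<String>, and s holds the value of one element.

Fourth: return new Tuple2<String, Integer>(ss[0], Integer.parseInt(ss[1])) builds the key-value pair returned for that element.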
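With Java 8 lambdas the same mapToPair conversion shrinks to a few lines: the lambda plays the role of PairFunction.call(), and the key and value types are inferred from the target type JavaPairRDD<String, Integer>. This is a sketch assuming Java 8+; the class MapToPairLambda and its helper toPairs are hypothetical names. Tuple2 is scala.Tuple2, which is on the classpath of any Spark application.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class MapToPairLambda {

  // Hypothetical helper: split each "key:value" line into a (String, Integer) pair.
  // The lambda replaces the anonymous PairFunction<String, String, Integer>.
  public static JavaPairRDD<String, Integer> toPairs(JavaRDD<String> lines) {
    return lines.mapToPair(s -> {
      String[] ss = s.split(":");
      return new Tuple2<>(ss[0], Integer.parseInt(ss[1]));
    });
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("mapToPair with lambdas").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1");
      JavaPairRDD<String, Integer> javaPairRDD = toPairs(jsc.parallelize(list));

      // Each element is a scala.Tuple2: _1() is the key, _2() is the value
      for (Tuple2<String, Integer> t : javaPairRDD.collect()) {
        System.out.println(t._1() + " -> " + t._2());
      }
    }
  }
}
```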


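Summary: converting a JavaRDD into a JavaPairRDD essentially reshapes each single-line record into a key-value pair. Key-based operations such as reduceByKey, groupByKey and join are only defined on JavaPairRDD, so once the data is in this form, key-value computations become much simpler to express and more efficient to run.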
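As an illustration of such a key-value operation, the sketch below sums the values of each key with reduceByKey and returns the result to the driver with collectAsMap. The class ReduceByKeyExample and the helper sumByKey are hypothetical; reduceByKey and collectAsMap are standard JavaPairRDD methods.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ReduceByKeyExample {

  // Hypothetical helper: parse "key:value" lines and sum the values of each key
  public static Map<String, Integer> sumByKey(JavaSparkContext jsc, List<String> lines) {
    JavaPairRDD<String, Integer> pairs = jsc.parallelize(lines).mapToPair(s -> {
      String[] ss = s.split(":");
      return new Tuple2<>(ss[0], Integer.parseInt(ss[1]));
    });
    // reduceByKey merges the values of each key; collectAsMap brings the result to the driver
    return pairs.reduceByKey((a, b) -> a + b).collectAsMap();
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1");
      sumByKey(jsc, list).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
  }
}
```

For the sample list this yields a = 3, b = 2, c = 1 and d = 1 (the map's iteration order is not guaranteed). Unlike groupByKey, reduceByKey combines values inside each partition before the shuffle, so less data has to move between nodes.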


3. Convert a JavaRDD<String> to a JavaRDD<Row>


import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

/**
 * @author Yu Wanlong
 */
public class ReadTextToRDD {

  public static void main(String[] args) {
    // configure spark
    SparkConf sparkConf = new SparkConf().setAppName("Read Text to RDD")
        .setMaster("local[2]").set("spark.executor.memory","2g");
    // start a spark context
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // build List<String>
    List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1","d:1");
    // List<String> to JavaRDD<String>
    JavaRDD<String> javaRDD = jsc.parallelize(list);

    // JavaRDD<String> to JavaRDD<Row>: each "key:value" line becomes a two-field Row
    JavaRDD<Row> javaRDDRow = javaRDD.map(new Function<String, Row>() {
      @Override
      public Row call(String s) throws Exception {
        String[] ss = s.split(":");
        return RowFactory.create(ss[0], ss[1]);
      }
    });

    // Print the JavaRDD<Row>
    for (Row str : javaRDDRow.collect()) {
      System.out.println(str.toString());
    }
    // release the Spark resources
    jsc.stop();
  }
}

[a,1]
[a,2]
[b,1]
[b,1]
[c,1]
[d,1]
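The title also mentions Dataset. A JavaRDD<Row> like the one above carries no column names or types, so turning it into a Dataset<Row> needs an explicit schema. The following is a minimal sketch, assuming Spark 2.x or later with SparkSession; the class RowRDDToDataset, the helper toDataset and the column names key and value are hypothetical choices. Both columns are declared as strings because RowFactory.create(ss[0], ss[1]) stores the split strings unchanged.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RowRDDToDataset {

  // Hypothetical helper: "key:value" strings -> JavaRDD<Row> -> Dataset<Row>
  public static Dataset<Row> toDataset(SparkSession spark, List<String> lines) {
    // Wrap the session's SparkContext; closing this wrapper would stop the whole session
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    JavaRDD<Row> rows = jsc.parallelize(lines).map(s -> {
      String[] ss = s.split(":");
      return RowFactory.create(ss[0], ss[1]);
    });
    // The schema names and types the Row fields; both are strings, matching RowFactory.create above
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("key", DataTypes.StringType, false),
        DataTypes.createStructField("value", DataTypes.StringType, false)
    });
    return spark.createDataFrame(rows, schema);
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("JavaRDD<Row> to Dataset<Row>").master("local[2]").getOrCreate();
    Dataset<Row> ds = toDataset(spark, Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1"));
    ds.printSchema();
    ds.show();
    spark.stop();
  }
}
```

printSchema() lists the two string columns, and show() prints the rows as a table, which is usually easier to read than the Row.toString() output above.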
