生成dataset的幾種方式


1.常用的方式通過sparksession讀取外部文件或者數據生成dataset(這里就不講了)
  注: 生成Row對象的方法提一下:RowFactory.create(x,y,z),取Row中的數據使用row.getAs("列名")來獲取對應的列值或者row.getInt(0),row.getString(1)(但這個要注意順序)

   

2.通過調用createDataFrame生成Dataset
通過反射的方式將非json格式的RDD轉換成DataFrame(不建議使用)

自定義類要可序列化
自定義類的訪問級別是Public
RDD轉成DataFrame后會根據映射將字段按Assci碼排序
將DataFrame轉換成RDD時獲取字段兩種方式,一種是df.getInt(0)下標獲取(不推薦使用),另一種是df.getAs(“列名”)獲取(推薦使用)
關於序列化問題:
             1.反序列化時serializable 版本號不一致時會導致不能反序列化。
             2.子類中實現了serializable接口,父類中沒有實現,父類中的變量不能被序列化,序列化后父類中的變量會得到null。
             注意:父類實現serializable接口,子類沒有實現serializable接口時,子類可以正常序列化
            3.被關鍵字transient修飾的變量不能被序列化。
            4.靜態變量不能被序列化,屬於類,不屬於方法和對象,所以不能被序列化。
           另外:一個文件多次writeObject時,如果有相同的對象已經寫入文件,那么下次再寫入時,只保存第二次寫入的引用,讀取時,都是第一次保存的對象。

 1 /**方法1
 2 * 注意:
 3 * 1.自定義類必須是可序列化的
 4 * 2.自定義類訪問級別必須是Public
 5 * 3.RDD轉成DataFrame會把自定義類中字段的名稱按assci碼排序
 6 */
 7 SparkConf conf = new SparkConf();
 8 conf.setMaster("local").setAppName("RDD");
 9 JavaSparkContext sc = new JavaSparkContext(conf);
10 SQLContext sqlContext = new SQLContext(sc);
11 JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt");
12 JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
13 
14     /**
15     * 
16     */
17     private static final long serialVersionUID = 1L;
18 
19     @Override
20     public Person call(String s) throws Exception {
21           Person p = new Person();
22           p.setId(s.split(",")[0]);
23           p.setName(s.split(",")[1]);
24           return p;
25     }
26 });
27 /**
28 * 傳入進去Person.class的時候,sqlContext是通過反射的方式創建DataFrame
29 * 在底層通過反射的方式獲得Person的所有field,結合RDD本身,就生成了DataFrame
30 */
31 DataFrame df = sqlContext.createDataFrame(personRDD, Person.class); 
32 
33 class Person implements Serializable {
34     private static final long serialVersionUID = -6907013906164009798L;
35     private String Id;
36     private String name;
37 
38 
39 
40     public void setId(String appId) {
41         this.appId = appId;
42     }
43 
44     public String getId() {
45         return appId;
46     }
47 
48     public String getname() {
49         return detail;
50     }
51 
52     public void setname(String detail) {
53         this.detail = detail;
54     }
55 }
 1 //方法2:
 2 JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt");
 3 /**
 4  * 轉換成Row類型的RDD
 5  */
 6 JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
 7 
 8     /**
 9      * 
10      */
11     private static final long serialVersionUID = 1L;
12 
13     @Override
14     public Row call(String s) throws Exception {
15           return RowFactory.create(//這里字段順序一定要和下邊 StructField對應起來
16                 String.valueOf(s.split(",")[0]),
17                 String.valueOf(s.split(",")[1]),
18     );
19     }
20 });
21 /**
22  * 動態構建DataFrame中的元數據,一般來說這里的字段可以來源自字符串,也可以來源於外部數據庫
23  */
24 List<StructField> asList =Arrays.asList(//這里字段順序一定要和上邊對應起來
25     DataTypes.createStructField("id", DataTypes.StringType, true),
26     DataTypes.createStructField("name", DataTypes.StringType, true)
27 );
28 StructType schema = DataTypes.createStructType(asList);
29 /*
30   StructType schema = new StructType(new StructField[]{
31                         new StructField("id", DataTypes.StringType, false, Metadata.empty()),
32                         new StructField("name", DataTypes.StringType, false, Metadata.empty()),
33             });
34 */
35 //DataFrame df = sqlContext.createDataFrame(List<Row> ,schema)這個方法也可以
36 DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
 1 //方法3
 2 public static class Person implements Serializable {
 3   private String name;
 4   private int age;
 5 
 6   public String getName() {
 7     return name;
 8   }
 9 
10   public void setName(String name) {
11     this.name = name;
12   }
13 
14   public int getAge() {
15     return age;
16   }
17 
18   public void setAge(int age) {
19     this.age = age;
20   }
21 }
22 
23 // Create an instance of a Bean class
24 Person person = new Person();
25 person.setName("Andy");
26 person.setAge(32);
27 
28 // Encoders are created for Java beans
29 Encoder<Person> personEncoder = Encoders.bean(Person.class);
30 Dataset<Person> javaBeanDS = spark.createDataset(
31   Collections.singletonList(person),
32   personEncoder
33 );
34 javaBeanDS.show();
35 // +---+----+
36 // |age|name|
37 // +---+----+
38 // | 32|Andy|
39 // +---+----+
40 
41 // Encoders for most common types are provided in class Encoders
42 Encoder<Integer> integerEncoder = Encoders.INT();
43 Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
44 Dataset<Integer> transformedDS = primitiveDS.map(
45     (MapFunction<Integer, Integer>) value -> value + 1,
46     integerEncoder);
47 transformedDS.collect(); // Returns [2, 3, 4]
48 
49 // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
50 String path = "examples/src/main/resources/people.json";
51 Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
52 peopleDS.show();
53 // +----+-------+
54 // | age|   name|
55 // +----+-------+
56 // |null|Michael|
57 // |  30|   Andy|
58 // |  19| Justin|
59 // +----+-------+

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM