1.常用的方式通過sparksession讀取外部文件或者數據生成dataset(這里就不講了)
注: 生成Row對象的方法提一下:RowFactory.create(x,y,z),取Row中的數據使用row.getAs("列名")來獲取對應的列值或者row.getInt(0),row.getString(1)(但這個要注意順序)
2.通過調用createDataFrame生成Dataset
通過反射的方式將非json格式的RDD轉換成DataFrame(不建議使用)
自定義類要可序列化
自定義類的訪問級別是Public
RDD轉成DataFrame后會根據映射將字段按Assci碼排序
將DataFrame轉換成RDD時獲取字段兩種方式,一種是df.getInt(0)下標獲取(不推薦使用),另一種是df.getAs(“列名”)獲取(推薦使用)
關於序列化問題:
1.反序列化時serializable 版本號不一致時會導致不能反序列化。
2.子類中實現了serializable接口,父類中沒有實現,父類中的變量不能被序列化,序列化后父類中的變量會得到null。
注意:父類實現serializable接口,子類沒有實現serializable接口時,子類可以正常序列化
3.被關鍵字transient修飾的變量不能被序列化。
4.靜態變量不能被序列化,屬於類,不屬於方法和對象,所以不能被序列化。
另外:一個文件多次writeObject時,如果有相同的對象已經寫入文件,那么下次再寫入時,只保存第二次寫入的引用,讀取時,都是第一次保存的對象。
1 /**方法1 2 * 注意: 3 * 1.自定義類必須是可序列化的 4 * 2.自定義類訪問級別必須是Public 5 * 3.RDD轉成DataFrame會把自定義類中字段的名稱按assci碼排序 6 */ 7 SparkConf conf = new SparkConf(); 8 conf.setMaster("local").setAppName("RDD"); 9 JavaSparkContext sc = new JavaSparkContext(conf); 10 SQLContext sqlContext = new SQLContext(sc); 11 JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt"); 12 JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() { 13 14 /** 15 * 16 */ 17 private static final long serialVersionUID = 1L; 18 19 @Override 20 public Person call(String s) throws Exception { 21 Person p = new Person(); 22 p.setId(s.split(",")[0]); 23 p.setName(s.split(",")[1]); 24 return p; 25 } 26 }); 27 /** 28 * 傳入進去Person.class的時候,sqlContext是通過反射的方式創建DataFrame 29 * 在底層通過反射的方式獲得Person的所有field,結合RDD本身,就生成了DataFrame 30 */ 31 DataFrame df = sqlContext.createDataFrame(personRDD, Person.class); 32 33 class Person implements Serializable { 34 private static final long serialVersionUID = -6907013906164009798L; 35 private String Id; 36 private String name; 37 38 39 40 public void setId(String appId) { 41 this.appId = appId; 42 } 43 44 public String getId() { 45 return appId; 46 } 47 48 public String getname() { 49 return detail; 50 } 51 52 public void setname(String detail) { 53 this.detail = detail; 54 } 55 }
1 //方法2: 2 JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt"); 3 /** 4 * 轉換成Row類型的RDD 5 */ 6 JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() { 7 8 /** 9 * 10 */ 11 private static final long serialVersionUID = 1L; 12 13 @Override 14 public Row call(String s) throws Exception { 15 return RowFactory.create(//這里字段順序一定要和下邊 StructField對應起來 16 String.valueOf(s.split(",")[0]), 17 String.valueOf(s.split(",")[1]), 18 ); 19 } 20 }); 21 /** 22 * 動態構建DataFrame中的元數據,一般來說這里的字段可以來源自字符串,也可以來源於外部數據庫 23 */ 24 List<StructField> asList =Arrays.asList(//這里字段順序一定要和上邊對應起來 25 DataTypes.createStructField("id", DataTypes.StringType, true), 26 DataTypes.createStructField("name", DataTypes.StringType, true) 27 ); 28 StructType schema = DataTypes.createStructType(asList); 29 /* 30 StructType schema = new StructType(new StructField[]{ 31 new StructField("id", DataTypes.StringType, false, Metadata.empty()), 32 new StructField("name", DataTypes.StringType, false, Metadata.empty()), 33 }); 34 */ 35 //DataFrame df = sqlContext.createDataFrame(List<Row> ,schema)這個方法也可以 36 DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
1 //方法3 2 public static class Person implements Serializable { 3 private String name; 4 private int age; 5 6 public String getName() { 7 return name; 8 } 9 10 public void setName(String name) { 11 this.name = name; 12 } 13 14 public int getAge() { 15 return age; 16 } 17 18 public void setAge(int age) { 19 this.age = age; 20 } 21 } 22 23 // Create an instance of a Bean class 24 Person person = new Person(); 25 person.setName("Andy"); 26 person.setAge(32); 27 28 // Encoders are created for Java beans 29 Encoder<Person> personEncoder = Encoders.bean(Person.class); 30 Dataset<Person> javaBeanDS = spark.createDataset( 31 Collections.singletonList(person), 32 personEncoder 33 ); 34 javaBeanDS.show(); 35 // +---+----+ 36 // |age|name| 37 // +---+----+ 38 // | 32|Andy| 39 // +---+----+ 40 41 // Encoders for most common types are provided in class Encoders 42 Encoder<Integer> integerEncoder = Encoders.INT(); 43 Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder); 44 Dataset<Integer> transformedDS = primitiveDS.map( 45 (MapFunction<Integer, Integer>) value -> value + 1, 46 integerEncoder); 47 transformedDS.collect(); // Returns [2, 3, 4] 48 49 // DataFrames can be converted to a Dataset by providing a class. Mapping based on name 50 String path = "examples/src/main/resources/people.json"; 51 Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder); 52 peopleDS.show(); 53 // +----+-------+ 54 // | age| name| 55 // +----+-------+ 56 // |null|Michael| 57 // | 30| Andy| 58 // | 19| Justin| 59 // +----+-------+