Spark SQL：動態設置 schema，將 RDD 轉換成 Dataset/DataFrame


Java

 1 public class DynamicDemo {  2     private static SparkConf conf = new SparkConf().setAppName("dynamicdemo").setMaster("local");  3     private static JavaSparkContext jsc = new JavaSparkContext(conf);  4     private static SparkSession session = new SparkSession(jsc.sc());  5 
 6     public static void main(String[] args) {  7 
 8         // 創建rdd
 9         JavaRDD<String> rdd = jsc.textFile("./src/main/java/cn/tele/spark_sql/rdd2dataset/students.txt"); 10 
11         // 創建Row的rdd
12         JavaRDD<Row> rowRdd = rdd.map(new Function<String, Row>() { 13 
14             private static final long serialVersionUID = 1L; 15 
16  @Override 17             public Row call(String v1) throws Exception { 18                 String[] fields = v1.split(","); 19                 return RowFactory.create(Integer.valueOf(fields[0]), fields[1], Integer.valueOf(fields[2])); 20  } 21  }); 22 
23         // 創建schema
24         StructType schema = DataTypes 25                 .createStructType(Arrays.asList(DataTypes.createStructField("id", DataTypes.IntegerType, false), 26                         DataTypes.createStructField("name", DataTypes.StringType, false), 27                         DataTypes.createStructField("age", DataTypes.IntegerType, false))); 28 
29         // 轉換
30         Dataset<Row> dataset = session.createDataFrame(rowRdd, schema); 31 
32         dataset.createOrReplaceTempView("students"); 33 
34         Dataset<Row> result = session.sql("select * from students where age<=18"); 35  result.show(); 36      session.stop();
37  jsc.close(); 38  } 39 }

Scala

 /**
  * Converts an RDD of text lines into a DataFrame by specifying the schema
  * programmatically, then queries it with Spark SQL.
  *
  * Each input line is parsed as `id,name,age`.
  */
object DynamicDemo {

  /** Default input file; can be overridden by the first command-line argument. */
  private val DefaultInputPath: String =
    "./src/main/scala/cn/tele/spark_sql/rdd2dataframe/students.txt"

  /**
   * Loads the students file, applies the schema and prints every row whose
   * age is 18 or less.
   *
   * @param args optional; `args(0)` overrides the input path
   */
  def main(args: Array[String]): Unit = {
    // SparkSession replaces the deprecated `new SQLContext(sc)` entry point.
    import org.apache.spark.sql.SparkSession

    // App name aligned with this object (it was a copy-paste "reflectdemo").
    val conf = new SparkConf().setAppName("dynamicdemo").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      val inputPath = args.headOption.getOrElse(DefaultInputPath)

      // Create the RDD of raw lines, requesting at least 8 partitions
      val rdd = spark.sparkContext.textFile(inputPath, 8)

      // Skip blank lines and trim every field before conversion; a line with
      // fewer than three fields still fails loudly instead of being dropped.
      val rowRdd = rdd
        .filter(_.trim.nonEmpty)
        .map { line =>
          val fields = line.split(",").map(_.trim)
          Row(fields(0).toInt, fields(1), fields(2).toInt)
        }

      // Field order must match the values placed in each Row above
      val schema = DataTypes.createStructType(Array(
        StructField("id", DataTypes.IntegerType, nullable = false),
        StructField("name", DataTypes.StringType, nullable = false),
        StructField("age", DataTypes.IntegerType, nullable = false)))

      // Convert the RDD of Row into a DataFrame using the explicit schema
      val dataframe = spark.createDataFrame(rowRdd, schema)
      dataframe.createOrReplaceTempView("students")

      val result = spark.sql("select * from students where age<=18")
      result.show()
    } finally {
      // Previously the context was never stopped; this also stops the SparkContext.
      spark.stop()
    }
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用，本站對版權不負任何法律責任。如果侵犯了您的隱私權益，請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM