java
1 public class DynamicDemo { 2 private static SparkConf conf = new SparkConf().setAppName("dynamicdemo").setMaster("local"); 3 private static JavaSparkContext jsc = new JavaSparkContext(conf); 4 private static SparkSession session = new SparkSession(jsc.sc()); 5
6 public static void main(String[] args) { 7
8 // 創建rdd
9 JavaRDD<String> rdd = jsc.textFile("./src/main/java/cn/tele/spark_sql/rdd2dataset/students.txt"); 10
11 // 創建Row的rdd
12 JavaRDD<Row> rowRdd = rdd.map(new Function<String, Row>() { 13
14 private static final long serialVersionUID = 1L; 15
16 @Override 17 public Row call(String v1) throws Exception { 18 String[] fields = v1.split(","); 19 return RowFactory.create(Integer.valueOf(fields[0]), fields[1], Integer.valueOf(fields[2])); 20 } 21 }); 22
23 // 創建schema
24 StructType schema = DataTypes 25 .createStructType(Arrays.asList(DataTypes.createStructField("id", DataTypes.IntegerType, false), 26 DataTypes.createStructField("name", DataTypes.StringType, false), 27 DataTypes.createStructField("age", DataTypes.IntegerType, false))); 28
29 // 轉換
30 Dataset<Row> dataset = session.createDataFrame(rowRdd, schema); 31
32 dataset.createOrReplaceTempView("students"); 33
34 Dataset<Row> result = session.sql("select * from students where age<=18"); 35 result.show(); 36 session.stop();
37 jsc.close(); 38 } 39 }
scala
/**
 * Converts an RDD of text lines into a DataFrame by specifying the schema
 * programmatically, then queries it with Spark SQL.
 *
 * Each input line is expected to look like `id,name,age`, where `id` and
 * `age` are integers.
 */
object DynamicDemo {
  def main(args: Array[String]): Unit = {
    // Local import keeps this block self-contained.
    import org.apache.spark.sql.SparkSession

    val conf = new SparkConf().setAppName("dynamicdemo").setMaster("local")

    // SparkSession replaces the deprecated `new SQLContext(sc)` entry point.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      // Create the RDD of raw text lines (at least 8 partitions requested).
      val rdd = spark.sparkContext
        .textFile("./src/main/scala/cn/tele/spark_sql/rdd2dataframe/students.txt", 8)

      // Create the RDD of Row objects.
      val rowRdd = rdd.map { line =>
        val arr = line.split(",")
        // Report the offending line instead of a bare index error.
        require(arr.length >= 3, s"Expected id,name,age but got: $line")
        Row(arr(0).trim.toInt, arr(1), arr(2).trim.toInt)
      }

      // Three non-nullable columns. DataTypes.createStructField(...) could be
      // used in place of the StructField(...) constructor.
      val schema = DataTypes.createStructType(Array(
        StructField("id", DataTypes.IntegerType, false),
        StructField("name", DataTypes.StringType, false),
        StructField("age", DataTypes.IntegerType, false)))

      // Convert the Row RDD into a DataFrame using the schema.
      val dataframe = spark.createDataFrame(rowRdd, schema)

      dataframe.createOrReplaceTempView("students")

      val result = spark.sql("select * from students where age<=18")
      result.show()
    } finally {
      // Stopping the session also stops the underlying SparkContext,
      // which the original never released.
      spark.stop()
    }
  }
}