依賴
<!-- Maven dependency on Spark SQL (artifact spark-sql_2.11, version 2.1.3); the examples below import their org.apache.spark.sql classes from it. -->
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.3</version> </dependency>
RDD轉化成DataFrame:通過StructType指定schema
package com.zy.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * Converts an RDD into a DataFrame by specifying the schema explicitly with a StructType.
  *
  * Every non-blank input line is expected to have the form `id,name,age`,
  * where `id` and `age` are integers.
  */
object StructTypeSchema {

  /** Input file read when no path is supplied on the command line. */
  private val DefaultInputPath: String = "D:\\person.txt"

  /**
    * Loads the person file, builds a DataFrame with an explicit schema and
    * queries it through both the DSL and SQL.
    *
    * @param args optional; when present, `args(0)` overrides the default input path
    */
  def main(args: Array[String]): Unit = {
    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)

    // Create the SparkSession (local mode, two worker threads).
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("StructTypeSchema")
      .master("local[2]")
      .getOrCreate()

    // Stop the session even when the job fails, so its resources are not leaked.
    try {
      // Obtain the SparkContext and lower the log verbosity.
      val sc: SparkContext = sparkSession.sparkContext
      sc.setLogLevel("WARN")

      // Read the file.
      val textFile: RDD[String] = sc.textFile(inputPath)

      // Split each line into fields; blank lines are skipped because they
      // cannot be parsed into a row.
      val lineArrayRDD: RDD[Array[String]] =
        textFile.filter(_.trim.nonEmpty).map(_.split(","))

      // Map the fields onto Rows; numeric fields are trimmed so that
      // "1, tom, 20" parses the same way as "1,tom,20".
      val rowRDD: RDD[Row] =
        lineArrayRDD.map(x => Row(x(0).trim.toInt, x(1), x(2).trim.toInt))

      // Describe the schema of the RDD: column name, type, nullability, comment.
      val schema: StructType = (new StructType)
        .add("id", IntegerType, true, "id")
        .add("name", StringType, false, "姓名")
        .add("age", IntegerType, true, "年齡")

      // Build the DataFrame from the row RDD and the schema.
      val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

      // DSL-style operation.
      personDF.show()

      // SQL-style operation: register the DataFrame as a temporary view first.
      // createOrReplaceTempView does not fail when getOrCreate() handed back a
      // session in which a "person" view already exists.
      personDF.createOrReplaceTempView("person")
      sparkSession.sql("select * from person where id =3").show()
    } finally {
      sparkSession.stop()
    }
  }
}
RDD轉化成DataFrame:利用反射機制推斷schema
package com.zy.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Record type for one line of the person file; the DataFrame columns in
  * [[CaseClassSchema]] are derived from these fields.
  *
  * @param id   numeric identifier (first field of a line)
  * @param name person name (second field of a line)
  * @param age  age (third field of a line)
  */
case class Person(id: Int, name: String, age: Int)

/**
  * Converts an RDD into a DataFrame by letting Spark infer the schema from a case class.
  *
  * Every non-blank input line is expected to have the form `id,name,age`,
  * where `id` and `age` are integers.
  */
object CaseClassSchema {

  /** Input file read when no path is supplied on the command line. */
  private val DefaultInputPath: String = "D:\\person.txt"

  /**
    * Loads the person file, converts it to a DataFrame via the [[Person]] case
    * class and queries it through both the DSL and SQL.
    *
    * @param args optional; when present, `args(0)` overrides the default input path
    */
  def main(args: Array[String]): Unit = {
    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)

    // Build the SparkSession with an app name and master (local mode for testing).
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("CaseClassSchema")
      .master("local[2]")
      .getOrCreate()

    // Stop the session even when the job fails, so its resources are not leaked.
    try {
      // Obtain the SparkContext and lower the log verbosity.
      val sc: SparkContext = sparkSession.sparkContext
      sc.setLogLevel("WARN")

      // Load the data.
      val dataRDD: RDD[String] = sc.textFile(inputPath)

      // Split each line into fields; blank lines are skipped because they
      // cannot be parsed into a Person.
      val lineArrayRDD: RDD[Array[String]] =
        dataRDD.filter(_.trim.nonEmpty).map(_.split(","))

      // Associate the fields with the Person case class; numeric fields are
      // trimmed so that "1, tom, 20" parses the same way as "1,tom,20".
      val personRDD: RDD[Person] =
        lineArrayRDD.map(x => Person(x(0).trim.toInt, x(1), x(2).trim.toInt))

      // The implicit conversions are required for toDF and the $"col" syntax.
      import sparkSession.implicits._
      val personDF: DataFrame = personRDD.toDF

      // DSL syntax.
      personDF.show()
      personDF.printSchema()
      personDF.select("name").show()
      personDF.filter($"age" > 30).show()

      println("---------------------------------------------")

      // SQL syntax: a temporary view has to be created first.
      // createOrReplaceTempView does not fail when getOrCreate() handed back a
      // session in which a "person" view already exists.
      personDF.createOrReplaceTempView("person")
      sparkSession.sql("select * from person where id>1").show()
    } finally {
      sparkSession.stop()
    }
  }
}