首先導入maven依賴
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
DataFrame
package sparksql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/** Walks through RDD -> DataFrame -> SQL query -> Dataset -> RDD[Row] on a local Spark session. */
object Demo1 {

  // Case class whose field names match the DataFrame columns ("name", "age"),
  // which is what lets `df.as[User]` resolve below.
  case class User(name: String, age: Int)

  /** Entry point; `args` is not used. */
  def main(args: Array[String]): Unit = {
    // Create the SparkConf and the SparkSession
    val conf = new SparkConf().setAppName("sparlsql").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Implicit conversions (toDF, as[T] encoders) are members of this session instance
    import spark.implicits._

    try {
      val raw: RDD[(String, Int)] =
        spark.sparkContext.makeRDD(List(("zhangsan", 21), ("lisi", 22), ("wangwu", 23)))
      val df: DataFrame = raw.toDF("name", "age")
      df.show()

      // Register the DataFrame as a temporary view so it can be queried with SQL
      df.createOrReplaceTempView("user")

      // Run a SQL statement against the view
      val selectUser: DataFrame = spark.sql("select * from user where age > 21")
      selectUser.show()

      // Convert to a typed Dataset (kept only to demonstrate the conversion)
      val ds: Dataset[User] = df.as[User]

      // Convert back to an RDD of untyped rows
      val rdd: RDD[Row] = df.rdd

      // Iterate over the RDD; Row indices are 0-based
      rdd.foreach { row =>
        println(row.getString(0))
        println(row.getInt(1))
      }
    } finally {
      // Release the session even if one of the steps above throws
      spark.stop()
    }
  }
}
注意:
1)sparksession的創建不能用new SparkSession的方式,而是利用伴生對象SparkSession來創建builder,通過builder來創建sparksession。
2)隱式轉換import spark.implicits._不是引入了一個包,spark指的是程序上下文環境中的sparksession對象,所以這里引入的是該對象內部的implicits對象的全部成員,_是通配符,表示導入其中所有成員(而不是方法的參數)。如果該對象改名為sparksession,相應的隱式轉換語句變為import sparksession.implicits._。另外,該對象必須用val聲明,因為import只能作用于穩定標識符。
3)row對象的get方法的下標索引是從0開始,而不像jdbc的resultset下標索引從1開始。
Dataset
package sparksql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Builds a typed Dataset from a local Seq, then converts it to a DataFrame and to an RDD. */
object Demo2 {

  // Case class describing one element of the Dataset
  case class People(name: String, age: Int)

  /** Entry point; `args` is not used. */
  def main(args: Array[String]): Unit = {
    // Create the SparkConf and the SparkSession
    val conf = new SparkConf().setAppName("sparlsql").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Implicit conversions (toDS and the People encoder) are members of this session instance
    import spark.implicits._

    try {
      // Create the Dataset
      val peopleDataset =
        Seq(People("zhangsan", 20), People("lisi", 21), People("wangwu", 22)).toDS()

      // Convert to a DataFrame
      val peopleDataframe: DataFrame = peopleDataset.toDF()
      peopleDataframe.show()

      // Convert to an RDD; the element type stays People
      val rdd: RDD[People] = peopleDataset.rdd

      // Iterate over the RDD and print each record as name<TAB>age
      rdd.foreach { people =>
        println(s"${people.name}\t${people.age}")
      }
    } finally {
      // Release the session even if one of the steps above throws
      spark.stop()
    }
  }
}