1. DataFrame overview:
In Spark, a DataFrame is a distributed dataset built on top of RDDs, similar to a two-dimensional table in a traditional relational database. A DataFrame carries schema metadata: every column of the two-dimensional dataset it represents has a name and a type.
For example, something like this:
root
|-- age: long (nullable = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)
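The tree above is what printSchema() prints; the same metadata is also available programmatically. A minimal sketch (assuming a DataFrame named df, for example one loaded from the people.json file prepared below):
// Print the schema tree shown above
df.printSchema()
// The same metadata as a StructType
println(df.schema)
// Column (name, type) pairs
df.dtypes.foreach { case (name, dataType) => println(name + ": " + dataType) }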
2. Preparing a structured test dataset
people.json
{"id":1, "name":"Ganymede", "age":32}
{"id":2, "name":"Lilei", "age":19}
{"id":3, "name":"Lily", "age":25}
{"id":4, "name":"Hanmeimei", "age":25}
{"id":5, "name":"Lucy", "age":37}
{"id":6, "name":"Tom", "age":27}
3. Understanding DataFrame through code
1) Operating on the data through the DataFrame API
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
object DataFrameTest {
  def main(args: Array[String]): Unit = {
    // Log level
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)
    // Initialization
    val conf = new SparkConf().setAppName("DataFrameTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("people.json")
    // View the data in df
    df.show()
    // View the schema
    df.printSchema()
    // Select a single column
    df.select("name").show()
    // Select multiple columns; plus adds a value to the column
    df.select(df.col("name"), df.col("age").plus(1)).show()
    // Filter on a column's value
    df.filter(df.col("age").gt(25)).show()
    // Group by a column and count
    df.groupBy("age").count().show()
    // foreach: process each row's values
    df.select(df.col("id"), df.col("name"), df.col("age")).foreach { x =>
      // Access values by index
      println("col1: " + x.get(0) + ", col2: " + x.get(1) + ", col3: " + x.get(2))
    }
    // foreachPartition: process each row's values; the common choice in production
    df.select(df.col("id"), df.col("name"), df.col("age")).foreachPartition { iterator =>
      iterator.foreach(x => {
        // Access values by field name
        println("id: " + x.getAs("id") + ", name: " + x.getAs("name") + ", age: " + x.getAs("age"))
      })
    }
  }
}
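A note on running the example: the SparkConf above does not set a master URL, so the job is expected to be launched through spark-submit. For a quick local test the master can be set explicitly; a minimal sketch (local mode assumed):
// Local testing only: use all local cores instead of a cluster manager.
val conf = new SparkConf()
  .setAppName("DataFrameTest")
  .setMaster("local[*]") // drop this line when submitting to a real cluster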
2) Operating on the data with SQL by registering a temp table
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
/**
 * @author Administrator
 */
object DataFrameTest2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("DataFrameTest2")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("people.json")
    df.registerTempTable("people")
    df.show()
    df.printSchema()
    // Select a single column
    sqlContext.sql("select name from people").show()
    // Select multiple columns
    sqlContext.sql("select name, age + 1 from people").show()
    // Filter on a column's value
    sqlContext.sql("select age from people where age >= 25").show()
    // Group by a column and count
    sqlContext.sql("select age, count(*) cnt from people group by age").show()
    // foreach: process each row's values
    sqlContext.sql("select id, name, age from people").foreach { x =>
      // Access values by index
      println("col1: " + x.get(0) + ", col2: " + x.get(1) + ", col3: " + x.get(2))
    }
    // foreachPartition: process each row's values; the common choice in production
    sqlContext.sql("select id, name, age from people").foreachPartition { iterator =>
      iterator.foreach(x => {
        // Access values by field name
        println("id: " + x.getAs("id") + ", name: " + x.getAs("name") + ", age: " + x.getAs("age"))
      })
    }
  }
}
Both approaches produce the same results: the first suits programmers, the second suits people who are familiar with SQL.
4. Working with unstructured data
people.txt
1,Ganymede,32
2, Lilei, 19
3, Lily, 25
4, Hanmeimei, 25
5, Lucy, 37
6, wcc, 4
1) Registering a temp table by specifying the schema programmatically (StructType and Row)
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
/**
* @author Administrator
*/
object DataFrameTest3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("DataFrameTest3")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val people = sc.textFile("people.txt")
    val peopleRowRDD = people.map { x => x.split(",") }.map { data =>
      val id = data(0).trim().toInt
      val name = data(1).trim()
      val age = data(2).trim().toInt
      Row(id, name, age)
    }
    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    val df = sqlContext.createDataFrame(peopleRowRDD, structType)
    df.registerTempTable("people")
    df.show()
    df.printSchema()
  }
}
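The listing above registers the temp table but does not query it. As a small follow-on sketch (same sqlContext and table name as above), the SQL statements from section 3 apply here unchanged:
// Query the temp table built from the text file
sqlContext.sql("select name, age from people where age >= 25").show()
sqlContext.sql("select age, count(*) cnt from people group by age").show()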
2) Registering a temp table by inferring the schema from a case class (reflection)
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
/**
* @author Administrator
*/
object DataFrameTest4 {
  case class People(id: Int, name: String, age: Int)
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("DataFrameTest4")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val people = sc.textFile("people.txt")
    val peopleRDD = people.map { x => x.split(",") }.map { data =>
      People(data(0).trim().toInt, data(1).trim(), data(2).trim().toInt)
    }
    // An implicit conversion is needed here for toDF()
    import sqlContext.implicits._
    val df = peopleRDD.toDF()
    df.registerTempTable("people")
    df.show()
    df.printSchema()
  }
}
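Here too the registered table can be queried, and the DataFrame API and SQL can be mixed freely on the same data; a brief sketch using the names defined above:
// DataFrame API style
df.filter(df.col("age").gt(20)).select("name", "age").show()
// SQL style against the registered temp table
sqlContext.sql("select id, name from people where age >= 25").show()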
5. Summary:
Spark SQL is a module in Spark used mainly for processing structured data. The core programming abstraction it provides is the DataFrame, and Spark SQL can also serve as a distributed SQL query engine. One of its most important capabilities is querying data stored in Hive.
A DataFrame can be understood as a distributed collection of data organized into named columns. It is very similar to a table in a relational database, but with many optimizations underneath. A DataFrame can be built from many sources, including structured data files, Hive tables, external relational databases, and RDDs.
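For instance, an external relational database is read through the same read API used for JSON above; a minimal sketch (the JDBC URL, table name, and credentials below are placeholders, and the matching JDBC driver jar must be on the classpath):
// Placeholder connection details; replace with real values.
val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://localhost:3306/testdb",
  "dbtable" -> "people",
  "user" -> "root",
  "password" -> "secret")).load()
jdbcDF.show()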