spark source API:構建DF、RDD與DF之間的相互轉換、show()的三種用法
目錄
讀取csv格式的文件,構建DF
讀取json格式的文件,構建DF
1、不需要指定分割方式
2、不需要指定字段名和字段類型(json自帶字段和字段類型)
3、 json格式的文件相對於csv,會占用額外的空間,一般不使用json格式存儲數據
讀取數據庫中的數據(JDBC構建DF)
1、不需要指定分割方式
2、不需要指定字段名和字段類型(spark會繼承數據庫中的表結構)
3、執行之前pom.xml需要有mysql依賴
讀取parquet格式的文件,構建DF
1、不需要指定分割方式
2、不需要指定字段名和字段類型
3、parquet是一種壓縮的格式,同時會自帶列名,可以和hive完全兼容
package com.shujia.sql
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Demo2SourceAPI {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("Demo2SourceAPI")
.master("local")
.config("spark.sql.shuffle.partitions",1)
.getOrCreate()
//導包--導入sql所有的函數
import org.apache.spark.sql.functions._
//導入隱式轉換
import spark.implicits._
/**
* 讀取csv格式的文件,創建DF
*/
val studentsDF: DataFrame = spark
.read
.format("csv")
.option("sep",",")
.schema("id STRING, name STRING, age INT, gender STRING, clazz STRING")
.load("data/students.txt")
// studentsDF.show()
/**
* 讀取json格式的文件,創建DF
* 讀取json格式的文件:
* 1、不需要指定分割方式
* 2、不需要指定字段名和字段類型(json自帶字段和字段類型)
* json格式的文件相對於csv,會占用額外的空間,一般不使用json格式存儲數據
*/
val jsonDF: DataFrame = spark
.read
.format("json")
.load("data/students.json")
// jsonDF.printSchema() json格式的文件自帶表結構,可以使用printSchema()查看它自帶的表結構
// jsonDF.show()
/**
* 使用JDBC構建DF(讀取數據庫中的數據)
* 1、不需要指定分割方式
* 2、不需要指定字段名和字段類型(spark會繼承數據庫中的表結構)
* 3、執行之前pom.xml需要有mysql依賴
*/
val jdbcDF: DataFrame = spark
.read
.format("jdbc")
.option("url","jdbc:mysql://master:3306")
.option("dbtable", "lyw11.student")
.option("user", "root")
.option("password", "123456")
.load()
jdbcDF.show()
/**
* 讀取parquet格式的文件:
* 1、不需要指定分割方式
* 2、不需要指定字段名和字段類型
*
* parquet是一種壓縮的格式,同時會自帶列名,可以和hive完全兼容
*
*/
//如果我們現在手里沒有parquet格式的文件
//我們可以將csv格式的文件轉成parquet格式的文件(利用保存數據來轉格式)
studentsDF
.write
.mode(SaveMode.Overwrite) //指定保存模式
.format("parquet")//指定保存格式為parquet
.save("data/parquet")//指定路徑和輸出目錄
//讀取parquet格式的文件
val parquetDF: DataFrame = spark
.read
.format("parquet")
.load("data/parquet")
parquetDF.show()
}
}
RDD與DF可以相互轉換
RDD轉換為DF:.toDF
DF轉換成RDD :.rdd
package com.shujia.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object Demo3RDDToDF {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("Demo3RDDToDF")
.getOrCreate()
//導入隱式轉換
import spark.implicits._
//導入所有的函數
import org.apache.spark.sql.functions._
//先獲取sparkContext,通過SparkContext才能調用textFile
val sc: SparkContext = spark.sparkContext
//讀取文件構建RDD
val stdeuntRDD: RDD[String] = sc.textFile("data/students.txt")
//切分一下數據,轉換成元組的形式
val stuRDD: RDD[(String, String, Int, String, String)] = stdeuntRDD.map(stu => {
val split: Array[String] = stu.split(",")
(split(0), split(1), split(2).toInt, split(3), split(4))
})
stuRDD.foreach(println)
//(1500100001,施笑槐,22,女,文科六班)
//...
/**
* 將RDD轉換成DF .toDF("列名")
* RDD的類型是一個元組,轉換的時候需要指定列名
*/
val df: DataFrame = stuRDD.toDF("id", "name", "age", "gender", "clazz")
df.show()
//+----------+------+---+------+--------+
//| id| name|age|gender| clazz|
//+----------+------+---+------+--------+
//|1500100001|施笑槐| 22| 女|文科六班|
//|1500100002|呂金鵬| 24| 男|文科六班|
//...
/**
* DF轉換成RDD .rdd
* 類型是ROW,ROW代表一行數據,可以使用列名獲取列值
*/
val rdd: RDD[Row] = df.rdd
/**
* 解析Row,使用列名獲取列值----getAs("列名")
*/
val tRDD1: RDD[(String, String, Int, String, String)] = rdd.map(row => {
val id: String = row.getAs[String]("id")
val name: String = row.getAs[String]("name")
val age: Int = row.getAs[Int]("age")
val gender: String = row.getAs[String]("gender")
val clazz: String = row.getAs[String]("clazz")
(id, name, age, gender, clazz)
})
tRDD1.foreach(println)
//(1500100001,施笑槐,22,女,文科六班)
//...
/**
* 解析Row,使用模式匹配
*/
val tRDD2: RDD[(String, String, Int, String, String)] = rdd.map {
case Row(id: String, name: String, age: Int, gender: String, clazz: String) =>
(id, name, age, gender, clazz)
}
tRDD2.foreach(println)
//(1500100001,施笑槐,22,女,文科六班)
//...
}
}
DF中的show()用法
package com.shujia.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo4Show {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("Demo4Show")
.getOrCreate()
val studentsDF: DataFrame = spark
.read
.format("csv")
.option("sep", ",") //默認分割方式是逗號
.schema("id STRING, name STRING , age INT ,gender STRING ,clazz STRING")
.load("data/students.txt")
/**
* 指定打印的行數,show()默認打印20行
*/
//打印數據(默認打印前20行)
studentsDF.show()
//打印數據,指定獲取的行數
studentsDF.show(100)
/**
* 當單行數據字符較長時,打印的時候會出現省略標點
* 想要查看完整的數據,需要指定參數
*/
//完全顯示數據
studentsDF.show(100, false)
}
}
