一. 讀取和保存說明
SparkSQL提供了通用的保存數據和數據加載的方式,還提供了專用的方式
讀取:通用和專用
保存
保存有四種模式:
默認: error : 輸出目錄存在就報錯
append: 向輸出目錄追加
overwrite : 覆蓋寫
ignore: 忽略,不寫
二. 數據格式
1. Parquet
Spark SQL的默認數據源為Parquet格式。Parquet是一種能夠有效存儲嵌套數據的列式存儲格式。
數據源為Parquet文件時,Spark SQL可以方便的執行所有的操作,不需要使用format。修改配置項spark.sql.sources.default,可修改默認數據源格式。
讀取
val df = spark.read.load("examples/src/main/resources/users.parquet")
保存
//讀取json文件格式
var df = spark.read.json("/opt/module/data/input/people.json")
//保存為parquet格式
df.write.mode("append").save("/opt/module/data/output")
2. Json
Spark SQL 能夠自動推測JSON數據集的結構,並將它加載為一個Dataset[Row]. 可以通過SparkSession.read.json()去加載JSON 文件。
注意:Spark讀取的JSON文件不是傳統的JSON文件,每一行都應該是一個JSON串。
數據格式:employees.json
{"name":"Michael"}
{"name":"Andy", "age":30}
1)導入隱式轉換
import spark.implicits._
2)讀取Json文件
//專用的讀取
val df1: DataFrame = sparkSession.read.json("input/employees.json")
//通用讀取
val df: DataFrame = sparkSession.read.format("json").load("input/employees.json")
3)保存為Json文件
//導隱式包,轉為DataSet
import sparkSession.implicits.
val ds: Dataset[Emp] = rdd.toDS()
ds.write.mode("overwrite")json("output/emp.json")
3. CSV
CSV: 逗號作為字段分割符的文件
tsv: \t,tab作為字段分割符的文件
讀取
// 通用的讀取
val df: DataFrame = sparkSession.read.format("csv").load("input/person.csv")
// 專用的讀
val df1: DataFrame = sparkSession.read.csv("input/person.csv")
保存
CSV的參數可以到DataFrameReader 609行查看
//DataFrame
df1.write.option("sep",",").mode("overwrite").csv("output/csv")
4. Mysql
讀取
val props = new Properties()
/*
JDBC中能寫什么參數,參考 JDBCOptions 223行
*/
props.put("user","root")
props.put("password","root")
//庫名
val df: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/spark_test", "tbl_user", props)
// 全表查詢 只顯示前N條
df.show()
//指定查詢
df.createTempView("user")
sparkSession.sql("select * from user where id > 5").show()
//通用的讀
通用的讀
讀取mysql的數據
/**
* @description: 測試讀取mysql數據
* @author: HaoWu
* @create: 2020年09月11日
*/
object ReadMysqlTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("readMysql")
val spark = SparkSession
.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
val ids = List(1,2,3,4).mkString("'", "','", "'")
val resutl = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/gmall0421?useSSL=false")
.option("user", "root")
.option("password", "root")
.option("query", s"select * from user_info where id in (${ids})")
.load()
.as[UserInfo] // df -> ds
.rdd
.map(userInfo => (userInfo.id, userInfo))
resutl.collect().foreach(print)
}
}
保存
val list = List(Emp("jack", 2222.22), Emp("jack1", 3222.22), Emp("jack2", 4222.22))
val rdd: RDD[Emp] = sparkSession.sparkContext.makeRDD(list, 1)
//導入隱式包
import sparkSession.implicits._
val ds: Dataset[Emp] = rdd.toDS()
val props = new Properties()
props.put("user","root")
props.put("password","root")
// 表名可以是已經存在的表t1,也可以是一張新表t1(用的多)
//專用的寫
ds.write.jdbc("jdbc:mysql://localhost:3306/0508","t1",props)
// 通用的寫
ds.write.
option("url","jdbc:mysql://localhost:3306/庫名")
//表名
.option("dbtable","t2")
.option("user","root")
.option("password","root")
.mode("append")
.format("jdbc").save()