Spark(十二)【SparkSql中數據讀取和保存】


一. 讀取和保存說明

SparkSQL提供了通用的保存數據和數據加載的方式,還提供了專用的方式

讀取:通用和專用

保存

保存有四種模式:
  默認: error :  輸出目錄存在就報錯
        append:  向輸出目錄追加
        overwrite : 覆蓋寫
        ignore:  忽略,不寫

二. 數據格式

1. Parquet

Spark SQL的默認數據源為Parquet格式。Parquet是一種能夠有效存儲嵌套數據的列式存儲格式。

數據源為Parquet文件時,Spark SQL可以方便的執行所有的操作,不需要使用format。修改配置項spark.sql.sources.default,可修改默認數據源格式。

讀取

val df = spark.read.load("examples/src/main/resources/users.parquet")

保存

//讀取json文件格式
var df = spark.read.json("/opt/module/data/input/people.json")
//保存為parquet格式
df.write.mode("append").save("/opt/module/data/output")

2. Json

Spark SQL 能夠自動推測JSON數據集的結構,並將它加載為一個Dataset[Row]. 可以通過SparkSession.read.json()去加載JSON 文件。

注意:Spark讀取的JSON文件不是傳統的JSON文件,每一行都應該是一個JSON串。

數據格式:employees.json

{"name":"Michael"}
{"name":"Andy", "age":30}

1)導入隱式轉換

import spark.implicits._

2)讀取Json文件

//專用的讀取
val df1: DataFrame = sparkSession.read.json("input/employees.json")
//通用讀取
val df: DataFrame = sparkSession.read.format("json").load("input/employees.json")

3)保存為Json文件

    //導隱式包,轉為DataSet
    import sparkSession.implicits.
    val ds: Dataset[Emp] = rdd.toDS()
    ds.write.mode("overwrite")json("output/emp.json")

3. CSV

CSV: 逗號作為字段分割符的文件
tsv: \t,tab作為字段分割符的文件

讀取

    // 通用的讀取
    val df: DataFrame = sparkSession.read.format("csv").load("input/person.csv")
    // 專用的讀
    val df1: DataFrame = sparkSession.read.csv("input/person.csv")

保存

CSV的參數可以到DataFrameReader 609行查看

//DataFrame
df1.write.option("sep",",").mode("overwrite").csv("output/csv")

4. Mysql

讀取

    val props = new Properties()
    /*
        JDBC中能寫什么參數,參考 JDBCOptions 223行
     */
    props.put("user","root")
    props.put("password","root")
    //庫名
    val df: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/spark_test", "tbl_user", props)
    // 全表查詢    只顯示前N條
    df.show()
    //指定查詢
    df.createTempView("user")
    sparkSession.sql("select * from user where id > 5").show()

	 //通用的讀
	 

通用的讀

讀取mysql的數據

/**
 * @description: 測試讀取mysql數據
 * @author: HaoWu
 * @create: 2020年09月11日
 */
object ReadMysqlTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("readMysql")
    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    import spark.implicits._
    val ids = List(1,2,3,4).mkString("'", "','", "'")
    val resutl = spark
      .read
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/gmall0421?useSSL=false")
      .option("user", "root")
      .option("password", "root")
      .option("query", s"select * from user_info where id in (${ids})")
      .load()
      .as[UserInfo] // df -> ds
      .rdd
      .map(userInfo => (userInfo.id, userInfo))

    resutl.collect().foreach(print)
  }
}

保存

    val list = List(Emp("jack", 2222.22), Emp("jack1", 3222.22), Emp("jack2", 4222.22))
    val rdd: RDD[Emp] = sparkSession.sparkContext.makeRDD(list, 1)
    //導入隱式包
    import sparkSession.implicits._
    val ds: Dataset[Emp] = rdd.toDS()
    val props = new Properties()
    props.put("user","root")
    props.put("password","root")
    //  表名可以是已經存在的表t1,也可以是一張新表t1(用的多)  
    
    //專用的寫
    ds.write.jdbc("jdbc:mysql://localhost:3306/0508","t1",props)
	 

    // 通用的寫
    ds.write.
      option("url","jdbc:mysql://localhost:3306/庫名")
		//表名
      .option("dbtable","t2")
      .option("user","root")
      .option("password","root")
      .mode("append")
      .format("jdbc").save()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM