SparkSQL讀寫外部數據源-基本操作load和save


 

數據源-基本操作load和save

object BasicTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("BasicTest")
      .master("local")
      .getOrCreate()

    //最基本的讀取(load)和保存(write)操作,操作的文件的數據格式默認是parquet
    val sessionDF = spark.read.load(s"${BASE_PATH}/trackerSession")
    sessionDF.show()

    sessionDF.select("ip", "cookie").write.save(s"${BASE_PATH}/trackerSession_ip_cookie")

    //可以讀取多個文件目錄下的數據文件
    val multiSessionDF = spark.read.load(s"${BASE_PATH}/trackerSession",
      s"${BASE_PATH}/trackerSession_ip_cookie")
    multiSessionDF.show()

    //讀取的時候指定schema
    val schema = StructType(StructField("ip", StringType) :: Nil)
    val specSessionDF = spark.read.schema(schema).load(s"${BASE_PATH}/trackerSession")
    specSessionDF.show()

    //指定數據源數據格式
    //讀取json文件, 且將讀取出來的數據保存為parquet文件
    val deviceInfoDF = spark.read.format("json").load(s"${BASE_PATH}/IoT_device_info.json")
    spark.read.json(s"${BASE_PATH}/IoT_device_info.json").show()

    deviceInfoDF.write.format("orc").save(s"${BASE_PATH}/iot")
    deviceInfoDF.write.orc(s"${BASE_PATH}/iot2")

    //option傳遞參數,改變讀寫數據源的行為
    spark.read.option("mergeSchema", "true").parquet(s"${BASE_PATH}/trackerSession")
    deviceInfoDF.write.option("compression", "snappy").parquet(s"${BASE_PATH}/iot2_parquet")

    val optsMap = Map("mergeSchema" -> "mergeSchema")
    spark.read.options(optsMap).parquet("")

    //SaveMode
    //SaveMode.ErrorIfExists(對應着字符串"error"):表示如果目標文件目錄中數據已經存在了,則拋異常(這個是默認的配置)
    //SaveMode.Append(對應着字符串"append"):表示如果目標文件目錄中數據已經存在了,則將數據追加到目標文件中
    //SaveMode.Overwrite(對應着字符串"overwrite"):表示如果目標文件目錄中數據已經存在了,則用需要保存的數據覆蓋掉已經存在的數據
    //SaveMode.Ignore(對應着字符串為:"ignore"):表示如果目標文件目錄中數據已經存在了,則不做任何操作

    deviceInfoDF.write.option("compression", "snappy").mode(SaveMode.Ignore).parquet(s"${BASE_PATH}/iot/iot2_parquet")
    spark.read.parquet(s"${BASE_PATH}/iot/iot2_parquet").show()
    deviceInfoDF.write.option("compression", "snappy").mode("ignore").parquet(s"${BASE_PATH}/iot/iot2_parquet")

    spark.stop()
  }
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM