Spark SQL reading and writing external data sources: reading and writing JSON files


import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

object JsonFileTest {
  // BASE_PATH is not defined in the original snippet; a local placeholder path is assumed here
  val BASE_PATH = "file:///tmp/spark-json-demo"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("JsonFileTest")
      .getOrCreate()

    import spark.implicits._

    // Convert the Parquet data into JSON files
    val sessionDf = spark.read.parquet(s"${BASE_PATH}/trackerSession")
    sessionDf.show()

    sessionDf.write.json(s"${BASE_PATH}/json")

    // Read the JSON files back
    val jsonDF = spark.read.json(s"${BASE_PATH}/json")
    jsonDF.show()

    // A DataFrame can also be created from a JSON Dataset[String]
    val jsonDataset = spark.createDataset(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val otherJsonDF = spark.read.json(jsonDataset)
    otherJsonDF.show()
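
    // Schema inference can also be skipped by supplying an explicit schema, which is handy for nested
    // JSON like the record above — a minimal sketch using the standard spark.read.schema API, not part
    // of the original post:
    val addressSchema = new StructType(Array(
      StructField("name", StringType, true),
      StructField("address", new StructType(Array(
        StructField("city", StringType, true),
        StructField("state", StringType, true))), true)))
    spark.read.schema(addressSchema).json(jsonDataset).show()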

    // primitivesAsString (default false): infer primitive values as strings; primitives here are boolean, int, long, float and double
    // prefersDecimal (default false): when primitivesAsString is false, infer float/double values as DecimalType
    val jsonDataset_1 = spark.createDataset(
      """{"name":"Yin","address":{"is_old":true,"area":23000.34}}""" :: Nil)
    var otherJsonDF_1 = spark.read.json(jsonDataset_1)
    otherJsonDF_1.printSchema()
    /*
    root
     |-- address: struct (nullable = true)
     |    |-- area: double (nullable = true)
     |    |-- is_old: boolean (nullable = true)
     |-- name: string (nullable = true)
     */

    var optsMap = Map("primitivesAsString" -> "true", "prefersDecimal" -> "true")
    otherJsonDF_1 = spark.read.options(optsMap).json(jsonDataset_1)
    otherJsonDF_1.printSchema()
    /*
    root
     |-- address: struct (nullable = true)
     |    |-- area: string (nullable = true)
     |    |-- is_old: string (nullable = true)
     |-- name: string (nullable = true)
     */

    optsMap = Map("primitivesAsString" -> "false", "prefersDecimal" -> "true")
    otherJsonDF_1 = spark.read.options(optsMap).json(jsonDataset_1)
    otherJsonDF_1.printSchema()
    /*
    root
     |-- address: struct (nullable = true)
     |    |-- area: decimal(7,2) (nullable = true)
     |    |-- is_old: boolean (nullable = true)
     |-- name: string (nullable = true)
     */


    // allowComments (default false): whether Java/C-style comments are allowed inside the JSON
    spark.read.option("allowComments", "true").json(Seq("""{"name":/* hello */"Yin","address":{"is_old":true,"area":23000.34}}""").toDS()).show()

    // allowUnquotedFieldNames (default false): whether unquoted field names are allowed
    spark.read.option("allowUnquotedFieldNames", "true").json(Seq("""{name:"Yin","address":{"is_old":true,"area":23000.34}}""").toDS()).show()

    // allowSingleQuotes (default true): whether single-quoted field names and values are allowed
    spark.read.option("allowSingleQuotes", "true").json(Seq("""{'name':'Yin',"address":{"is_old":true,"area":23000.34}}""").toDS()).show()

    // allowNumericLeadingZeros (default false): whether numbers with leading zeros are allowed
    spark.read.option("allowNumericLeadingZeros", "true").json(Seq("""{'name':'Yin',"address":{"is_old":true,"area":0023000.34}}""").toDS()).show()

    // allowNonNumericNumbers (default false): whether non-numeric tokens such as NaN (not a number) are allowed
    spark.read.option("allowNonNumericNumbers", "true").json(Seq("""{'name':'Yin',"address":{"is_old":true,"area":NaN}}""").toDS()).show()

    // allowBackslashEscapingAnyCharacter (default false): whether a backslash may escape any character (the backslash itself is dropped)
    spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(Seq("""{'name':'Yin',"address":{"is_old":true,"area":"\$23000"}}""").toDS()).show()

    // mode (default PERMISSIVE): how records that fail to parse are handled:
    // PERMISSIVE is the lenient mode: a malformed record gets an extra column, named by the value of
    // columnNameOfCorruptRecord, holding the raw malformed JSON string, while all other columns are set to null
    spark.read.option("mode", "PERMISSIVE").json(Seq("""{'name':'Yin',"address":{"is_old":true,"area":3000}}""",
      """{'name':'Yin',"address":{"is_old":true,"area":\3000}}""").toDS()).show()
    /*
    +--------------------+-----------+----+
    |     _corrupt_record|    address|name|
    +--------------------+-----------+----+
    |                null|[3000,true]| Yin|
    |{'name':'Yin',"ad...|       null|null|
    +--------------------+-----------+----+
     */
    spark.read.option("mode", "PERMISSIVE").option("columnNameOfCorruptRecord", "customer_column").json(
      Seq("""{'name':'Yin',"address":{"is_old":true,"area":3000}}""",
      """{'name':'Yin',"address":{"is_old":true,"area":\3000}}""").toDS()).show()
    /*
    +-----------+--------------------+----+
    |    address|     customer_column|name|
    +-----------+--------------------+----+
    |[3000,true]|                null| Yin|
    |       null|{'name':'Yin',"ad...|null|
    +-----------+--------------------+----+
     */
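    // The corrupt-record column can be used to separate good rows from malformed ones — a minimal sketch,
    // not in the original post (the DataFrame is cached first, since some Spark versions restrict queries
    // that reference only the corrupt record column):
    val corruptDF = spark.read.option("mode", "PERMISSIVE").option("columnNameOfCorruptRecord", "customer_column")
      .json(Seq("""{'name':'Yin',"address":{"is_old":true,"area":3000}}""",
        """{'name':'Yin',"address":{"is_old":true,"area":\3000}}""").toDS()).cache()
    corruptDF.filter($"customer_column".isNull).show()     // rows that parsed cleanly
    corruptDF.filter($"customer_column".isNotNull).show()  // rows kept only as raw corrupt strings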
    // DROPMALFORMED: drop records that fail to parse
    spark.read.option("mode", "DROPMALFORMED").json(Seq("""{'name':'Yin',"address":{"is_old":true,"area":3000}}""",
      """{'name':'Yin',"address":{"is_old":true,"area":\3000}}""").toDS()).show()
    /*
    +-----------+----+
    |    address|name|
    +-----------+----+
    |[3000,true]| Yin|
    +-----------+----+
     */
    // FAILFAST: throw an exception as soon as a malformed record is encountered
    try { // FAILFAST throws on the malformed record, so catch the exception to let the rest of the demo run
      spark.read.option("mode", "FAILFAST").json(Seq("""{'name':'Yin',"address":{"is_old":true,"area":3000}}""",
        """{'name':'Yin',"address":{"is_old":true,"area":\3000}}""").toDS()).show()
    } catch { case e: Exception => println(s"FAILFAST: ${e.getMessage}") }

    // dateFormat (default yyyy-MM-dd): the string format of dates in the JSON (applies to DateType fields)
    val customSchema = new StructType(Array(StructField("name", StringType, true),
      StructField("date", DateType, true)))
    val dataFormatDF =
      spark.read.schema(customSchema).option("dateFormat", "dd/MM/yyyy HH:mm").json(Seq(
        """{'name':'Yin',"date":"26/08/2015 18:00"}""").toDS())
    dataFormatDF.write.mode(SaveMode.Overwrite).option("dateFormat", "yyyy/MM/dd").json("testjson")
    spark.read.json("testjson").show()
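
    // Reading the written data back without a schema leaves the date as a plain string; supplying the schema
    // together with the matching dateFormat restores DateType — a minimal sketch, not in the original post:
    spark.read.schema(customSchema).option("dateFormat", "yyyy/MM/dd").json("testjson").show()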

    // timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSZZ): the string format of timestamps in the JSON (applies to TimestampType fields)
    val customSchema_1 = new StructType(Array(StructField("name", StringType, true),
      StructField("date", TimestampType, true)))
    val timestampFormatDf =
      spark.read.schema(customSchema_1).option("timestampFormat", "dd/MM/yyyy HH:mm").json(Seq(
        """{'name':'Yin',"date":"26/08/2015 18:00"}""").toDS())

    val optMap = Map("timestampFormat" -> "yyyy/MM/dd HH:mm", DateTimeUtils.TIMEZONE_OPTION -> "GMT")
    timestampFormatDf.write.mode(SaveMode.Overwrite).format("json").options(optMap).save("test.json")
    spark.read.json("test.json").show()

    // compression: the codec used to compress the output files. Supported codecs:
    // none and uncompressed mean no compression
    // bzip2, deflate, gzip, lz4, snappy
    timestampFormatDf.write.mode(SaveMode.Overwrite).option("compression", "gzip").json("test.json")
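
    // Compressed JSON is decompressed transparently on read, so no extra option is needed —
    // a minimal sketch, not in the original post:
    spark.read.json("test.json").show()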

    // multiLine: whether a single JSON record may span multiple lines
    val primitiveFieldAndType: Dataset[String] = spark.createDataset(spark.sparkContext.parallelize(
      """{"string":"this is a simple string.",
          "integer":10,
          "long":21474836470,
          "bigInteger":92233720368547758070,
          "double":1.7976931348623157E308,
          "boolean":true,
          "null":null
      }""" ::
        """{"string":"this is a simple string.",
          "integer":10,
          "long":21474836470,
          "bigInteger":92233720368547758070,
          "double":1.7976931348623157E308,
          "boolean":true,
          "null":null
      }""" :: Nil))(Encoders.STRING)
    primitiveFieldAndType.toDF("value").write.mode(SaveMode.Overwrite).option("compression", "GzIp").text(s"${BASE_PATH}/primitiveFieldAndType")

    val multiLineDF = spark.read.option("multiLine", false).json(s"${BASE_PATH}/primitiveFieldAndType")
    multiLineDF.show()
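
    // With multiLine=true, one record (which may span multiple lines) is parsed per file instead of one
    // record per line — a minimal sketch of reading the same path that way, not in the original post:
    val multiLineDF2 = spark.read.option("multiLine", true).json(s"${BASE_PATH}/primitiveFieldAndType")
    multiLineDF2.show()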

    spark.stop()
  }
}

  

