SparkSQL讀寫部數據源——csv文件的讀寫


1. sep 和 delimiter的功能都是一樣,都是表示csv的切割符,(默認是,)(讀寫參數)

spark.read.option("sep", " ").csv(Seq("jeffy", "katy").toDS()).show()
spark.read.option("delimiter", " ").csv(Seq("jeffy", "katy").toDS()).show()
ds.write.mode(SaveMode.Overwrite).option("sep", "|").csv(s"${path}")

2. header(默認是false) 表示是否將csv文件中的第一行作為schema(讀寫參數)

spark.read.option("header", true).csv(s"${path}")

3.inferSchema 表示是否支持從數據中推導出schema(只讀參數)

spark.read.option("header", true).option("inferSchema", true).csv(s"${path}")

4.charset和encoding(默認是UTF-8),根據指定的編碼器對csv文件進行解碼(只讀參數)

spark.read.option("header", "true").option("encoding", "iso-8859-1").option("sep", "þ").csv(s"${path}").show()

5.quote(默認值是`"` ) 表示將不需要切割的字段值用quote標記起來(讀寫參數)

 var optMap = Map("quote" -> "\'", "delimiter" -> " ")
    spark.read.options(optMap).csv(Seq("23 'jeffy tang'", "34 katy").toDS()).show()

6.escape(默認值是`\`) 如果在quote標記的字段值中還含有quote,則用escape來避免(讀寫參數)

val optMap = Map("quote" -> "\'", "delimiter" -> " ", "escape" -> "\"")
spark.read.options(optMap).csv(Seq("23 'jeffy \"'tang'", "34 katy").toDS()).show()

7.comment(默認是空字符串,表示關閉這個功能) 表示csv中的注釋的標記符(讀寫參數)

val optMap = Map("comment" -> "~", "header" -> "false")
spark.read.options(optMap).csv(s"${BASE_PATH}/comments.csv").show()

8.(讀寫參數)ignoreLeadingWhiteSpace(默認是false) 表示是否忽略字段值前面的空格 /ignoreTrailingWhiteSpace(默認是false) 表示是否忽略字段值后面的空格

 val optMap = Map("ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true")
 spark.read.options(optMap).csv(Seq(" a,b  , c ").toDS()).show()

9. multiLine(默認是false) 是否支持一條記錄被拆分成了多行的csv的讀取解析(類似於execl單元格多行)(只讀參數)

spark.read.option("header", true).option("multiLine", true).csv(s"${path}").show()

10. mode(默認是PERMISSIVE) (只讀參數)

1) PERMISSIVE 表示碰到解析錯誤的時候,將字段都置為null

2) DROPMALFORMED 表示忽略掉解析錯誤的記錄

3) FAILFAST 當有解析錯誤的時候,立馬拋出異常
 spark.read.option("mode", "PERMISSIVE").schema(schema).csv(s"${path}")

11.  nullValue(默認是空字符串), 表示需要將nullValue指定的字符串解析成null(讀寫參數)

spark.read.option("nullValue", "--").csv(Seq("0,2013-11-11,--", "1,1983-08-04,3").toDS()).show()

12.nanValue(默認值為NaN) (只讀參數)

1) positiveInf

2) negativeInf

   spark.read.format("csv").schema(StructType(List(
        StructField("int", IntegerType, true),
        StructField("long", LongType, true),
        StructField("float", FloatType, true),
        StructField("double", DoubleType, true)
      ))).options(Map(
        "header" -> "true",
        "mode" -> "DROPMALFORMED",
        "nullValue" -> "--",
        "nanValue" -> "NAN",
        "negativeInf" -> "-INF",
        "positiveInf" -> "INF")).load(s"${BASE_PATH}/numbers.csv")

13.codec和compression 壓縮格式,支持的壓縮格式有:

none 和 uncompressed表示不壓縮;

bzip2、deflate、gzip、lz4、snappy (只寫參數)

inferSchemaDF.write.mode(SaveMode.Overwrite).option("compression", "gzip").csv(s"${path}")

14.maxColumns(默認是20480) 規定一個csv的一條記錄最大的列數 (只讀參數)

 spark.read.option("maxColumns", "3").csv(Seq("test,as,g", "h,bm,s").toDS()).show() //會報錯

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM