import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types._

object CSVFileTest {

  // Not defined in the original snippet: assumed directory holding the sample files
  // (people.json, cars.csv, cars_iso-8859-1.csv, comments.csv, numbers.csv); adjust to your environment.
  val BASE_PATH = "file:///tmp/spark-csv-demo"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("CSVFileTest")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val df = spark.read.json(s"${BASE_PATH}/people.json")

    // Convert the JSON data into CSV files
    df.write.mode(SaveMode.Overwrite).csv(s"${BASE_PATH}/csv")
    val csvDF = spark.read.csv(s"${BASE_PATH}/csv").toDF("age", "name")
    csvDF.show()

    // Create a DataFrame from a Dataset[String]
    val csvDS = spark.createDataset(Seq("23,jeffy", "34,katy"))
    val ds = spark.read.csv(csvDS)
    ds.show()

    // 1: sep and delimiter do the same thing: both set the CSV delimiter (default ",") (read/write option)
    spark.read.csv(Seq("23,jeffy", "34,katy").toDS()).show()
    spark.read.option("sep", " ").csv(Seq("23 jeffy", "34 katy").toDS()).show()
    spark.read.option("delimiter", " ").csv(Seq("23 jeffy", "34 katy").toDS()).show()
    ds.write.mode(SaveMode.Overwrite).option("sep", "|").csv(s"${BASE_PATH}/delimiter")

    // 2: header (default false): whether to use the first line of the CSV file as the schema (read/write option)
    spark.read.csv(s"${BASE_PATH}/cars.csv").show()
    /*
    +----+-----+-----+--------------------+-----+
    | _c0|  _c1|  _c2|                 _c3|  _c4|
    +----+-----+-----+--------------------+-----+
    |year| make|model|             comment|blank|
    |2012|Tesla|    S|          No comment| null|
    |1997| Ford| E350|Go get one now th...| null|
    |2015|Chevy| Volt|                null| null|
    +----+-----+-----+--------------------+-----+
    */
    val headerDF = spark.read.option("header", true).csv(s"${BASE_PATH}/cars.csv")
    headerDF.printSchema()
    headerDF.write.mode(SaveMode.Overwrite).option("header", true).csv(s"${BASE_PATH}/headerDF")
    /*
    root
     |-- year: string (nullable = true)
     |-- make: string (nullable = true)
     |-- model: string (nullable = true)
     |-- comment: string (nullable = true)
     |-- blank: string (nullable = true)
    */
    headerDF.show()
    /*
    +----+-----+-----+--------------------+-----+
    |year| make|model|             comment|blank|
    +----+-----+-----+--------------------+-----+
    |2012|Tesla|    S|          No comment| null|
    |1997| Ford| E350|Go get one now th...| null|
    |2015|Chevy| Volt|                null| null|
    +----+-----+-----+--------------------+-----+
    */

    // 3: inferSchema: whether to infer the schema from the data (read-only option)
    val inferSchemaDF = spark.read.option("header", true).option("inferSchema", true).csv(s"${BASE_PATH}/cars.csv")
    inferSchemaDF.printSchema()
    /*
    root
     |-- year: integer (nullable = true)
     |-- make: string (nullable = true)
     |-- model: string (nullable = true)
     |-- comment: string (nullable = true)
     |-- blank: string (nullable = true)
    */
    inferSchemaDF.show()
    /*
    +----+-----+-----+--------------------+-----+
    |year| make|model|             comment|blank|
    +----+-----+-----+--------------------+-----+
    |2012|Tesla|    S|          No comment| null|
    |1997| Ford| E350|Go get one now th...| null|
    |2015|Chevy| Volt|                null| null|
    +----+-----+-----+--------------------+-----+
    */

    // 4: charset and encoding (default UTF-8): decode the CSV file with the given charset (read-only option)
    spark.read.option("header", "true").option("encoding", "iso-8859-1").option("sep", "þ")
      .csv(s"${BASE_PATH}/cars_iso-8859-1.csv").show()
    /*
    +----+-----+-----+--------------------+-----+
    |year| make|model|             comment|blank|
    +----+-----+-----+--------------------+-----+
    |2012|Tesla|    S|          No comment| null|
    |1997| Ford| E350|Go get one now th...| null|
    |2015|Chevy| Volt|                null| null|
    +----+-----+-----+--------------------+-----+
    */

    // 5: quote (default `"`): field values that must not be split on the delimiter are wrapped
    //    in this quote character (read/write option)
    var optMap = Map("quote" -> "\'", "delimiter" -> " ")
    spark.read.options(optMap).csv(Seq("23 'jeffy tang'", "34 katy").toDS()).show()
    /*
    +---+----------+
    |_c0|       _c1|
    +---+----------+
    | 23|jeffy tang|
    | 34|      katy|
    +---+----------+
    */

    // 6: escape (default `\`): if a quoted field value itself contains the quote character,
    //    escape it with this character (read/write option)
    optMap = Map("quote" -> "\'", "delimiter" -> " ", "escape" -> "\"")
    spark.read.options(optMap).csv(Seq("23 'jeffy \"'tang'", "34 katy").toDS()).show()

    // 7: comment (default empty string, which disables the feature): the marker for comment lines in the CSV (read/write option)
    optMap = Map("comment" -> "~", "header" -> "false")
    spark.read.options(optMap).csv(s"${BASE_PATH}/comments.csv").show()
    /*
    +---+---+---+---+----+-------------------+
    |_c0|_c1|_c2|_c3| _c4|                _c5|
    +---+---+---+---+----+-------------------+
    |  1|  2|  3|  4|5.01|2015-08-20 15:57:00|
    |  6|  7|  8|  9|   0|2015-08-21 16:58:01|
    |  1|  2|  3|  4|   5|2015-08-23 18:00:42|
    +---+---+---+---+----+-------------------+
    */

    // 8: (read/write options)
    // ignoreLeadingWhiteSpace (default false): whether to trim whitespace before field values
    // ignoreTrailingWhiteSpace (default false): whether to trim whitespace after field values
    optMap = Map("ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true")
    spark.read.options(optMap).csv(Seq(" a,b , c ").toDS()).show()

    // 9: multiLine (default false): whether to parse records that span multiple lines of the CSV file (read-only option)
    val primitiveFieldAndType = Seq(
      """"
        |string","integer
        |
        |
        |","long
        |
        |","bigInteger",double,boolean,null""".stripMargin,
      """"this is a
        |simple
        |string.","
        |
        |10","
        |21474836470","92233720368547758070","
        |
        |1.7976931348623157E308",true,""".stripMargin)
    primitiveFieldAndType.toDF("value").coalesce(1).write.mode(SaveMode.Overwrite).text("csv_multiLine_test")
    spark.read.option("header", true).option("multiLine", true).csv("csv_multiLine_test").show()

    // 10: mode (default PERMISSIVE) (read-only option)
    // PERMISSIVE: set the fields of a record to null when it cannot be parsed
    // DROPMALFORMED: drop records that cannot be parsed
    // FAILFAST: throw an exception as soon as a record cannot be parsed
    val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
    val df1 = spark.read.option("mode", "PERMISSIVE").schema(schema)
      .csv(Seq("0,2013-111-11 12:13:14", "1,1983-08-04").toDS())
    df1.show()
    // 11: nullValue (default empty string): the string that should be parsed as null (read/write option)
    spark.read.option("nullValue", "--").csv(Seq("0,2013-11-11,--", "1,1983-08-04,3").toDS()).show()

    // 12: nanValue (default NaN) (read-only option)
    // positiveInf
    // negativeInf
    val numbers = spark.read.format("csv").schema(StructType(List(
      StructField("int", IntegerType, true),
      StructField("long", LongType, true),
      StructField("float", FloatType, true),
      StructField("double", DoubleType, true)
    ))).options(Map(
      "header" -> "true",
      "mode" -> "DROPMALFORMED",
      "nullValue" -> "--",
      "nanValue" -> "NAN",
      "negativeInf" -> "-INF",
      "positiveInf" -> "INF")).load(s"${BASE_PATH}/numbers.csv")
    numbers.show()
    /*
    +----+--------+---------+---------------+
    | int|    long|    float|         double|
    +----+--------+---------+---------------+
    |   8| 1000000|    1.042|2.38485450374E7|
    |null|34232323|   98.343|184721.23987223|
    |  34|    null|   98.343|184721.23987223|
    |  34|43323123|     null|184721.23987223|
    |  34|43323123|223823.95|           null|
    |  34|43323123| 223823.0|            NaN|
    |  34|43323123| 223823.0|       Infinity|
    |  34|43323123| 223823.0|      -Infinity|
    +----+--------+---------+---------------+
    */

    // 13: codec and compression: the compression codec to use; supported values are
    // none and uncompressed (no compression), bzip2, deflate, gzip, lz4, snappy (write-only option)
    inferSchemaDF.write.mode(SaveMode.Overwrite).option("compression", "gzip").csv(s"${BASE_PATH}/compression")

    // 14: dateFormat (read/write option)
    val customSchema = new StructType(Array(StructField("date", DateType, true)))
    val date1 = spark.read.option("dateFormat", "dd/MM/yyyy HH:mm").schema(customSchema)
      .csv(Seq("26/08/2015 18:00", "27/10/2014 18:30").toDS())
    date1.printSchema()
    /*
    root
     |-- date: date (nullable = true)
    */
    date1.write.mode(SaveMode.Overwrite).option("dateFormat", "yyyy-MM-dd").csv(s"${BASE_PATH}/dateFormat")
    spark.read.csv(s"${BASE_PATH}/dateFormat").show()

    // 15: timestampFormat (read/write option)
    val timeSchema = new StructType(Array(StructField("date", TimestampType, true)))
    val time = spark.read.option("timestampFormat", "dd/MM/yyyy HH:mm").schema(timeSchema)
      .csv(Seq("26/08/2015 18:00", "27/10/2014 18:30").toDS())
    time.printSchema()
    /*
    root
     |-- date: timestamp (nullable = true)
    */
    time.write.mode(SaveMode.Overwrite).option("timestampFormat", "yyyy-MM-dd HH:mm").csv(s"${BASE_PATH}/timestampFormat")
    spark.read.csv(s"${BASE_PATH}/timestampFormat").show()

    // 16: maxColumns (default 20480): the maximum number of columns a single CSV record may have (read-only option)
    spark.read.option("maxColumns", "3").csv(Seq("test,as,g", "h,bm,s").toDS()).show() // throws a parse error

    spark.stop()
  }
}
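// Note (not in the original): the maxColumns read above is expected to fail, which means
// spark.stop() is never reached. If you want the walkthrough to finish cleanly, one option is
// to wrap that single read in scala.util.Try, e.g.
//   scala.util.Try(spark.read.option("maxColumns", "3").csv(Seq("test,as,g", "h,bm,s").toDS()).show())
// and inspect the resulting Failure instead of letting the exception propagate.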