使用Spark讀寫CSV格式文件(轉)


原文鏈接:使用Spark讀寫CSV格式文件

CSV格式的文件也稱為逗號分隔值(Comma-Separated Values,CSV,有時也稱為字符分隔值,因為分隔字符也可以不是逗號。在本文中的CSV格式的數據就不是簡單的逗號分割的),其文件以純文本形式存表格數據(數字和文本)。CSV文件由任意數目的記錄組成,記錄間以某種換行符分隔;每條記錄由字段組成,字段間的分隔符是其它字符或字符串,最常見的是逗號或制表符。通常,所有記錄都有完全相同的字段序列。

  本篇文章將介紹如何使用Spark 1.3+的外部數據源接口來自定義CSV輸入格式的文件解析器。這個外部數據源接口是由databricks公司開發並開源的(地址:https://github.com/databricks/spark-csv),通過這個類庫我們可以在Spark SQL中解析並查詢CSV中的數據。因為用到了Spark的外部數據源接口,所以我們需要在Spark 1.3+上面使用。在使用之前,我們需要引入以下的依賴:

1 <dependency>
2     <groupId>com.databricks</groupId>
3     <artifactId>spark-csv_2.10</artifactId>
4     <version>1.0.3</version>
5 </dependency>

目前spark-csv_2.10的最新版就是1.0.3。如果我們想在Spark shell里面使用,我們可以在--jars選項里面加入這個依賴,如下:

1 [iteblog@spark $] bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3
 

  和《Spark SQL整合PostgreSQL》文章中用到的load函數類似,在使用CSV類庫的時候,我們需要在options中傳入以下幾個選項:

  1、path:看名字就知道,這個就是我們需要解析的CSV文件的路徑,路徑支持通配符;
  2、header:默認值是false。我們知道,CSV文件第一行一般是解釋各個列的含義的名稱,如果我們不需要加載這一行,我們可以將這個選項設置為true;
  3、delimiter:默認情況下,CSV是使用英文逗號分隔的,如果不是這個分隔,我們就可以設置這個選項。
  4、quote:默認情況下的引號是'"',我們可以通過設置這個選項來支持別的引號。
  5、mode:解析的模式。默認值是PERMISSIVE,支持的選項有
    (1)、PERMISSIVE:嘗試解析所有的行,nulls are inserted for missing tokens and extra tokens are ignored.
    (2)、DROPMALFORMED:drops lines which have fewer or more tokens than expected
    (3)、FAILFAST: aborts with a RuntimeException if encounters any malformed line

如何使用

1、在Spark SQL中使用

  我們可以通過注冊臨時表,然后使用純SQL方式去查詢CSV文件:

1 CREATE TABLE cars
2 USING com.databricks.spark.csv
3 OPTIONS (path "cars.csv", header "true")

我們還可以在DDL中指定列的名字和類型,如下:

1 CREATE TABLEcars (yearMade double, carMake string, carModel string, comments string, blank string)
2 USING com.databricks.spark.csv
3 OPTIONS (path "cars.csv", header "true")

2、通過Scala方式

  推薦的方式是通過調用SQLContextload/save函數來加載CSV數據:

1 import org.apache.spark.sql.SQLContext
2  
3 val sqlContext = new SQLContext(sc)
4 val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv""header"-> "true"))
5 df.select("year""model").save("newcars.csv""com.databricks.spark.csv")

當然,我們還可以使用com.databricks.spark.csv._的隱式轉換:

1 import org.apache.spark.sql.SQLContext
2 import com.databricks.spark.csv._
3  
4 val sqlContext = new SQLContext(sc)
5  
6 val cars = sqlContext.csvFile("cars.csv")
7 cars.select("year""model").saveAsCsvFile("newcars.tsv")

3、在Java中使用

和在Scala中使用類似,我們也推薦調用SQLContext類中 load/save函數

01 /**
02  * User: 過往記憶
03  * Date: 2015-06-01
04  * Time: 下午23:26
05  * bolg: http://www.iteblog.com
06  * 本文地址:http://www.iteblog.com/archives/1380
07  * 過往記憶博客,專注於hadoop、hive、spark、shark、flume的技術博客,大量的干貨
08  * 過往記憶博客微信公共帳號:iteblog_hadoop
09  */
10  
11 import org.apache.spark.sql.SQLContext
12  
13 SQLContext sqlContext = new SQLContext(sc);
14  
15 HashMap<String, String> options = new HashMap<String, String>();
16 options.put("header""true");
17 options.put("path""cars.csv");
18  
19 DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
20 df.select("year""model").save("newcars.csv""com.databricks.spark.csv");

在Java或者是Scala中,我們可以通過CsvParser里面的函數來讀取CSV文件:

1 import com.databricks.spark.csv.CsvParser;
2 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
3  
4 DataFrame cars = (new CsvParser()).withUseHeader(true).csvFile(sqlContext, "cars.csv");

4、在Python中使用

Python中,我們也可以使用SQLContext類中 load/save函數來讀取和保存CSV文件:

1 from pyspark.sql import SQLContext
2 sqlContext = SQLContext(sc)
3  
4 df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv")
5 df.select("year""model").save("newcars.csv""com.databricks.spark.csv")


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM