CSV格式的文件也稱為逗號分隔值(Comma-Separated Values,CSV,有時也稱為字符分隔值,因為分隔字符也可以不是逗號。在本文中的CSV格式的數據就不是簡單的逗號分割的),其文件以純文本形式存表格數據(數字和文本)。CSV文件由任意數目的記錄組成,記錄間以某種換行符分隔;每條記錄由字段組成,字段間的分隔符是其它字符或字符串,最常見的是逗號或制表符。通常,所有記錄都有完全相同的字段序列。
本篇文章將介紹如何使用Spark 1.3+的外部數據源接口來自定義CSV輸入格式的文件解析器。這個外部數據源接口是由databricks公司開發並開源的(地址:https://github.com/databricks/spark-csv),通過這個類庫我們可以在Spark SQL中解析並查詢CSV中的數據。因為用到了Spark的外部數據源接口,所以我們需要在Spark 1.3+上面使用。在使用之前,我們需要引入以下的依賴:
2 |
< groupId >com.databricks</ groupId > |
3 |
< artifactId >spark-csv_2.10</ artifactId > |
4 |
< version >1.0.3</ version > |
目前spark-csv_2.10的最新版就是1.0.3。如果我們想在Spark shell里面使用,我們可以在--jars
選項里面加入這個依賴,如下:
1 |
[iteblog@spark $] bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3 |
和《Spark SQL整合PostgreSQL》文章中用到的load函數類似,在使用CSV類庫的時候,我們需要在options
中傳入以下幾個選項:
1、path
:看名字就知道,這個就是我們需要解析的CSV文件的路徑,路徑支持通配符;
2、header
:默認值是false。我們知道,CSV文件第一行一般是解釋各個列的含義的名稱,如果我們不需要加載這一行,我們可以將這個選項設置為true;
3、delimiter
:默認情況下,CSV是使用英文逗號分隔的,如果不是這個分隔,我們就可以設置這個選項。
4、quote
:默認情況下的引號是'"',我們可以通過設置這個選項來支持別的引號。
5、mode
:解析的模式。默認值是PERMISSIVE
,支持的選項有
(1)、PERMISSIVE
:嘗試解析所有的行,nulls are inserted for missing tokens and extra tokens are ignored.
(2)、DROPMALFORMED
:drops lines which have fewer or more tokens than expected
(3)、FAILFAST
: aborts with a RuntimeException if encounters any malformed line
如何使用
1、在Spark SQL中使用
我們可以通過注冊臨時表,然后使用純SQL方式去查詢CSV文件:
2 |
USING com.databricks.spark.csv |
3 |
OPTIONS (path "cars.csv" , header "true" ) |
我們還可以在DDL中指定列的名字和類型,如下:
1 |
CREATE TABLE cars (yearMade double , carMake string, carModel string, comments string, blank string) |
2 |
USING com.databricks.spark.csv |
3 |
OPTIONS (path "cars.csv" , header "true" ) |
推薦的方式是通過調用SQLContext
的load/save
函數來加載CSV數據:
1 |
import org.apache.spark.sql.SQLContext |
3 |
val sqlContext = new SQLContext(sc) |
4 |
val df = sqlContext.load( "com.databricks.spark.csv" , Map( "path" -> "cars.csv" , "header" -> "true" )) |
5 |
df.select( "year" , "model" ).save( "newcars.csv" , "com.databricks.spark.csv" ) |
當然,我們還可以使用com.databricks.spark.csv._
的隱式轉換:
1 |
import org.apache.spark.sql.SQLContext |
2 |
import com.databricks.spark.csv. _ |
4 |
val sqlContext = new SQLContext(sc) |
6 |
val cars = sqlContext.csvFile( "cars.csv" ) |
7 |
cars.select( "year" , "model" ).saveAsCsvFile( "newcars.tsv" ) |
3、在Java中使用
和在Scala中使用類似,我們也推薦調用SQLContext
類中 load/save
函數
07 |
* 過往記憶博客,專注於hadoop、hive、spark、shark、flume的技術博客,大量的干貨 |
08 |
* 過往記憶博客微信公共帳號:iteblog_hadoop |
11 |
import org.apache.spark.sql.SQLContext |
13 |
SQLContext sqlContext = new SQLContext(sc); |
15 |
HashMap<String, String> options = new HashMap<String, String>(); |
16 |
options.put( "header" , "true" ); |
17 |
options.put( "path" , "cars.csv" ); |
19 |
DataFrame df = sqlContext.load( "com.databricks.spark.csv" , options); |
20 |
df.select( "year" , "model" ).save( "newcars.csv" , "com.databricks.spark.csv" ); |
在Java或者是Scala中,我們可以通過CsvParser里面的函數來讀取CSV文件:
1 |
import com.databricks.spark.csv.CsvParser; |
2 |
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); |
4 |
DataFrame cars = ( new CsvParser()).withUseHeader( true ).csvFile(sqlContext, "cars.csv" ); |
在Python中,我們也可以使用SQLContext
類中 load/save
函數來讀取和保存CSV文件:
1 |
from pyspark.sql import SQLContext |
2 |
sqlContext = SQLContext(sc) |
4 |
df = sqlContext.load(source = "com.databricks.spark.csv" , header = "true" , path = "cars.csv" ) |
5 |
df.select( "year" , "model" ).save( "newcars.csv" , "com.databricks.spark.csv" ) |