背景
Spark SQL是Spark的一個模塊,用於結構化數據的處理。
++++++++++++++ +++++++++++++++++++++
| SQL | | Dataset API |
++++++++++++++ +++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++
| Spark SQL |
+++++++++++++++++++++++++++++++++++++
使用Spark SQL的方式有2種,可以通過SQL或者Dataset API,這兩種使用方式在本文都會涉及。
其中,通過SQL接口使用的方法具體又可分為3種:
- 在程序中執行
- 使用命令行
- Jdbc/ODBC
這里只會介紹第一種方式。
Spark關於分布式數據集的抽象原本是RDD,Dataset是其升級版本。DataFrame是特殊的Dataset,它限定元素是按照命名的列來組織的,從這一點看相當於關系型數據庫中的表。DataFrame等價於Dataset[Row],而且DataFrame是本文內容的核心。
DataFrame支持豐富的數據源:
+++++++++++++++++++
| 結構數據文件 |
| |
| +++++++++++++ | ++++++++++++++++
| | parquet | | | Hive table |
| +++++++++++++ | ++++++++++++++++
| |
| +++++++++++++ | ++++++++++++++++
| | csv | | | 關系數據庫 |
| +++++++++++++ | ++++++++++++++++
| |
| +++++++++++++ | ++++++++++++++++
| | json | | | RDD |
| +++++++++++++ | ++++++++++++++++
| |
+++++++++++++++++++
這里的每一種數據源我們都會進行介紹。
本文主要介紹DataFrame和各數據源的IO操作,后面再寫一篇文章介紹基於DataFrame的使用操作。即:本文關注如何得到一個DataFrame,如何將一個DataFrame進行持久化;后面要寫的文章則關注如何使用DataFrame。
相關的開源項目demo-spark在github上。
數據源

這個圖概述了本文介紹的主要內容,它也可以作為后續的備忘和參考。
這個圖中包含兩種箭頭,寬箭頭表示數據的流向,細箭頭表示提供構造實例的方法。
比如DataFrame - DataFrameWriter - 存儲的粗箭頭,表示數據從內存通過DataFrameWriter流向存儲;SparkSession - DataFrameReader的細箭頭,表示可以從SparkSession對象創建DataFrameReader對象。
SparkSession
使用Spark SQL必須先構造SparkSession實例,時候之后需要調用其stop方法釋放資源。模板如下:
val spark = org.apache.spark.sql.SparkSession
.builder()
.appName("Spark SQL basic demo")
.master("local")
.getOrCreate()
// work with spark
spark.stop()
下文中出現的所有的spark,如無特殊說明,都是指按照上述代碼創建的SparkSession對象。
parquet
parquet文件是怎么樣的?
PAR1 "&, @ Alyssa Ben ,
0 red 88,
@ \Hexample.avro.User % name% %favorite_color% 5 favorite_numbers %array <&% nameDH& &P5 favorite_color<@&P &?% (favorite_numbersarray
ZZ&? ? avro.schema?{"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_color","type":["string","null"]},{"name":"favorite_numbers","type":{"type":"array","items":"int"}}]} parquet-mr version 1.4.3 ? PAR1
它不是一個單純的文本文件,包含了一些無法渲染的特殊字符。
parquet是默認的格式。從一個parquet文件讀取數據的代碼如下:
val usersDF = spark.read.load("src/main/resources/users.parquet")
spark.read返回DataFrameReader對象,其load方法加載文件中的數據並返回DataFrame對象。這個可以參照上文的閉環圖理解。
我們可以調用Dataset#show()方法查看其內容:
usersDF.show()
輸出結果:
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
將一個DataFrame寫到parquet文件的代碼如下:
usersDF.write.save("output/parquet/")
DataFrame#write()方法返回DataFrameWriter對象實例,save方法將數據持久化為parquet格式的文件。save的參數是一個目錄,而且要求最底層的目錄是不存在的,下文類同。
另外一種寫的方式是:
peopleDF.write.parquet("output/parquet/")
這兩種方式的本質相同。
csv
csv是什么樣的?
csv又稱為逗號分隔符,即:使用逗號分隔一條數據中各字段的值。csv文件可以被excel解析,但是其本質只是一個文本文件。比如下面是一份csv文件的內容:
age,name
,Michael
30,Andy
19,Justin
第一行是表頭,但是它和下面的數據並沒有什么區別。所以在讀取的時候,必須告訴讀入器這個文件是有表頭的,它(第一行)才會被解析成表頭,否則就會被當成數據。
比如,解析表頭的讀:
spark.read.option("header", true).format("csv").load("output/csv/").show()
其中的option("header", true)就是告訴讀入器這個文件是有表頭的。
輸出為:
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
不解析表頭的讀:
spark.read.format("csv").load("output/csv/").show()
輸出為:
+----+-------+
| _c0| _c1|
+----+-------+
| age| name|
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
spark自動構建了兩個字段:_c0和_c1,而把age和name當成了一行數據。
另外一種簡化的讀法:
spark.read.option("header", true).csv("output/csv/")
其原理和上文中介紹的其他格式的文件相同。
將DataFrame寫入到csv文件時也需要注意表頭,將表頭也寫入文件的方式:
peopleDF.write.option("header", true).format("csv").save("output/csv/")
不寫表頭,只寫數據的方式:
peopleDF.write.format("csv").save("output/csv/")
另外一種簡化的寫法是:
peopleDF.write.csv("output/csv/")
json
json文件是怎么樣的?
上文中說過,DataFrame相當於關系數據庫中的表,那么每一條數據相當於一行記錄。關系數據庫表又可以相當於一個類,每一行數據相當於具體的對象,所以DataFrame的每一條數據相當於一個對象。
DataFrame對要讀取的json有特殊的要求:即每一條數據作為一行,整體不能包裝成數組。比如:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
而一個標准的json應該是下面這樣:
[{
"name": "Michael"
}, {
"name": "Andy",
"age": 30
}, {
"name": "Justin",
"age": 19
}
]
使用下面的方式讀取json文件內容:
val peopleDF = spark.read.format("json").load(path)
這種讀取的方式和上文parquet的讀取方式一致,最終都是調用load方法。只是多了一段format("json"),這是因為parquet是默認的格式,而json不是,所以必須明確聲明。
還有一種簡化的方式,其本質還是上述的代碼:
val peopleDF = spark.read.json(path)
將一個DataFrame寫到json文件的方式:
peopleDF.write.format("json").save("output/json/")
同樣的道理,和保存為parquet格式文件相比,這里多了一段format("json")代碼。
另外一種簡略的寫法:
peopleDF.write.json("output/json/")
兩者的本質是相同的。
jdbc
spark可以直接通過jdbc讀取關系型數據庫中指定的表。有兩種讀取的方式,一種是將所有的參數都作為option一條條設置:
val url = "jdbc:mysql://localhost:3306/vulcanus_ljl?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"
val jdbcDF = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", "vulcanus_ljl.data_dict")
.option("user", "vulcanus_ljl")
.option("password", "mypassword")
.load()
另一種是預先將參數封裝到Properties對象里:
val url = "jdbc:mysql://localhost:3306/vulcanus_ljl?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"
val connectionProperties = new Properties()
connectionProperties.put("user", "vulcanus_ljl")
connectionProperties.put("password", "mypassword")
val jdbcDF2 = spark.read
.jdbc(url, "vulcanus_ljl.data_dict", connectionProperties)
spark還可以通過jdbc將DataFrame寫入到一張新表(表必須不存在),寫入的方式同樣分為兩種:
jdbcDF.write
.format("jdbc")
.option("url", url)
.option("dbtable", "vulcanus_ljl.data_dict_temp1")
.option("user", "vulcanus_ljl")
.option("password", "mypassword")
.option("createTableColumnTypes", "dict_name varchar(60), dict_type varchar(60)") // 沒有指定的字段使用默認的類型
.save()
和
jdbcDF2.write
.jdbc(url, "vulcanus_ljl.data_dict_temp2", connectionProperties)
其中,url和connectionProperties的內容同上文讀取時的設置。
寫入時可以通過createTableColumnTypes設置指定多個字段的類型,其他沒有指定的字段會使用默認的類型。
table
准備table
Spark SQL不需要依賴於一個已經存在的Hive,可以通過下面的代碼生成本地的倉庫:
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src")
CREATE TABLE...用來創建表,LOAD DATA用來將數據加載到表中。kv1.txt的文件內容如下:
238val_238
86val_86
311val_311
27val_27
165val_165
409val_409
255val_255
278val_278
98val_98
484val_484
讀取
使用下面的代碼讀取指定表,並打印前5條數據:
spark.read.table("src").show(5)
輸出:
+---+-------+
|key| value|
+---+-------+
|238|val_238|
| 86| val_86|
|311|val_311|
| 27| val_27|
|165|val_165|
+---+-------+
寫入
使用下面的代碼,將DataFrame的數據寫入到一張新表:
tableDF.write.saveAsTable("src_bak")
如果要寫入一張已經存在的表,需要按照下面的方式:
tableDF.write.mode(SaveMode.Append).saveAsTable("src_bak")
連接一個已存在的Hive
將hive-site.xml放到項目的src/main/resources目錄下,spark會自動識別該配置文件,之后所有針對Hive table的讀寫都是根據配置作用於一個已存在的Hive的。
text
text文件是不包含格式信息的,將text讀取為DataFrame需要額外補充格式信息,具體又細分為兩種情況:一種是格式是提前約定好的,另一種是在運行時才能確定格式。
下面針對這兩種不同的情況分別介紹如何讀寫text的文件,text文件的內容如下:
Michael, 29
Andy, 30
Justin, 19
格式提前確定
讀入text文件:
case class Person(name: String, age: Long)
private def runInferSchemaExample(spark: SparkSession): Unit = {
// $example on:schema_inferring$
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.read.textFile("src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
peopleDF.show()
}
case class Person就是提前約定的text文件的格式,spark.read.textFile返回的是Dataset[String]類型,text的每一行作為一條數據。
import spark.implicits._是必要的,否則會報異常(我還沒有對這塊進行研究,無法給出詳細的解釋)。
寫DataFrame到text文件,必須先把DataFrame轉換成只有一列的數據集。比如對於上面的peopleDF,它的元素類型是Person,含有name和age兩列,直接寫就會拋出下面的異常:
Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/E:/projects/shouzheng/demo-spark/output/text already exists.;
寫的方式如下:
peopleDF.map(person => person.getAs[String]("name") + "," + person.getAs[String]("age")).write.text("output/text")
格式在運行時確定
格式在運行時確定,是說我們不是在編碼階段預知數據的格式,所以無法預先定義好對應的case class。可能是因為我們需要解析很多的數據格式,每一種格式都定義case class不合適;可能是因為我們需要支持格式的動態擴展,能支持新的格式;可能是因為我們要處理的格式不穩定,可能發生變化...不管什么原因,其結果一致:我們只能通過更加動態的方式來解析數據的格式。
在這種情況下,我們依然需要獲取數據的格式。初步獲取的結果可能是常見的形式,比如字符串,然后解析並構造特定的類型StructType來表示數據的格式。
然后我們讀取text文件,將內容轉換為RDD[Row]類型,其中每一個元素的屬性和StructType類型中聲明的field是一一對應的。
准備好了代表schema的StructType和代表數據的RDD[Row],我們就可以創建DataFrame對象了:
import spark.implicits._
val peopleRDD = spark.sparkContext.textFile("src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
寫的方式同上,不再贅述。
總結
- 最核心的思想都在上面的那張閉環圖上。
- 大部分數據源都有兩種讀寫的方式:一種是指定format,一種是直接以格式名作為方法名。
