spark結構化數據處理:Spark SQL、DataFrame和Dataset


本文講解Spark的結構化數據處理,主要包括:Spark SQL、DataFrame、Dataset以及Spark SQL服務等相關內容。本文主要講解Spark 1.6.x的結構化數據處理相關東東,但因Spark發展迅速(本文的寫作時值Spark 1.6.2發布之際,並且Spark 2.0的預覽版本也已發布許久),因此請隨時關注Spark SQL官方文檔以了解最新信息。

文中使用Scala對Spark SQL進行講解,並且代碼大多都能在spark-shell中運行,關於這點請知曉。

概述

相比於Spark RDD API,Spark SQL包含了對結構化數據和在其上的運算的更多信息,Spark SQL使用這些信息進行了額外的優化,使對結構化數據的操作更加高效和方便。

有多種方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但無論是哪種API或者是編程語言,它們都是基於同樣的執行引擎,因此你可以在不同的API之間隨意切換,它們各有各的特點,看你喜歡。

SQL

使用Spark SQL的一種方式就是通過SQL語句來執行SQL查詢。當在編程語言中使用SQL時,其返回結果將被封裝為一個DataFrame。

DataFrame

DataFrame是一個分布式集合,其中數據被組織為命名的列。它概念上等價於關系數據庫中的表,但底層做了更多的優化。DataFrame可以從很多數據源構建,比如:已經存在的RDD、結構化文件、外部數據庫、Hive表。

DataFrame的前身是SchemaRDD,從Spark 1.3.0開始SchemaRDD更名為DataFrame。與SchemaRDD的主要區別是:DataFrame不再直接繼承自RDD,而是自己實現了RDD的絕大多數功能。你仍舊可以在DataFrame上調用.rdd方法將其轉換為一個RDD。RDD可看作是分布式的對象的集合,Spark並不知道對象的詳細模式信息,DataFrame可看作是分布式的Row對象的集合,其提供了由列組成的詳細模式信息,使得Spark SQL可以進行某些形式的執行優化。DataFrame和普通的RDD的邏輯框架區別如下所示:

數據

DataFrame不僅比RDD有更加豐富的算子,更重要的是它可以進行執行計划優化(得益於Catalyst SQL解析器),另外Tungsten項目給DataFrame的執行效率帶來了很大提升(不過Tungsten優化也可能在后續開發中加入到RDD API中)。

但是在有些情況下RDD可以表達的邏輯用DataFrame無法表達,所以后續提出了Dataset API,Dataset結合了RDD和DataFrame的好處。

關於Tungsten優化可以參見:Project Tungsten:讓Spark將硬件性能壓榨到極限

Dataset

Dataset是Spark 1.6新添加的一個實驗性接口,其目的是想結合RDD的好處(強類型(這意味着可以在編譯時進行類型安全檢查)、可以使用強大的lambda函數)和Spark SQL的優化執行引擎的好處。可以從JVM對象構造出Dataset,然后使用類似於RDD的函數式轉換算子(map/flatMap/filter等)對其進行操作。

Dataset通過Encoder實現了自定義的序列化格式,使得某些操作可以在無需解序列化的情況下直接進行。另外Dataset還進行了包括Tungsten優化在內的很多性能方面的優化。

實際上Dataset是包含了DataFrame的功能的,這樣二者就出現了很大的冗余,故Spark 2.0將二者統一:保留Dataset API,把DataFrame表示為Dataset[Row],即Dataset的子集。

API進化

Spark在迅速的發展,從原始的RDD API,再到DataFrame API,再到Dataset的出現,速度可謂驚人,執行性能上也有了很大提升。

我們在使用API時,應該優先選擇DataFrame & Dataset,因為它的性能很好,而且以后的優化它都可以享受到,但是為了兼容早期版本的程序,RDD API也會一直保留着。后續Spark上層的庫將全部會用 DataFrame & Dataset,比如MLlib、Streaming、Graphx等。

關於這三種API的更詳細的討論以及選擇參見:Apache Spark: RDD, DataFrame or Dataset?

入門

起點之SQLContext

要想使用Spark SQL,首先你得創建一個SQLContext對象,在這之前你只需要創建一個SparkContext就行了,如下:

data

另外,你也可以使用HiveContext,它是SQLContext的超集,提供了一些額外的功能:使用HiveQL解析器、訪問Hive用戶定義函數、從Hive表讀取數據。並且,你不需要安裝Hive就可以使用HiveContext。不過將來版本的Spark SQL可能會逐步縮小SQLContext和HiveContext之間的差距。

對於SQLContext,目前只有一個簡單的SQL語法解析器sql,而對於HiveContext,則可以使用hiveql和sql兩個解析器,默認是hiveql,我們可以通過如下語句來修改默認解析器:

Dataset

不過就目前來說,HiveQL解析器更加完善,因此推薦使用HiveQL。

創建並使用DataFrame

通過SQLContext,應用程序可以從已經存在的RDD、結構化文件、外部數據庫以及Hive表中創建出DataFrame。如下代碼從JSON文件(該文件可以從Spark發行包中找到)創建出一個DataFrame:

創建並使用DataFrame

DataFrame提供了一個領域特定語言(DSL)以方便操作結構化數據。下面是一些使用示例,更多更全的DataFrame操作參見Spark API文檔中的org.apache.spark.sql.DataFrame:

提供

另外,org.apache.spark.sql.functions單例對象還包含了一些操作DataFrame的函數,主要有這幾個方面:聚集操作、集合操作、字符串處理、排序、日期計算、通用數學函數、校驗碼操作、窗口操作、用戶定義函數支持等等。

在程序中執行SQL查詢

我們可以通過在程序中使用SQLContext.sql()來執行SQL查詢,結果將作為一個DataFrame返回:

執行查詢

我們還可以將DataFrame注冊為一個臨時表,然后就可以在其上執行SQL語句進行查詢了,臨時表的生命周期在與之關聯的SQLContext結束生命之后結束。示例如下:

臨時表

創建並使用Dataset

Dataset跟RDD相似,但是Dataset並沒有使用Java序列化庫和Kryo序列化庫,而是使用特定Encoder來序列化對象。encoder通常由代碼自動產生(在Scala中是通過隱式轉換產生),並且其序列化格式直接允許Spark執行很多操作,比如:filtering、sorting、hashing,而不需要解序列化。

創建使用

與RDD互操作

Spark SQL支持兩種不同的方法將RDD轉換為DataFrame。第一種方法使用反射去推斷包含特定類型對象的RDD的模式(schema),該方法可以使你的代碼更加精簡,不過要求在你寫Spark程序時已經知道模式信息(比如RDD中的對象是自己定義的case class類型)。第二種方法通過一個編程接口,此時你需要構造一個模式並將其應用到一個已存在的RDD以將其轉換為DataFrame,該方法適用於在運行時之前還不知道列以及列的類型的情況。

用反射推斷模式

Spark SQL的Scala接口支持將包含case class的RDD自動轉換為DataFrame。case class定義了表的模式,case class的參數名被反射讀取並成為表的列名。case class也可以嵌套或者包含復雜類型(如序列或者數組)。示例如下:

推斷模式

手動編程指定模式

當case class不能提前定義時(比如記錄的結構被編碼為字符串,或者當文本數據集被解析時不同用戶需要映射不同的字段),可以通過下面三步來將RDD轉換為DataFrame:

從原始RDD創建得到一個包含Row對象的RDD。

創建一個與第1步中Row的結構相匹配的StructType,以表示模式信息。

通過SQLContext.createDataFrame()將模式信息應用到第1步創建的RDD上。

指定模式

數據源

DataFrame可以當作標准RDD進行操作,也可以注冊為一個臨時表。將DataFrame注冊為一個臨時表,允許你在其上執行SQL查詢。DataFrame接口可以處理多種數據源,Spark SQL也內建支持了若干種極有用的數據源格式(包括json、parquet和jdbc,其中parquet是默認格式)。此外,當你使用SQL查詢這些數據源中的數據並且只用到了一部分字段時,Spark SQL可以智能地只掃描這些用到的字段。

通用加載/保存函數

DataFrameReader和DataFrameWriter中包好一些通用的加載和保存函數,所有這些操作都將parquet格式作為默認數據格式。示例如下:

保存

手動指定選項

你也可以手動指定數據源和其他任何想要傳遞給數據源的選項。指定數據源通常需要使用數據源全名(如org.apache.spark.sql.parquet),但對於內建數據源,你也可以使用它們的短名(json、parquet和jdbc)。並且不同的數據源類型之間都可以相互轉換。示例如下:

指定選項

用SQL直接查詢文件

也可以不用read API加載一個文件到DataFrame然后查詢它,而是直接使用SQL語句在文件上查詢:

val df = sqlContext.sql(“SELECT * FROM parquet.`examples/src/main/resources/users.parquet`”)

保存到持久表

可以使用DataFrameWriter.saveAsTable()將一個DataFrame保存到一個持久化表,根據不同的設置,表可能被保存為Hive兼容格式,也可能被保存為Spark SQL特定格式,關於這一點請參見API文檔。

我們可以通過SQLContext.table()或DataFrameReader.table()來加載一個表並返回一個DataFrame。

Parquet文件

Parquet格式是被很多其他的數據處理系統所支持的列式數據存儲格式。它可以高效地存儲具有嵌套字段的記錄,並支持Spark SQL的全部數據類型。Spark SQL支持在讀寫Parquet文件時自動地保存原始數據的模式信息。出於兼容性考慮,在寫Parquet文件時,所有列將自動轉換為nullable。

加載數據

文件查詢

下面是SQL示例:

實例

分區發現

在很多系統中(如Hive),表分區是一個通用的優化方法。在一個分區的表中,數據通常存儲在不同的目錄中,列名和列值通常被編碼在分區目錄名中以區分不同的分區。Parquet數據源能夠自動地發現和推斷分區信息。 如下是人口分區表目錄結構,其中gender和country是分區列:

分區

當使用SQLContext.read.parquet 或 SQLContext.read.load讀取path/to/table時,Spark SQL能夠自動從路徑中提取分區信息,返回的DataFrame的模式信息如下:

模式信息

上述模式中,分區列的數據類型被自動推斷。目前,支持的數據類型有數字類型和字符串類型。如果你不想數據類型被自動推斷,可以配置spark.sql.sources.partitionColumnTypeInference.enabled,默認為true,如果設置為false,將禁用自動類型推斷並默認使用字符串類型。

從Spark 1.6.0開始,分區發現默認只發現給定路徑下的分區。如果用戶傳遞path/to/table/gender=male作為路徑讀取數據,gender將不被作為一個分區列。你可以在數據源選項中設置basePath來指定分區發現應該開始的基路徑。例如,還是將path/to/table/gender=male作為數據路徑,但同時設置basePath為path/to/table/,gender將被作為分區列。

模式合並

就像ProtocolBuffer、Avro和Thrift,Parquet也支持模式演化(schema evolution)。這就意味着你可以向一個簡單的模式逐步添加列從而構建一個復雜的模式。這種方式可能導致模式信息分散在不同的Parquet文件中,Parquet數據源能夠自動檢測到這種情況並且合並所有這些文件中的模式信息。

但是由於模式合並是相對昂貴的操作,並且絕大多數情況下不是必須的,因此從Spark 1.5.0開始缺省關閉模式合並。開啟方式:在讀取Parquet文件時,設置數據源選項mergeSchema為true,或者設置全局的SQL選項spark.sql.parquet.mergeSchema為true。示例如下:

模式

JSON數據

SQLContext.read.json()可以將RDD[String]或者JSON文件加載並轉換為一個DataFrame。

有一點需要注意的是:這里用的JSON文件並不是隨意的典型的JSON文件,每一行必須是一個有效的JSON對象,如果一個對象跨越多行將導致失敗。對於RDD[String]也是一樣。

JSON數據

下面是SQL示例:

數據實例

數據庫

Spark SQL也可以使用JDBC從其他數據庫中讀取數據,你應該優先使用它而不是JdbcRDD,因為它將返回的數據作為一個DataFrame,所以很方便操作。但請注意:這和Spark SQL JDBC服務是不同的,Spark SQL JDBC服務允許其他應用通過JDBC連接到Spark SQL進行查詢。

要在Spark SQL中連接到指定數據庫,首先需要通過環境變量SPARK_CLASSPATH設置你的數據庫的JDBC驅動的路徑。例如在spark-shell中連接MySQL數據庫,你可以使用如下命令:

數據庫命令

可以通過SQLContext.read.format(“jdbc”).options(…).load()或SQLContext.read.jdbc(…)從數據庫中加載數據到DataFrame。如下示例,我們從MySQL數據庫中加載數據:

加載數據

下面是SQL示例:

加載案例

JDBC驅動需要在所有節點的相同路徑下都存在,因為是分布式處理嘛,就像Spark核心一樣。

有些數據庫需要使用大寫來引用相應的名字,好像Oracle中就需要使用大寫的表名。

分布式SQL引擎

Spark SQL也可以扮演一個分布式SQL引擎的角色,你可以使用JDBC/ODBC或者Spark SQL命令行接口連接到它,並直接執行交互式SQL查詢。

運行Thrift JDBC/ODBC Server

Spark SQL中實現的Thrift JDBC/ODBC Server跟Hive中的HiveServer2相一致。可以使用如下命令開啟JDBC/ODBC服務,缺省情況下的服務監聽地址為localhost:10000:

分布式

這個腳本不僅可以接受spark-submit命令可以接受的所有選項,還支持–hiveconf 屬性=值選項來配置Hive屬性。你可以執行./sbin/start-thriftserver.sh –help來查看完整的選項列表。

你可以使用beeline連接到上面已經開啟的Spark SQL引擎。命令如下:

AQL

連接到beeline需要輸入用戶名和密碼。在非安全模式下,簡單地輸入你自己的用戶名就可以了,密碼可以為空。對於安全模式,參見beeline文檔。

運行Spark SQL CLI

Spark SQL CLI的引入使得在Spark SQL中可方便地通過Hive metastore對Hive進行查詢。目前為止還不能使用Spark SQL CLI與Thrift JDBC/ODBC Server進行交互。這個腳本主要對本地開發比較有用,在共享的集群上,你應該讓各用戶連接到Thrift JDBC/ODBC Server。

使用Spark SQL CLI前需要注意:

將hive-site.xml配置文件拷貝到$SPARK_HOME/conf目錄下。

需要在./conf/spark-env.sh中的SPARK_CLASSPATH添加jdbc驅動的jar包。

啟動Spark SQL CLI的命令如下:

SQL命令

在啟動spark-sql時,如果不指定master,則以local的方式運行,master既可以指定standalone的地址,也可以指定yarn。當設定master為yarn時(spark-sql –master yarn)時,可以通過http://master:8088 頁面監控到整個job的執行過程。如果在./conf/spark-defaults.conf中配置了spark master的話,那么在啟動spark-sql時就不需要再指定master了。spark-sql啟動之后就可以在其中敲擊SQL語句了。

關於Spark SQL CLI的可用命令和參數,請敲擊./bin/spark-sql –help以查看。

性能調優

緩存數據在內存中

通過調用sqlContext.cacheTable(“tableName”)或dataFrame.cache(),Spark SQL可以將表以列式存儲緩存在內存中。這樣的話,Spark SQL就可以只掃描那些使用到的列,並且將自動壓縮數據以減少內存占用和GC開銷。你可以調用sqlContext.uncacheTable(“tableName”)將緩存的表從內存中移除。另外,你也可以在SQL/HiveQL中使用CACHE tablename和UNCACHE tablename來緩存表和移除已緩存的表。

和緩存RDD時的動機一樣,如果想在同樣的數據上多次運行任務或查詢,就應該把這些數據表緩存起來。

將數據緩存在內存中,同樣支持一些選項,參見Spark SQL官方文檔性能調優部分。

其他調優相關參數

還有其他一些參數可以用於優化查詢性能,參見Spark SQL官方文檔性能調優部分。

 

End.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM