7. 數據源
Spark-SQL 支持通過Dataframe接口對各種數據源進行操作
各種數據源的加載&保存
數據轉換(relational transformations)
注冊臨時視圖(temporary view),來允許SQL的形式直接對臨時視圖進行操作
7.1 數據源加載
Spark-SQL的默認數據源為parquet(spark.sql.sources.default設置),一些數據源加載的例子如下:
/** * 加載parquet數據源 */ spark.read.load("D:\\data\\users.parquet").show() /** * 加載非parquet數據源,需要手動設置選項 * Spark-SQLd對內置數據源,可以使用短名稱.對自定義的數據源需要使用全名稱(比如:org.apache.spark.sql.parquet) * Spark-SQL內置數據源:json, parquet, jdbc, orc, libsvm, csv, text */ spark.read.format("csv").load("D:\\data\\people.csv").show() /** * 以SQL的形式讀取文件 */ spark.sql("select * from json.`D:\\data\\sku.json`").show()
7.2 數據源保存
7.2.1 保存模式
數據源的保存提供了以下保存的模式
Scala/Java | Any Language | 描述 |
SaveMode.ErrorIfExists(默認) | error/errorifexists | 如果保存的數據已存在,將拋出錯誤 |
SaveMode.Append | append | 如果保存的數據已存在,數據將以追加的形式寫入 |
SaveMode.Overwrite | overwrite | 如果保存的數據已存在,數據將以覆蓋的形式寫入 |
SaveMode.Ignore | ignore | 如果保存的數據已存在,數據將不會寫入(也不會拋出錯誤,即原數據不變,新數據忽略) |
數據源的保存是 非原子性 的,這一點務必要注意.比如以Overwrite形式寫入時,數據將先刪除后寫入(覆蓋),這里有數據丟失的可能性的
7.2.2 存儲源
Spark-SQL 可以將數據保存在兩種源上: 文件(file-based) 和 持久化表(persistent tables)
i).文件(file-based)
數據保存的格式可以變化,比如將一個csv保存為一個txt的例子
spark.read.format("csv").load("InputPath").write.format("text").save("OutputPath")
ii).持久化表(persistent tables)
將數據保存到Hive的表中.
元數據
數據保存到Hive的表中,將同時自動保存schema到Hive的MetaStore里
內/外表
數據保存時,根據是否指定path來區分內外表.
當手動設置為其它路徑時成為外部表(表刪除只刪除元數據不刪除表數據)
不設置path則成為內部表(表刪除將同時刪除元數據和表數據)
注意: 從Spark2.0之后, CREATE TABLE 但沒有指定path依然會成為一個外部表.即唯一標准是是否指定path
表分區
如果表被設置為外部表,默認不會收集分區信息,需要手動同步(msck / add partition)
7.3.內置數據源
7.3.1 Parquet
Parquet是一種列式存儲格式,無法直接閱讀.但有非常好的壓縮消耗和壓縮比.
Parquet是Spark默認和推薦使用的數據格式.在很多方面,Spark都對Parquet有最大支持
在Spark中,出於兼容性的考慮,所有的Columns都將自動轉換為可空類型
7.3.1.1 一個Parquet讀寫的例子
spark.read.format("csv").load("InputPath").write.format("text").save("OutputPath")
7.3.1.2 Parquet的分區發現
在Hive之類中,分區是以目錄的形式存在.分區鍵本身是目錄的一部分.但此時,分區信息是不會自動的被Hive發現.
而在Parquet中,Spark可以自動的發現分區和推斷數據類型.推斷的依據就是目錄
比如:一個存放所有訂單信息的目錄,以類似這樣的格式 order/time=xxxx-MM-dd}/area=xx/xxxx.parquet
將路徑(/order)傳入,讀取將自動增加兩列time和area作為分區列,並自動在實際使用中應用分區信息
注意:
對Parquet的分區發現字段現僅支持數字型和字符串型.
spark.sql.sources.partitionColumnTypeInference.enabled(默認為true)設置,可以關閉分區發現的數據類型推斷
7.3.1.3 Parquet的元數據合並(schema merging)
實際工作中,Parquet數據文件很可能是由一個開始比較簡單的schema逐漸變得復雜(一開始的業務比較簡單,但隨着業務的推進會加入越來越多的字段)
因為Parquet的schema是自存儲的,所以需要一種途徑來保持對歷史數據schema的兼容性,這就是模式演進(schema evolution),而實現的手段就是模式合並
模式合並是一個昂貴操作,而在大多數情況下都是不會使用到它,所以模式合並默認是關閉的
有以下兩種途徑可以打開模式合並
i).臨時=>在Option中設置mergeSchema=true
spark.read.option("mergeSchema", "true")
ii).永久=> 配置文件配置 spark.sql.parquet.mergeSchema 設置為 true
7.3.1.4 Parquet 與Hive MetaStore
在Spark讀取和寫入Hive的Parquet表時,Spark將使用自己的Parquet Support,而不是Hive SerDe ( spark.sql.hive.convertMetastoreParquet 默認開啟
)
Spark-Parquet-Schema 與 Hive-MetaStore 是有一些區別的,這些區別體現在:
Hive-MetaStore不區分大小寫,而Spark-Parquet-Schema是區分大小寫的.
Hive-MetaStore認為所有列都是可空的,而Spark-Parquet-Schema認為列是否可空必須指定
所以在Spark-Parquet-Schema 與 Hive-MetaStore 會進行一些自協調(reconciled schema),規則如下:
只出現在Spark-Parquet-Schema中的任何字段會被自動刪除
只出現在Hive-MetaStore中的任何字段以可空字段(nullable field)形式增加
7.3.1.5 Parquet的元數據刷新(Metadata Refreshing)
Parquet的元數據將會被自動緩存以獲得更好的性能.
但需要注意的是:Hive metastore Parquet table conversion,這些Hive表的MetaData也會被緩存.
如果這些表在Hive或其它工具層面被改變,Spark無法感知這些改變,必須手工刷新.刷新方式如下:
spark.catalog.refreshTable("my_table")
7.3.1.6 Parquet的常用配置
參數名 | 默認值 | 描述 |
spark.sql.parquet.binaryAsString | false | 一些其它的基於Parquet系統(比如Hive,Impala,舊版Spark-SQL),在寫出Parquet-schema時不區分binary data (二進制數據)和 strings (字符串) 啟用這個屬性,將保持將二進制數據解釋為字符串的兼容性 |
spark.sql.parquet.int96AsTimestamp | true | 一些其它的基於Parquet系統(比如Hive,Impala),會將Timestamp寫為int96.該屬性告知Spark-SQL將int96解析為Timestamp的兼容性 |
spark.sql.parquet.cacheMetadata | true | 指示是否打開parquet的元數據緩存(這可以加快查詢靜態數據的速度) |
spark.sql.parquet.compression.codec | snappy | 指示寫出parquet文件的壓縮類型(默認snappy).可選項為:uncompressed, snappy, gzip, lzo |
spark.sql.parquet.filterPushdown | true | 設置為 true 時啟用 (過濾謂詞下推,嘗試調整算子執行順序,將寫在后面的過濾謂詞提前處理以減少查詢數據量,提高性能) |
spark.sql.hive.convertMetastoreParquet | true | 面對Hive時,是否啟用使用Spark-SQL內置的parquet,設為否則放棄使用內置parquet轉而使用Hive serDe |
spark.sql.parquet.mergeSchema | false | 是否全局啟用parquet的元數據合並,設為否則從summary file或random file中隨機挑選 |
spark.sql.optimizer.metadataOnly | true | 當設為true時,將使用metadata信息來構建分區列而不是走表掃描.(只是在查詢的所有列都是分區列時才有意義,並且此時依然有各種聚合能力) |
7.3.1 ORC
ORC格式是Spark-2.3之后才支持的數據源格式.它的可配置信息如下:
參數名 | 默認值 | 描述 |
spark.sql.orc.impl | hive | ORC的實現名稱.可選值hive(使用hive-1.2.1的ORC庫),native(或者hive.native,將使用Apache ORC 1.4.1)
|
spark.sql.orc.enableVectorizedReader | true | 是否在本機使用向量化ORC解碼.如果為false,會在本機構造一個向量化ORC閱讀器,對於上面設置為hive,這個屬性將被忽略 |
7.3.2 JSON
Spark-SQL可以自動推斷Json的schema.(Json的schema依然是自存儲的)
Spark-SQL加載將以DataFrame[Row]的形式,並且可以非常簡單的轉為一個Dataset[T](.as[T])
一個簡單的例子如下:
//讀取Json數據源,如果某些行缺age屬性的,補0
spark.read.json("D:\\data\\people.json").na.fill(0,Seq("age"))
7.3.3 Hive
7.3.3.1 Hive支持
Spark-SQL還支持讀取和寫入存儲在Hive中的數據.Spark-SQL使用Hive,但需要做一些補充配置
i).Hive的庫必須在classpath中被找到.因為Hive的庫有大量的依賴,而這些依賴不一定是被Spark完全打包的,所以在運行時,必須要能找到這些庫包
注意:這種依賴與找到是針對每個executor而言的,因為executor才是真正的執行者.
ii).需要在Spark.Conf中拷入 hive-site.xml
, core-site.xml
(用於安全配置)和 hdfs-site.xml
(用於 HDFS 配置)文件
iii).需要在SparkSession中啟用對Hive的支持:spark.enableHiveSupport()
在實際運行中,不一定需要部署一個完整的Hive系統,比如當hive-site.xml不存在時,會在當前目錄創建一個metastore_db用以存儲元數據(不推薦如此)
7.3.3.2 Hive的存儲格式
創建一個Hive表,需要定義讀取/寫入文件系統的格式,以及數據的行斷定義,列斷定義等.Spark-Sql讀取Hive表時,默認以文本格式讀取.
一些Hive存儲格式相關格式如下:
屬性名 | 描述 |
fileFormat | fileFormat是一種存儲格式規范的包,包括 "serde","input format" 和 "output format"。 目前支持6個文件格式:'sequencefile','rcfile','orc','parquet','textfile'和'avro' |
inputFormat, outputFormat | 這兩個選項將相應的 "InputFormat" 和 "OutputFormat" 類的名稱指定為字符串文字,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。 這兩個選項必須成對出現,如果已經指定了 "fileFormat" 選項,則無法指定它們 |
serde | 此選項指定 serde 類的名稱。 當指定 `fileFormat` 選項時,如果給定的 `fileFormat` 已經包含 serde 的信息,那么不要指定這個選項。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可以使用這3個文件格式的這個選項 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim | 這些選項只能與 "textfile" 文件格式一起使用。它們定義如何將分隔的文件讀入行 |
7.3.3.3 對Hive不支持的地方
Spark-SQL,相當於內置了一個Hive引擎,它具有Hive絕大部分的功能,但確實不是完全支持Hive.Spark-SQL不支持的如下:
Tables with buckets
UNION
和 Unique join
Hive的列靜態數據統計收集功能
File format for CLI Spark-SQL只支持 TextOutputFormat
對一些UDF函數,Spark-SQL結果也與原生Hive不同
SQRT(n)
If n < 0, Hive returns null, Spark SQL returns NaN
ACOS(n)
If n < -1 or n > 1, Hive returns null, Spark SQL returns NaN
ASIN(n)
If n < -1 or n > 1, Hive returns null, Spark SQL returns NaN
詳情請見官網描述
7.3.3 JDBC
7.3.3.1 簡述
Spark-SQL 同樣支持以JDBC的形式從其它的關系型數據庫讀取數據.(此方式優於JdbcRDD,因為DF更容易與處理與其它數據源的交互,比如直接映射為臨時視圖等等)
JDBC-驅動
JDBC依賴目標數據庫驅動包.需要注意的是,這個驅動必須同時對driver端和executor端所有執行程序的類默認加載器可見(DriverManager會忽略不可見的驅動導致執行找不到驅動)
例如,要從 Spark Shell 連接到 postgres 必須指定驅動包位置
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
一個比較方便的辦法是修改所有工作節點 compute_classpath.sh 包含進驅動包
7.3.3.2 JDBC常用配置
可以在數據源選項中指定 JDBC 連接屬性。用戶
和 密碼
通常作為登錄數據源的連接屬性提供。 除了連接屬性外,Spark 還支持以下不區分大小寫的選項
屬性 | 描述 |
url | 要連接的JDBC URL。 源特定的連接屬性可以在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret |
dbtable | 應該讀取的 JDBC 表。請注意,可以使用在SQL查詢的 FROM 子句中有效的任何內容。 例如,您可以使用括號中的子查詢代替完整表 |
driver | 用於連接到此 URL 的 JDBC driver 程序的類名 |
partitionColumn, lowerBound, upperBound | 這三個屬性必須被一起設置.partitionColumn 必須是數字列.lowerBound 和 upperBound 僅用於決定分區的大小而不是用於過濾表中的行.因此,表中的所有行將被分區並返回 |
numPartitions | 在表讀寫中可以用於並行度的最大分區數。這也確定並發JDBC連接的最大數量。 如果要寫入的分區數超過此限制,則在寫入之前通過調用 coalesce(numPartitions) 將其減少到此限制 |
fetchsize | JDBC 抓取的大小,用於確定每次數據往返傳遞的行數。 這有利於提升 JDBC driver 的性能,它們的默認值較小(例如: Oracle 是 10 行)。 該選項僅適用於讀取操作 |
batchsize | JDBC 批處理的大小,用於確定每次數據往返傳遞的行數。 這有利於提升 JDBC driver 的性能。 該選項僅適用於寫操作。默認值為 1000 |
isolationLevel | 事務隔離級別,適用於當前連接。 它可以是 此選項僅適用於寫操作。請參考 |
sessionInitStatement | 在每個數據庫會話打開到遠程DB並開始讀取數據之后,該選項將執行一個自定義SQL語句(或PL/SQL塊)。使用它來實現會話初始化代碼。 示例:選項(“sessionInitStatement”、“”開始執行即時的“alter session set”“_serial_direct_read”=true”;結束;”“”) |
truncate | 這是一個與 JDBC 相關的選項.啟用 SaveMode.Overwrite 時,此選項會導致 Spark 截斷現有表,而不是刪除並重新創建。它默認為 false。 此選項僅適用於寫操作 |
createTableOptions | 這是一個與JDBC相關的選項。 如果指定,此選項允許在創建表時設置特定於數據庫的表和分區選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此選項僅適用於寫操作 |
createTableColumnTypes | 這是一個與JDBC相關的選項.如果指定,此選項允許在創建表時設置特定於數據庫的表和分區選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB.).此選項僅適用於寫操作 |
customSchema | 用於從連接器讀取JDBC數據的自定義模式。例如,“id DECIMAL(38,0), name STRING”。還可以指定部分字段,其他字段使用默認類型映射。 例如,“id DECIMAL(38,0)”。列名應該與JDBC表對應的列名相同。用戶可以指定Spark SQL的相應數據類型,而不是使用默認值。此選項僅適用於讀取 |
7.3.3.2 JDBC-Demo
/** * 連接RDBMS(SQLServer2008)Demo * 注:必須導入SQLServer驅動包(sqljdbc42.jar),(代碼中不能體現,但實際已在項目中添加過了) */ val df = spark.read.format("jdbc") .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") .option("url", "jdbc:sqlserver://127.0.0.1\\Sql2008:1433;DatabaseName=AdapterService") .option("user", "sa") .option("password", "12abAB") .option("dbtable", "dbo.tm_flow") .load() df.show(10);
7.3.3.3 JDBC-性能調優
內存緩存
Spark-SQL 支持對JDBC結果緩存到內存以提升查詢效率
spark.catalog.cacheTable("tableName") 或 dataFrame.cache() 將數據結果以列式存儲的形式存放至內存
spark.catalog.uncacheTable("tableName") 刪除緩存
廣播
broadcast(spark.table("xxx"))