[Spark]-結構化數據查詢之數據源篇


7. 數據源

  Spark-SQL 支持通過Dataframe接口對各種數據源進行操作

    各種數據源的加載&保存

    數據轉換(relational transformations)

    注冊臨時視圖(temporary view),來允許SQL的形式直接對臨時視圖進行操作

  7.1  數據源加載

    Spark-SQL的默認數據源為parquet(spark.sql.sources.default設置),一些數據源加載的例子如下:    

 /**
        * 加載parquet數據源
        */
        spark.read.load("D:\\data\\users.parquet").show()
        /**
        * 加載非parquet數據源,需要手動設置選項
        *   Spark-SQLd對內置數據源,可以使用短名稱.對自定義的數據源需要使用全名稱(比如:org.apache.spark.sql.parquet)
        *   Spark-SQL內置數據源:json, parquet, jdbc, orc, libsvm, csv, text
        */
        spark.read.format("csv").load("D:\\data\\people.csv").show()
        /**
        * 以SQL的形式讀取文件
        */
        spark.sql("select * from json.`D:\\data\\sku.json`").show()

  7.2 數據源保存

    7.2.1 保存模式

      數據源的保存提供了以下保存的模式

Scala/Java Any Language 描述
SaveMode.ErrorIfExists(默認) error/errorifexists 如果保存的數據已存在,將拋出錯誤
SaveMode.Append append 如果保存的數據已存在,數據將以追加的形式寫入
SaveMode.Overwrite overwrite 如果保存的數據已存在,數據將以覆蓋的形式寫入
SaveMode.Ignore ignore 如果保存的數據已存在,數據將不會寫入(也不會拋出錯誤,即原數據不變,新數據忽略)

      數據源的保存是 非原子性 的,這一點務必要注意.比如以Overwrite形式寫入時,數據將先刪除后寫入(覆蓋),這里有數據丟失的可能性的

       7.2.2 存儲源

      Spark-SQL 可以將數據保存在兩種源上: 文件(file-based) 和 持久化表(persistent tables) 

      i).文件(file-based)

        數據保存的格式可以變化,比如將一個csv保存為一個txt的例子

        spark.read.format("csv").load("InputPath").write.format("text").save("OutputPath") 

       ii).持久化表(persistent tables)

        將數據保存到Hive的表中.

        元數據

          數據保存到Hive的表中,將同時自動保存schema到Hive的MetaStore里

        內/外表

          數據保存時,根據是否指定path來區分內外表.

            當手動設置為其它路徑時成為外部表(表刪除只刪除元數據不刪除表數據)

            不設置path則成為內部表(表刪除將同時刪除元數據和表數據)

            注意: 從Spark2.0之后, CREATE TABLE 但沒有指定path依然會成為一個外部表.即唯一標准是是否指定path

        表分區

          如果表被設置為外部表,默認不會收集分區信息,需要手動同步(msck / add partition)

  7.3.內置數據源

    7.3.1 Parquet

        Parquet是一種列式存儲格式,無法直接閱讀.但有非常好的壓縮消耗和壓縮比.

        Parquet是Spark默認和推薦使用的數據格式.在很多方面,Spark都對Parquet有最大支持

        在Spark中,出於兼容性的考慮,所有的Columns都將自動轉換為可空類型

        7.3.1.1 一個Parquet讀寫的例子

        spark.read.format("csv").load("InputPath").write.format("text").save("OutputPath") 

           7.3.1.2 Parquet的分區發現

        在Hive之類中,分區是以目錄的形式存在.分區鍵本身是目錄的一部分.但此時,分區信息是不會自動的被Hive發現.

        而在Parquet中,Spark可以自動的發現分區和推斷數據類型.推斷的依據就是目錄

        比如:一個存放所有訂單信息的目錄,以類似這樣的格式 order/time=xxxx-MM-dd}/area=xx/xxxx.parquet

           將路徑(/order)傳入,讀取將自動增加兩列time和area作為分區列,並自動在實際使用中應用分區信息

        注意:

          對Parquet的分區發現字段現僅支持數字型和字符串型.

        spark.sql.sources.partitionColumnTypeInference.enabled(默認為true)設置,可以關閉分區發現的數據類型推斷

        7.3.1.3 Parquet的元數據合並(schema merging)

         實際工作中,Parquet數據文件很可能是由一個開始比較簡單的schema逐漸變得復雜(一開始的業務比較簡單,但隨着業務的推進會加入越來越多的字段)

         因為Parquet的schema是自存儲的,所以需要一種途徑來保持對歷史數據schema的兼容性,這就是模式演進(schema evolution),而實現的手段就是模式合並

         模式合並是一個昂貴操作,而在大多數情況下都是不會使用到它,所以模式合並默認是關閉的

         有以下兩種途徑可以打開模式合並

          i).臨時=>在Option中設置mergeSchema=true

            spark.read.option("mergeSchema""true")

            ii).永久=> 配置文件配置 spark.sql.parquet.mergeSchema 設置為 true

      7.3.1.4 Parquet 與Hive MetaStore

        在Spark讀取和寫入Hive的Parquet表時,Spark將使用自己的Parquet Support,而不是Hive SerDe ( spark.sql.hive.convertMetastoreParquet 默認開啟)

         Spark-Parquet-Schema 與 Hive-MetaStore 是有一些區別的,這些區別體現在:

          Hive-MetaStore不區分大小寫,而Spark-Parquet-Schema是區分大小寫的.

          Hive-MetaStore認為所有列都是可空的,而Spark-Parquet-Schema認為列是否可空必須指定

        所以在Spark-Parquet-Schema 與 Hive-MetaStore 會進行一些自協調(reconciled schema),規則如下:

          只出現在Spark-Parquet-Schema中的任何字段會被自動刪除

          只出現在Hive-MetaStore中的任何字段以可空字段(nullable field)形式增加

      7.3.1.5 Parquet的元數據刷新(Metadata Refreshing)

         Parquet的元數據將會被自動緩存以獲得更好的性能.

         但需要注意的是:Hive metastore Parquet table conversion,這些Hive表的MetaData也會被緩存.

         如果這些表在Hive或其它工具層面被改變,Spark無法感知這些改變,必須手工刷新.刷新方式如下:

          spark.catalog.refreshTable("my_table")

      7.3.1.6 Parquet的常用配置

          

參數名 默認值 描述
spark.sql.parquet.binaryAsString false

一些其它的基於Parquet系統(比如Hive,Impala,舊版Spark-SQL),在寫出Parquet-schema時不區分binary data (二進制數據)和 strings (字符串)

啟用這個屬性,將保持將二進制數據解釋為字符串的兼容性

spark.sql.parquet.int96AsTimestamp true 一些其它的基於Parquet系統(比如Hive,Impala),會將Timestamp寫為int96.該屬性告知Spark-SQL將int96解析為Timestamp的兼容性
spark.sql.parquet.cacheMetadata true 指示是否打開parquet的元數據緩存(這可以加快查詢靜態數據的速度)
spark.sql.parquet.compression.codec snappy 指示寫出parquet文件的壓縮類型(默認snappy).可選項為:uncompressed, snappy, gzip, lzo
spark.sql.parquet.filterPushdown true 設置為 true 時啟用 (過濾謂詞下推,嘗試調整算子執行順序,將寫在后面的過濾謂詞提前處理以減少查詢數據量,提高性能)
spark.sql.hive.convertMetastoreParquet true 面對Hive時,是否啟用使用Spark-SQL內置的parquet,設為否則放棄使用內置parquet轉而使用Hive serDe
spark.sql.parquet.mergeSchema false 是否全局啟用parquet的元數據合並,設為否則從summary file或random file中隨機挑選
spark.sql.optimizer.metadataOnly true 當設為true時,將使用metadata信息來構建分區列而不是走表掃描.(只是在查詢的所有列都是分區列時才有意義,並且此時依然有各種聚合能力)

    7.3.1 ORC

      ORC格式是Spark-2.3之后才支持的數據源格式.它的可配置信息如下:

參數名 默認值 描述
spark.sql.orc.impl hive ORC的實現名稱.可選值hive(使用hive-1.2.1的ORC庫),native(或者hive.native,將使用Apache ORC 1.4.1)
spark.sql.orc.enableVectorizedReader true 是否在本機使用向量化ORC解碼.如果為false,會在本機構造一個向量化ORC閱讀器,對於上面設置為hive,這個屬性將被忽略

    7.3.2 JSON

      Spark-SQL可以自動推斷Json的schema.(Json的schema依然是自存儲的)

      Spark-SQL加載將以DataFrame[Row]的形式,並且可以非常簡單的轉為一個Dataset[T](.as[T])

      一個簡單的例子如下:

        //讀取Json數據源,如果某些行缺age屬性的,補0

        spark.read.json("D:\\data\\people.json").na.fill(0,Seq("age"))

     7.3.3 Hive

      7.3.3.1 Hive支持

        Spark-SQL還支持讀取和寫入存儲在Hive中的數據.Spark-SQL使用Hive,但需要做一些補充配置

          i).Hive的庫必須在classpath中被找到.因為Hive的庫有大量的依賴,而這些依賴不一定是被Spark完全打包的,所以在運行時,必須要能找到這些庫包

           注意:這種依賴與找到是針對每個executor而言的,因為executor才是真正的執行者.

            ii).需要在Spark.Conf中拷入 hive-site.xmlcore-site.xml(用於安全配置)和 hdfs-site.xml (用於 HDFS 配置)文件

           iii).需要在SparkSession中啟用對Hive的支持:spark.enableHiveSupport()

             在實際運行中,不一定需要部署一個完整的Hive系統,比如當hive-site.xml不存在時,會在當前目錄創建一個metastore_db用以存儲元數據(不推薦如此)

      7.3.3.2 Hive的存儲格式

        創建一個Hive表,需要定義讀取/寫入文件系統的格式,以及數據的行斷定義,列斷定義等.Spark-Sql讀取Hive表時,默認以文本格式讀取.

        一些Hive存儲格式相關格式如下:

屬性名 描述
fileFormat fileFormat是一種存儲格式規范的包,包括 "serde","input format" 和 "output format"。 目前支持6個文件格式:'sequencefile','rcfile','orc','parquet','textfile'和'avro'
inputFormat, outputFormat 這兩個選項將相應的 "InputFormat" 和 "OutputFormat" 類的名稱指定為字符串文字,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。 這兩個選項必須成對出現,如果已經指定了 "fileFormat" 選項,則無法指定它們
serde 此選項指定 serde 類的名稱。 當指定 `fileFormat` 選項時,如果給定的 `fileFormat` 已經包含 serde 的信息,那么不要指定這個選項。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可以使用這3個文件格式的這個選項
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 這些選項只能與 "textfile" 文件格式一起使用。它們定義如何將分隔的文件讀入行

      7.3.3.3 對Hive不支持的地方

        Spark-SQL,相當於內置了一個Hive引擎,它具有Hive絕大部分的功能,但確實不是完全支持Hive.Spark-SQL不支持的如下:

          Tables with buckets

          UNION  和 Unique join

          Hive的列靜態數據統計收集功能

          File format for CLI  Spark-SQL只支持 TextOutputFormat

          對一些UDF函數,Spark-SQL結果也與原生Hive不同

            SQRT(n) If n < 0, Hive returns null, Spark SQL returns NaN

            ACOS(n) If n < -1 or n > 1, Hive returns null, Spark SQL returns NaN

            ASIN(n) If n < -1 or n > 1, Hive returns null, Spark SQL returns NaN

          詳情請見官網描述

    7.3.3 JDBC

      7.3.3.1 簡述

        Spark-SQL 同樣支持以JDBC的形式從其它的關系型數據庫讀取數據.(此方式優於JdbcRDD,因為DF更容易與處理與其它數據源的交互,比如直接映射為臨時視圖等等)

        JDBC-驅動

          JDBC依賴目標數據庫驅動包.需要注意的是,這個驅動必須同時對driver端和executor端所有執行程序的類默認加載器可見(DriverManager會忽略不可見的驅動導致執行找不到驅動)

 

 

          例如,要從 Spark Shell 連接到 postgres 必須指定驅動包位置

            bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

          一個比較方便的辦法是修改所有工作節點 compute_classpath.sh 包含進驅動包

      7.3.3.2 JDBC常用配置

        可以在數據源選項中指定 JDBC 連接屬性。用戶 和 密碼通常作為登錄數據源的連接屬性提供。 除了連接屬性外,Spark 還支持以下不區分大小寫的選項

屬性 描述
url 要連接的JDBC URL。 源特定的連接屬性可以在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 應該讀取的 JDBC 表。請注意,可以使用在SQL查詢的 FROM 子句中有效的任何內容。 例如,您可以使用括號中的子查詢代替完整表
driver 用於連接到此 URL 的 JDBC driver 程序的類名
partitionColumn, lowerBound, upperBound 這三個屬性必須被一起設置.partitionColumn 必須是數字列.lowerBound 和 upperBound 僅用於決定分區的大小而不是用於過濾表中的行.因此,表中的所有行將被分區並返回
numPartitions 在表讀寫中可以用於並行度的最大分區數。這也確定並發JDBC連接的最大數量。 如果要寫入的分區數超過此限制,則在寫入之前通過調用 coalesce(numPartitions) 將其減少到此限制
fetchsize JDBC 抓取的大小,用於確定每次數據往返傳遞的行數。 這有利於提升 JDBC driver 的性能,它們的默認值較小(例如: Oracle 是 10 行)。 該選項僅適用於讀取操作
batchsize JDBC 批處理的大小,用於確定每次數據往返傳遞的行數。 這有利於提升 JDBC driver 的性能。 該選項僅適用於寫操作。默認值為 1000
isolationLevel

事務隔離級別,適用於當前連接。 它可以是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ, 或 SERIALIZABLE 之一,對應於 JDBC 連接對象定義的標准事務隔離級別,默認為 READ_UNCOMMITTED

此選項僅適用於寫操作。請參考 java.sql.Connection 中的文檔

sessionInitStatement

在每個數據庫會話打開到遠程DB並開始讀取數據之后,該選項將執行一個自定義SQL語句(或PL/SQL塊)。使用它來實現會話初始化代碼。

示例:選項(“sessionInitStatement”、“”開始執行即時的“alter session set”“_serial_direct_read”=true”;結束;”“”)

truncate 這是一個與 JDBC 相關的選項.啟用 SaveMode.Overwrite 時,此選項會導致 Spark 截斷現有表,而不是刪除並重新創建。它默認為 false。 此選項僅適用於寫操作
createTableOptions 這是一個與JDBC相關的選項。 如果指定,此選項允許在創建表時設置特定於數據庫的表和分區選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此選項僅適用於寫操作
createTableColumnTypes 這是一個與JDBC相關的選項.如果指定,此選項允許在創建表時設置特定於數據庫的表和分區選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB.).此選項僅適用於寫操作
customSchema

用於從連接器讀取JDBC數據的自定義模式。例如,“id DECIMAL(38,0), name STRING”。還可以指定部分字段,其他字段使用默認類型映射。

例如,“id DECIMAL(38,0)”。列名應該與JDBC表對應的列名相同。用戶可以指定Spark SQL的相應數據類型,而不是使用默認值。此選項僅適用於讀取

      7.3.3.2 JDBC-Demo

        /**
           * 連接RDBMS(SQLServer2008)Demo
           *   注:必須導入SQLServer驅動包(sqljdbc42.jar),(代碼中不能體現,但實際已在項目中添加過了)
           */
          val df = spark.read.format("jdbc")
                    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                    .option("url", "jdbc:sqlserver://127.0.0.1\\Sql2008:1433;DatabaseName=AdapterService")
                    .option("user", "sa")
                    .option("password", "12abAB")
                    .option("dbtable", "dbo.tm_flow")
                    .load()

          df.show(10);

      7.3.3.3 JDBC-性能調優

        內存緩存

          Spark-SQL 支持對JDBC結果緩存到內存以提升查詢效率

          spark.catalog.cacheTable("tableName") 或 dataFrame.cache() 將數據結果以列式存儲的形式存放至內存 

          spark.catalog.uncacheTable("tableName") 刪除緩存

        廣播

          broadcast(spark.table("xxx"))


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM