spark SQL (四)數據源 Data Source----Parquet 文件的讀取與加載


 spark SQL Parquet 文件的讀取與加載

    是由許多其他數據處理系統支持的柱狀格式。Spark SQL支持閱讀和編寫自動保留原始數據模式的Parquet文件。在編寫Parquet文件時,出於兼容性原因,所有列都會自動轉換為空。

1, 以編程方式加載數據

   這里使用上一節的例子中的數據:常規數據加載

  private def runBasicParquetExample(spark: SparkSession): Unit = {
     import spark.implicits._
    //
    val peopleDF = spark.read.json("examples/src/main/resources/people.json")
    //DataFrames可以保存為Parquet文件,維護模式信息
    peopleDF.write.parquet("people.parquet")


    //在上面創建的parquet文件中讀取
    // Parquet文件是自描述的,所以模式被保存
    //加載Parquet文件的結果也是一個DataFrame 
    val parquetFileDF = spark.read.parquet("people.parquet")


    // Parquet文件也可以用來創建臨時視圖,然后在SQL語句
     parquetFileDF.createOrReplaceTempView("parquetFile")
    val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
    namesDF.map(attributes => "Name: " + attributes(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
  }
2,分區操作

      表分區是像Hive這樣的系統中常用的優化方法。在分區表中,數據通常存儲在不同的目錄中,分區列值在每個分區目錄的路徑中編碼。現在,Parquet數據源能夠自動發現和推斷分區信息。例如,我們可以使用以下目錄結構,兩個額外的列gender和country分區列將所有以前使用的人口數據存儲到分區表中:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
     通過傳遞path/to/table給SparkSession.read.parquet或者SparkSession.read.load,Spark SQL將自動從路徑中提取分區信息。現在,返回的DataFrame的模式變成:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
     請注意,分區列的數據類型是自動推斷的。目前支持數字數據類型和字符串類型。有時用戶可能不希望自動推斷分區列的數據類型。對於這些用例,可以使用spark.sql.sources.partitionColumnTypeInference.enabled默認 的自動類型推斷來配置true。當禁用類型推斷時,字符串類型將用於分區列。
      從Spark 1.6.0開始,默認情況下,分區僅在給定路徑下找到分區。對於上面的例子,如果用戶傳遞path/to/table/gender=male給 SparkSession.read.parquet或者SparkSession.read.load,gender將不會被視為分區列。如果用戶需要指定啟動分區發現的基本路徑,則可以basePath在數據源選項中進行設置。例如,何時path/to/table/gender=male將數據的路徑和用戶設置basePath為path/to/table/,gender將成為分區列。
3, scheme 合並

     像ProtocolBuffer,Avro和Thrift一樣,Parquet也支持模式演變。用戶可以從簡單的模式開始,並根據需要逐漸向模式添加更多的列。通過這種方式,用戶可能會以不同的但是 相互兼容的模式結束多個Parquet文件。Parquet數據源現在可以自動檢測這種情況並合並所有這些文件的模式。
     由於模式合並是一個相對昂貴的操作,並且在大多數情況下不是必需的,所以我們從1.5.0開始默認關閉它。你可以通過

      1)  將數據源選項設置mergeSchema為true讀取Parquet文件(如下面的示例所示)

       2)設置全局SQL選項spark.sql.parquet.mergeSchema來true。

例子如下:

  private def runParquetSchemaMergingExample(spark: SparkSession): Unit = {

    import spark.implicits._

    // 創建一個簡單的DataFrame,存儲到一個分區目錄
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("data/test_table/key=1")

    //在新的分區目錄中創建另一個DataFrame,
    //添加一個新的列並刪除一個現存的列
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("data/test_table/key=2")

    //讀取分區表
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    mergedDF.printSchema()

    //最終的模式由Parquet文件中的所有3列組成
    //分區列出現在分區目錄路徑中
    // root
    //  |-- value: int (nullable = true)
    //  |-- square: int (nullable = true)
    //  |-- cube: int (nullable = true)
    //  |-- key: int (nullable = true)
    // $example off:schema_merging$
  }
4, Hive metastore Parquet
     在讀取和寫入Hive metastore Parquet表格時,Spark SQL將嘗試使用自己的Parquet支持而不是Hive SerDe來獲得更好的性能。此行為由spark.sql.hive.convertMetastoreParquet配置控制 ,並默認打開。

Hive / Parquet Schema調解
     Hive和Parquet從表模式處理的角度來看,有兩個關鍵的區別。
      1)hive 是不區分大小寫的,而Parquet不是   

      2) Hive認為所有列都是可以空的,而Parquet的可空性是顯着的
由於這個原因,在將Hive metastore Parquet表轉換為Spark SQL Parquet表時,我們必須將Hive Metastore模式與Parquet模式協調一致。協調規則是:
     在兩個模式中具有相同名稱的字段必須具有相同的數據類型,而不管是否為空。協調字段應該具有Parquet方面的數據類型,以保證可空性。
協調的模式恰好包含在Hive Metastore模式中定義的那些字段。
     1)僅出現在Parquet模式中的任何字段將被放置在協調的模式中。
     2) 僅在Hive Metastore模式中出現的任何字段才會作為可協調字段添加到協調模式中。
 元數據刷新
        Spark SQL緩存Parquet元數據以獲得更好的性能。當Hive Metastore Parquet表轉換啟用時,這些轉換表的元數據也被緩存。如果這些表由Hive或其他外部工具更新,則需要手動刷新以確保一致的元數據。

spark.catalog.refreshTable("my_table")
5,Configuration配置

Parquet的結構可以用做setConf方法上SparkSession或通過運行 SET key=value使用SQL命令



Property Name
Default Meaning
spark.sql.parquet.binaryAsString false 一些其他派奎斯生產系統,特別是Impala,Hive和舊版本的Spark SQL,
在寫出Parquet架構時不會區分二進制數據和字符串。該標志告訴Spark SQL
將二進制數據解釋為字符串以提供與這些系統的兼容性。
spark.sql.parquet.int96AsTimestamp true  一些Parquet生產系統,特別是Impala和Hive,將時間戳存儲到INT96中。
該標志告訴Spark SQL將INT96數據解釋為一個時間戳,以提供與這些系統的兼容性。
spark.sql.parquet.cacheMetadata true 打開Parquet模式元數據的緩存。可以加快查詢靜態數據。
spark.sql.parquet.compression.codec snappy 設置寫入Parquet文件時使用的壓縮編解碼器。可接受的值包括:未壓縮,快速,
gzip,lzo。
spark.sql.parquet.filterPushdown true 設置為true時啟用Parquet過濾器下推優化。
spark.sql.hive.convertMetastoreParquet true 當設置為false時,Spark SQL將使用Hive SerDe來替代內置支持的Parquet表。
spark.sql.parquet.mergeSchema false
如果為true,則Parquet數據源合並從所有數據文件收集的模式,否則如果
沒有摘要文件可用,則從摘要文件或隨機數據文件中選取模式。
spark.sql.optimizer.metadataOnly true 如果為true,則啟用使用表元數據的僅限元數據查詢優化來生成分區列,而
不是表掃描。當掃描的所有列都是分區列時,該查詢將適用,並且查詢具有
滿足不同語義的聚合運算符。







免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM