Spark SQL 之 Data Sources


Spark SQL 之 Data Sources


轉載請注明出處:http://www.cnblogs.com/BYRans/

數據源(Data Source)

Spark SQL的DataFrame接口支持多種數據源的操作。一個DataFrame可以進行RDDs方式的操作,也可以被注冊為臨時表。把DataFrame注冊為臨時表之后,就可以對該DataFrame執行SQL查詢。Data Sources這部分首先描述了對Spark的數據源執行加載和保存的常用方法,然后對內置數據源進行深入介紹。

一般Load/Save方法

Spark SQL的默認數據源為Parquet格式。數據源為Parquet文件時,Spark SQL可以方便的執行所有的操作。修改配置項spark.sql.sources.default,可修改默認數據源格式。讀取Parquet文件示例如下:

  • Scala
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
  • Java
DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

手動指定選項(Manually Specifying Options)

當數據源格式不是parquet格式文件時,需要手動指定數據源的格式。數據源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果數據源格式為內置格式,則只需要指定簡稱(json,parquet,jdbc)。通過指定的數據源格式名,可以對DataFrames進行類型轉換操作。示例如下:

  • Scala
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
  • Java
DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json");
df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

存儲模式(Save Modes)

可以采用SaveMode執行存儲操作,SaveMode定義了對數據的處理模式。需要注意的是,這些保存模式不使用任何鎖定,不是原子操作。此外,當使用Overwrite方式執行時,在輸出新數據之前原數據就已經被刪除。SaveMode詳細介紹如下表:

SaveModes

持久化到表(Saving to Persistent Tables)

當使用HiveContext時,可以通過saveAsTable方法將DataFrames存儲到表中。與registerTempTable方法不同的是,saveAsTable將DataFrame中的內容持久化到表中,並在HiveMetastore中存儲元數據。存儲一個DataFrame,可以使用SQLContext的table方法。table先創建一個表,方法參數為要創建的表的表名,然后將DataFrame持久化到這個表中。

默認的saveAsTable方法將創建一個“managed table”,表示數據的位置可以通過metastore獲得。當存儲數據的表被刪除時,managed table也將自動刪除。

Parquet文件

Parquet是一種支持多種數據處理系統的柱狀的數據格式,Parquet文件中保留了原始數據的模式。Spark SQL提供了Parquet文件的讀寫功能。

讀取Parquet文件(Loading Data Programmatically)

讀取Parquet文件示例如下:

  • Scala
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
  • Java
// sqlContext from the previous example is used in this example.

DataFrame schemaPeople = ... // The DataFrame from the previous example.

// DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write().parquet("people.parquet");

// Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a DataFrame.
DataFrame parquetFile = sqlContext.read().parquet("people.parquet");

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

解析分區信息(Partition Discovery)

對表進行分區是對數據進行優化的方式之一。在分區的表內,數據通過分區列將數據存儲在不同的目錄下。Parquet數據源現在能夠自動發現並解析分區信息。例如,對人口數據進行分區存儲,分區列為gender和country,使用下面的目錄結構:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通過傳遞path/to/table給 SQLContext.read.parquet或SQLContext.read.load,Spark SQL將自動解析分區信息。返回的DataFrame的Schema如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

需要注意的是,數據的分區列的數據類型是自動解析的。當前,支持數值類型和字符串類型。自動解析分區類型的參數為:spark.sql.sources.partitionColumnTypeInference.enabled,默認值為true。如果想關閉該功能,直接將該參數設置為disabled。此時,分區列數據格式將被默認設置為string類型,不再進行類型解析。

Schema合並(Schema Merging)

像ProtocolBuffer、Avro和Thrift那樣,Parquet也支持Schema evolution(Schema演變)。用戶可以先定義一個簡單的Schema,然后逐漸的向Schema中增加列描述。通過這種方式,用戶可以獲取多個有不同Schema但相互兼容的Parquet文件。現在Parquet數據源能自動檢測這種情況,並合並這些文件的schemas。

因為Schema合並是一個高消耗的操作,在大多數情況下並不需要,所以Spark SQL從1.5.0開始默認關閉了該功能。可以通過下面兩種方式開啟該功能:

  • 當數據源為Parquet文件時,將數據源選項mergeSchema設置為true
  • 設置全局SQL選項spark.sql.parquet.mergeSchema為true

示例如下:

  • Scala
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Hive metastore Parquet表轉換(Hive metastore Parquet table conversion)

當向Hive metastore中讀寫Parquet表時,Spark SQL將使用Spark SQL自帶的Parquet SerDe(SerDe:Serialize/Deserilize的簡稱,目的是用於序列化和反序列化),而不是用Hive的SerDe,Spark SQL自帶的SerDe擁有更好的性能。這個優化的配置參數為spark.sql.hive.convertMetastoreParquet,默認值為開啟。

Hive/Parquet Schema反射(Hive/Parquet Schema Reconciliation)

從表Schema處理的角度對比Hive和Parquet,有兩個區別:

  • Hive區分大小寫,Parquet不區分大小寫
  • hive允許所有的列為空,而Parquet不允許所有的列全為空

由於這兩個區別,當將Hive metastore Parquet表轉換為Spark SQL Parquet表時,需要將Hive metastore schema和Parquet schema進行一致化。一致化規則如下:

  • 這兩個schema中的同名字段必須具有相同的數據類型。一致化后的字段必須為Parquet的字段類型。這個規則同時也解決了空值的問題。
  • 一致化后的schema只包含Hive metastore中出現的字段。
    • 忽略只出現在Parquet schema中的字段
    • 只在Hive metastore schema中出現的字段設為nullable字段,並加到一致化后的schema中

元數據刷新(Metadata Refreshing)

Spark SQL緩存了Parquet元數據以達到良好的性能。當Hive metastore Parquet表轉換為enabled時,表修改后緩存的元數據並不能刷新。所以,當表被Hive或其它工具修改時,則必須手動刷新元數據,以保證元數據的一致性。示例如下:

  • Scala
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
  • Java
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")

配置(Configuration)

配置Parquet可以使用SQLContext的setConf方法或使用SQL執行SET key=value命令。詳細參數說明如下:

Configuration

JSON數據集

Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集為DataFrame格式。讀取JSON數據集方法為SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換為DataFrame。

需要注意的是,這里的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自滿足有效的JSON對象。如果用多行描述一個JSON對象,會導致讀取出錯。讀取JSON數據集示例如下:

  • Scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
  • Java
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method.
people.printSchema();
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlContext.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
  "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);

Hive表

Spark SQL支持對Hive的讀寫操作。需要注意的是,Hive所依賴的包,沒有包含在Spark assembly包中。增加Hive時,需要在Spark的build中添加 -Phive 和 -Phivethriftserver配置。這兩個配置將build一個新的assembly包,這個assembly包含了Hive的依賴包。注意,必須上這個心的assembly包到所有的worker節點上。因為worker節點在訪問Hive中數據時,會調用Hive的 serialization and deserialization libraries(SerDes),此時將用到Hive的依賴包。

Hive的配置文件為conf/目錄下的hive-site.xml文件。在YARN上執行查詢命令之前,lib_managed/jars目錄下的datanucleus包和conf/目錄下的hive-site.xml必須可以被driverhe和所有的executors所訪問。確保被訪問,最方便的方式就是在spark-submit命令中通過--jars選項和--file選項指定。

操作Hive時,必須創建一個HiveContext對象,HiveContext繼承了SQLContext,並增加了對MetaStore和HiveQL的支持。除了sql方法,HiveContext還提供了一個hql方法,hql方法可以執行HiveQL語法的查詢語句。示例如下:

  • Scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
  • Java
// sc is an existing JavaSparkContext.
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc);

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL.
Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();

訪問不同版本的Hive Metastore(Interacting with Different Versions of Hive Metastore)

Spark SQL經常需要訪問Hive metastore,Spark SQL可以通過Hive metastore獲取Hive表的元數據。從Spark 1.4.0開始,Spark SQL只需簡單的配置,就支持各版本Hive metastore的訪問。注意,涉及到metastore時Spar SQL忽略了Hive的版本。Spark SQL內部將Hive反編譯至Hive 1.2.1版本,Spark SQL的內部操作(serdes, UDFs, UDAFs, etc)都調用Hive 1.2.1版本的class。版本配置項見下面表格:

hiveMetastore

JDBC To Other Databases

Spark SQL支持使用JDBC訪問其他數據庫。當時用JDBC訪問其它數據庫時,最好使用JdbcRDD。使用JdbcRDD時,Spark SQL操作返回的DataFrame會很方便,也會很方便的添加其他數據源數據。JDBC數據源因為不需要用戶提供ClassTag,所以很適合使用Java或Python進行操作。
使用JDBC訪問數據源,需要在spark classpath添加JDBC driver配置。例如,從Spark Shell連接postgres的配置為:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

遠程數據庫的表,可用DataFrame或Spark SQL臨時表的方式調用數據源API。支持的參數有:

option

代碼示例如下:

  • Scala
val jdbcDF = sqlContext.read.format("jdbc").options( 
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()
  • Java
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");

DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load();

故障排除(Troubleshooting)

  • 在客戶端session和所有的executors上,JDBC driver必須對啟動類加載器(primordial class loader)設置為visible。因為當創建一個connection時,Java的DriverManager類會執行安全驗證,安全驗證將忽略所有對啟動類加載器為非visible的driver。一個很方便的解決方法是,修改所有worker節點上的compute_classpath.sh腳本,將driver JARs添加至腳本。
  • 有些數據庫(例:H2)將所有的名字轉換為大寫,所以在這些數據庫中,Spark SQL也需要將名字全部大寫。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM