介紹Spark SQL的JSON支持,這是我們在Databricks中開發的一個功能,可以在Spark中更容易查詢和創建JSON數據。隨着網絡和移動應用程序的普及,JSON已經成為Web服務API以及長期存儲的常用的交換格式。使用現有的工具,用戶通常會使用復雜的管道來在分析系統中讀取和寫入JSON數據集。在Apache Spark 1.1中發布Spark SQL的JSON支持,在Apache Spark 1.2中增強,極大地簡化了使用JSON數據的端到端體驗。
現有做法
實際上,用戶經常面臨使用現代分析系統處理JSON數據的困難。要將數據集寫入JSON格式,用戶首先需要編寫邏輯將其數據轉換為JSON。要閱讀和查詢JSON數據集,通常的做法是使用ETL流水線將JSON記錄轉換為預定義的結構。在這種情況下,用戶必須等待該進程完成才能使用其數據。對於寫作和閱讀,定義和維護模式定義通常會使ETL任務更加繁重,並消除了半結構化JSON格式的許多優點。如果用戶想要使用新的數據,則他們在創建外部表時必須費力定義模式,然后使用自定義的JSON序列化/反序列化庫,或者使用JSON UDF的組合來查詢數據。
例如,考慮具有以下JSON模式的數據集:
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}} {"name":"Michael", "address":{"city":null, "state":"California"}}
在像Hive這樣的系統中,JSON對象通常存儲為單列的值。要訪問此數據,將使用UDF提取和展平JSON對象中的字段。在下面顯示的SQL查詢中,提取外部字段(名稱和地址),然后進一步提取嵌套地址字段。
在下面的示例中,假設上面顯示的JSON數據集存儲在名為people的表中,JSON對象存儲在名為jsonObject的列中:
SELECT v1.name, v2.city, v2.state FROM people LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1 as name, address LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2 as city, state;
JSON support in Spark SQL
Spark SQL提供了一種用於查詢JSON數據的自然語法,以及用於讀取和寫入數據的JSON模式的自動推斷。Spark SQL了解JSON數據中的嵌套字段,並允許用戶直接訪問這些字段,而無需任何明確的轉換。Spark SQL中的上述查詢如下所示:
SELECT name, age, address.city, address.state FROM people
在Spark SQL中加載和保存JSON數據集
要查詢Spark SQL中的JSON數據集,只需要將Spark SQL指向數據的位置。在沒有任何用戶規范的情況下,自動進行數據集格式推斷。在編程API中,可以通過SQLContext提供的jsonFile和jsonRDD方法來完成。使用這兩種方法,您可以為給定的JSON數據集創建一個SchemaRDD,然后可以將SchemaRDD注冊為表格。這是一個例子:
// Create a SQLContext (sc is an existing SparkContext) val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Suppose that you have a text file called people with the following content: // {"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}} // {"name":"Michael", "address":{"city":null, "state":"California"}} // Create a SchemaRDD for the JSON dataset. val people = sqlContext.jsonFile("[the path to file people]") // Register the created SchemaRDD as a temporary table. people.registerTempTable("people")
也可以使用純SQL API創建JSON數據集。例如,對於通過JDBC服務器連接到Spark SQL的用戶,他們可以使用:
CREATE TEMPORARY TABLE people USING org.apache.spark.sql.json OPTIONS (path '[the path to the JSON dataset]')
在上述示例中,由於未提供數據結構,Spark SQL將通過掃描JSON數據集自動推斷模式。當一個字段是JSON對象或數組時,Spark SQL將使用STRUCT類型和ARRAY類型來表示此字段的類型。由於JSON是半結構化的,不同的元素可能具有不同的模式,Spark SQL也將解決字段數據類型的沖突。要了解JSON數據集的架構是什么,用戶可以使用編程API中返回的SchemaRDD提供的printSchema()方法或SQL中使用DESCRIBE [table name]來顯示模式。例如,通過people.printSchema()可視化的人的模式將是:
root |-- address: struct (nullable = true) | |-- city: string (nullable = true) | |-- state: string (nullable = true) |-- name: string (nullable = true)
或者,當使用jsonFile和jsonRDD創建表時,用戶可以將模式應用於JSON數據集。在這種情況下,Spark SQL將將提供的模式綁定到JSON數據集,並且不會推斷模式。用戶不需要知道JSON數據集中出現的所有字段。指定的模式可以是出現在數據集中的字段的子集,也可以是不存在的字段。
創建表示JSON數據集的表后,用戶可以輕松地在JSON數據集上編寫SQL查詢,就像常規表一樣。與Spark SQL中的所有查詢一樣,查詢的結果由另一個SchemaRDD表示。例如:
val nameAndAddress = sqlContext.sql("SELECT name, address.city, address.state FROM people") nameAndAddress.collect.foreach(println)
SQL查詢的結果可以由其他數據分析任務直接和立即使用,例如機器學習管道。此外,JSON數據集可以輕松地緩存在Spark SQL內置的內存列存儲中,並以其他格式保存,如Parquet或Avro。
將SchemaRDD保存為JSON文件
在Spark SQL中,SchemaRDD可以通過toJSON方法以JSON格式輸出。由於SchemaRDD始終包含模式(包括對嵌套和復雜類型的支持),Spark SQL可以自動將數據集轉換為JSON,而不需要用戶定義的格式。SchemaRDD本身可以從許多類型的數據源創建,包括Apache Hive表,Parquet文件,JDBC,Avro文件,或者是對現有SchemaRDD的查詢結果。這種組合意味着無論數據源的來源如何,用戶都可以以最小的努力將數據遷移到JSON格式。
What’s next?
處理具有大量字段的JSON數據集
JSON數據通常是半結構化、非固定結構的。將來,我們將擴展Spark SQL對JSON支持,以處理數據集中的每個對象可能具有相當不同的結構的情況。例如,考慮使用JSON字段來保存表示HTTP標頭的鍵/值對的數據集。每個記錄可能會引入新的標題類型,並為每個記錄使用一個不同的列將產生一個非常寬的模式。我們計划支持自動檢測這種情況,而是使用map類型。因此,每行可以包含Map,使得能夠查詢其鍵/值對。這樣,Spark SQL將處理具有更少結構的JSON數據集,推動了基於SQL的系統可以處理的那種查詢的邊界。