Spark SQL - 對大規模的結構化數據進行批處理和流式處理


Spark SQL - 對大規模的結構化數據進行批處理和流式處理

大體翻譯自:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql.html

如同一般的 Spark 處理,Spark SQL 本質上也是大規模的基於內存的分布式計算。

Spark SQL 和 RDD 計算模型最大的區別在於數據處理的框架不同。Spark SQL 可以通過多種不同的方式對結構化的數據和半結構化的數據進行處理。它既可以使用 SQL , HiveQL 這種結構化查詢查詢語言,也可以使用類 SQL,聲明式,類型安全的Dataset API 進行查詢,這種被稱為 Structured Query DSL

Note:可以通過 Schema 對結構化和半結構化的數據進行描述。

Spark SQL 支持 批處理(Batch) 和流式處理(Struct streaming) 兩種處理方式。

Note:本質上,結構化查詢都會自動編譯為相應的 RDD 操作。

無論使用什么樣的查詢方式,所有的查詢都會轉化為一個由 Catalyst expressions 組成的樹,在這個過程中會對不斷的對查詢進行優化。

在 Spark 2.0 以后, Spark SQL 已經成為了 Spark 計算平台最主要的接口, 它通過更高層次的抽象封裝了RDD,方便用戶通過 SQL 處理數據。


// Define the schema using a case class
case class Person(name: String, age: Int)

// you could read people from a CSV file
// It's been a while since you saw RDDs, hasn't it?
// Excuse me for bringing you the old past.
import org.apache.spark.rdd.RDD
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("Jacek", 10)))

// Convert RDD[Person] to Dataset[Person] and run a query

// Automatic schema inferrence from existing RDDs
scala> val people = peopleRDD.toDS
people: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]

// Query for teenagers using Scala Query DSL
scala> val teenagers = people.where('age >= 10).where('age <= 19).select('name).as[String]
teenagers: org.apache.spark.sql.Dataset[String] = [name: string]

scala> teenagers.show
+-----+
| name|
+-----+
|Jacek|
+-----+

// You could however want to use good ol' SQL, couldn't you?

// 1. Register people Dataset as a temporary view in Catalog
people.createOrReplaceTempView("people")

// 2. Run SQL query
val teenagers = sql("SELECT * FROM people WHERE age >= 10 AND age <= 19")
scala> teenagers.show
+-----+---+
| name|age|
+-----+---+
|Jacek| 10|
+-----+---+

通過啟動 Hive 支持 (enableHiveSupport),用戶可以 HiveQL 對 Hive 中的數據進行處理。

sql("CREATE OR REPLACE TEMPORARY VIEW v1 (key INT, value STRING) USING csv OPTIONS ('path'='people.csv', 'header'='true')")

// Queries are expressed in HiveQL
sql("FROM v1").show

scala> sql("desc EXTENDED v1").show(false)
+----------+---------+-------+
|col_name  |data_type|comment|
+----------+---------+-------+
|# col_name|data_type|comment|
|key       |int      |null   |
|value     |string   |null   |
+----------+---------+-------+

和其它的數據庫一樣, Spark SQL 通過 Logical Query Plan Optimizer, code generation , Tungsten execution engine 來這些措施進行優化。

Spark SQL 引入了一種抽象的表格式的數據結構 Dataset。 通過 Dataset, Spark SQL 可以更加方便、快速的處理大批量的結構化數據。

Note:Spark SQL 借助 Apache Drill 直接在一些數據文件上進行查詢

下面的片段展示了如何讀取JSON文件,然后將一種一部分數據保存為CSV文件。

spark.read
  .format("json")
  .load("input-json")
  .select("name", "score")
  .where($"score" > 15)
  .write
  .format("csv")
  .save("output-csv")

DataSet 是 Spark SQL 中最核心的抽象。他表示了一批已知 schema 的結構化數據。這些數據可以可以保存在JVM 堆外的內存中,並且變為列壓縮的二進制串,來增加計算的速度,減少內存的使用和GC。

Spark SQL 支持 predicate pushdown 對 DataSet 的性能進行優化,並且可以在運行時生成優化代碼。

Spark SQL 包含了以下幾種 API:

  1. Dataset API
  2. Structred Streaming API
  3. SQL
  4. JDBC/ODBC

Spark SQL 通過 DataFrameReader 和 DataFrameWrite 這兩個統一的接口來訪問 HDFS 等存儲系統。

Spark SQL 定義了集中不同類型的函數:

  • 標准函數 和 UDF。
  • 基本的集合函數。
  • 窗口聚合函數。

如果你已經將一個 CSV 加載到一個 dataframe 中了,那你可以通過將 dataframe 注冊為 table, 然后使用 SQL 進行查詢。

// Example 1
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
  .agg(max('j).as("aggOrdering"))
  .orderBy(sum('j))
  .as[(Int, Int)]
query.collect contains (1, 2) // true

// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
|                  1|
|                  0|
+-------------------+
更多參考:
  1. Spark SQL home
  2. Spark’s Role in the Big Data Ecosystem - Matei Zaharia
  3. Introducing Apache Spark 2.0


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM