Spark SQL


Spark SQL是支持在Spark中使用Sql、HiveSql、Scala中的關系型查詢表達式。它的核心組件是一個新增的RDD類型SchemaRDD,它把行對象用一個Schema來描述行里面的所有列的數據類型,它就像是關系型數據庫里面的一張表。它可以從原有的RDD創建,也可以是Parquet文件,最重要的是它可以支持用HiveQL從hive里面讀取數據。

下面是一些案例,可以在Spark shell當中運行。

首先我們要創建一個熟悉的Context,熟悉spark的人都知道吧,有了Context我們才可以進行各種操作。

val sc: SparkContext // 已經存在的SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._

Data Sources(數據源)

Spark SQL通過SchemaRDD接口支持在多種數據源上進行操作。一旦一個數據集被加載,它可以被注冊成一個表格,甚至可以和其它數據源有連接。

RDDs

 Spark SQL支持的一種表的類型是Scala的case class,case class定義了表的類型,下面是例子:

// sc是一個已經存在的SprakContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc) // import sqlContext._ import sqlContext.createSchemaRDD // case class在Scala 2.10里面最多支持22個列,為了突破這個限制,最好是定義一個類實現Product接口 case class Person(name: String, age: Int) // 為Person的對象創建一個RDD,然后注冊成一張表 val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) people.registerAsTable("people") // 直接寫sql吧,這個方法是sqlContext提供的 val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // teenagers是SchemaRDDs類型,它支持所有普通的RDD操作 teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

從上面這個方法來看,不是很好用,一個表好幾十個字段,我就得一個一個的去賦值,它現在支持的操作都是很簡單的操作,想要實現復雜的操作可以具體去看HiveContext提供的HiveQL。

Parquet Files

Parquet是一種列式存儲格式並且被許多數據處理系統支持。Parquet為Hadoop生態系統中的所有項目提供支持高效率壓縮的列式數據表達,而且與數據處理框架、數據模型或編程語言都沒有關系。Spark SQL提供了對Parquet的讀和寫,自動保留原始數據的架構。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// import sqlContext._ // createSchemaRDD被用來將RDD隱式轉換成一個SchemaRDD
import sqlContext.createSchemaRDD
val people: RDD[Person]
= ... // 同上面的例子. // 這個RDD已經隱式轉換成一個SchemaRDD, 允許它存儲成Parquet格式. people.saveAsParquetFile("people.parquet") // 從上面創建的文件里面讀取,加載一個Parquet文件的結果也是一種JavaSchemaRDD. val parquetFile = sqlContext.parquetFile("people.parquet") //注冊成表,然后在SQL狀態下使用 parquetFile.registerAsTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name:" + t(0)).collect().foreach(println)

JSON Datasets(JSON數據集)

JSON(JavaScript Object Notation) 是一種輕量級的數據交換格式。它基於JavaScript(Standard ECMA-262 3rd Edition - December 1999)的一個子集。 JSON采用完全獨立於語言的文本格式,但是也使用了類似於C語言家族的習慣(包括C, C++, C#, Java, JavaScript, Perl, Python等)。這些特性使JSON成為理想的數據交換語言。易於人閱讀和編寫,同時也易於機器解析和生成(網絡傳輸速度快)。

SparkSQL可以自動推斷出一個JSON數據集模式並作為一個SchemaRDD來加載。這種轉換可以通過使用SQLContext中的兩個方法中的一個得到:

jsonFile - 從JSON文件的目錄中加載數據,其中文件的每一行就是一個JSON對象。

jsonRdd - 從一個已存在的RDD中加載數據,其中每一個RDD元素都是一個包含一個JSON對象的字符串。

// sc 是已經存在的SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 一個JSON 數據集用一個路徑指出
// 這個路徑既可以是一個單獨的文本文件,也可以是一個存儲文本文件的目錄
val path = "examples/src/main/resources/people.json"
// 根據路徑指出的文件生成一個SchemaRDD 
val people = sqlContext.jsonFile(path)

// 推斷的模式可以通過使用printSchema() 方法顯式化
people.printSchema()
// root
//  |-- age: IntegerType
//  |-- name: StringType

// 把SchemaRDD注冊成一個表
people.registerAsTable("people")

// SQL狀態可以通過使用sqlContext提供的sql方法運行
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 另外,一個SchemaRDD也可以通過每個字符串存儲一個JSON數據集對象的string類型的RDD來生成
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

 

Hive Tables

 Spark SQL也支持讀寫存儲在Apache Hive上的數據。然而,hive的依賴太多了,默認的Spark assembly 是沒帶這些依賴的,需要我們運行 SPARK_HIVE=true sbt/sbt assembly/assembly重新編譯,或者用maven的時候添加 -Phive參數,它會重新編譯出來一個hive  assembly的jar包,然后需要把這個jar包放到所有的節點上。另外還需要把 hive-site.xml放到conf目錄下。沒進行hive部署的話,下面的例子也可以用LocalHiveContext來代替HiveContext。

val sc: SparkContext // 已經存在的SparkContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// 引入這個Context,然后就會給所有的sql語句進行隱式轉換
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 使用HiveQL查詢
hql("FROM src SELECT key, value").collect().foreach(println)

或者寫成如下形式:

// sc is an existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
hiveContext.hql("FROM src SELECT key, value").collect().foreach(println)

Writing Language-Integrated Relational Queries

文字語言綜合關聯查詢,目前這個功能只是在Scala里面支持。

Spark SQL還支持一個特定域的語言編寫查詢。再次,利用上述實例數據:

// sc是一個已經存在的SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val people: RDD[Person] = ... // 同前面的例子. // 和后面這個語句是一樣的 'SELECT name FROM people WHERE age >= 10 AND age <= 19' val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

DSL(領域語言)使用scala符號表示隱含表中的列,通過在前面加一個(‘)來標示。隱式轉換將這些符號表達式表示成SQL執行引擎的值。一個完整的功能支持列表可以在ScalaDoc中找到。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM