理解Spark SQL(二)—— SQLContext和HiveContext


使用Spark SQL,除了使用之前介紹的方法,實際上還可以使用SQLContext或者HiveContext通過編程的方式實現。前者支持SQL語法解析器(SQL-92語法),后者支持SQL語法解析器和HiveSQL語法解析器,默認為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器來運行HiveQL不支持的語法,如:select 1。實際上HiveContext是SQLContext的子類,因此在HiveContext運行過程中除了override的函數和變量,可以使用和SQLContext一樣的函數和變量。

因為spark-shell工具實際就是運行的scala程序片段,為了方便,下面采用spark-shell進行演示。

首先來看SQLContext,因為是標准SQL,可以不依賴於Hive的metastore,比如下面的例子(沒有啟動hive metastore):

[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn --conf spark.sql.catalogImplementation=in-memory

 

 scala> case class offices(office:Int,city:String,region:String,mgr:Int,target:Double,sales:Double)
defined class offices

scala> val rddOffices=sc.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p=>offices(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rddOffices: org.apache.spark.rdd.RDD[offices] = MapPartitionsRDD[3] at map at <console>:26

scala> val officesDataFrame = spark.createDataFrame(rddOffices)
officesDataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]

scala> officesDataFrame.createOrReplaceTempView("offices")

scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

scala>

 執行上面的命令后,實際上在yarn集群中啟動了一個yarn client模式的Spark Application,然后在scala>提示符后輸入的語句會生成RDD的transformation,最后一條命令中的collect會生成RDD的action,即會觸發Job的提交和程序的執行。

命令行中之所以加上--conf spark.sql.catalogImplementation=in-memory選項,是因為spark-shell中的默認啟動的SparkSession對象spark是默認支持Hive的,不帶這個選項啟動的話,程序就會去連接hive metastore,因為這里並沒有啟動hive metastore,因此程序在執行createDataFrame函數時會報錯。

程序中的第一行是1個case class語句,這里是定義后面的數據文件的模式的(定義模式除了這個方法,其實還有另外一種方法,后面再介紹)。第二行從hdfs中讀取一個文本文件,並工通過map映射到了模式上面。第三行基於第二行的RDD生成DataFrame,第四行基於第三行的DataFrame注冊了一個邏輯上的臨時表,最后一行就可以通過SparkSession的sql函數來執行sql語句了。

實際上,SQLContext是Spark 1.x中的SQL入口,在Spark 2.x中,使用SparkSession作為SQL的入口,但是為了向后兼容,Spark 2.x仍然支持SQLContext來操作SQL,不過會提示deprecated,所以上面的例子是采用Spark 2.x中的寫法。

實際上還有另外一種方法來操作SQL,針對同樣的數據,例如:

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false), StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(office,IntegerType,false), StructField(city,StringType,false), StructField(region,StringType,false), StructField(mgr,IntegerType,true), StructField(target,DoubleType,true), StructField(sales,DoubleType,false))

scala> val rowRDD = sc.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p => Row(p(0).trim.toInt,p(1),p(2),p(3).trim.toInt,p(4).trim.toDouble,p(5).trim.toDouble))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30

scala> val dataFrame = spark.createDataFrame(rowRDD, schema)
dataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]

scala> dataFrame.createOrReplaceTempView("offices")

scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

這個例子與之前的例子有一些不同,主要的地方有3個:

1. 之前的例子是采用case class定義模式,Spark采用反射來推斷Schema;而這個例子采用StructType類型的對象來定義模式,它接收一個數組,數組成員是StructField對象,代表一個字段的定義,每個字段的定義由字段名稱、字段類型和是否允許為空組成;

2. 對於代表數據的RDD,之前的例子是直接用case class定義的類型來分割字段,而這個例子是用的Row類型;

3. 在使用createDataFrame函數生成DataFrame時,該函數的參數不一樣,之前的例子只要傳入RDD對象即可(對象中隱含了模式),而這個例子需要同時傳入RDD和定義的schema;

實際編程中建議采用第二種方法,因為其更加靈活,schema信息可以不必是寫死的,而是可以在程序運行的過程中生成。

 

下面接着來看HiveContext的用法,使用HiveContext之前需要確保:

  • 使用的Spark是支持Hive的;
  • Hive的配置文件hive-site.xml已經在Spark的conf目錄下;
  • hive metastore已經啟動;

舉例說明:

首先啟動hive metastore:

[root@BruceCentOS ~]# nohup hive --service metastore &

然后仍然通過spark-shell來舉例說明,啟動spark-shell,如下所示:

[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn

scala> spark.sql("show databases").collect.foreach(println)
[default]
[orderdb]

scala> spark.sql("use orderdb")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show tables").collect.foreach(println)
[orderdb,customers,false]
[orderdb,offices,false]
[orderdb,orders,false]
[orderdb,products,false]
[orderdb,salesreps,false]

scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork                                                                   
City: Chicago
City: Atlanta

scala>

可以看到這次啟動spark-shell沒有帶上最后那個選項,這是因為這里我們打算用HiveContext來操作Hive中的數據,需要支持Hive。前面說過spark-shell是默認開啟了Hive支持的。同SQLContext類似,Spark 2.x中也不需要再用HiveContext對象來操作SQL了,直接用SparkSession對象來操作就好了。可以看到這里可以直接操作表,不用再定義schema,這是因為schema是由外部的hive metastore定義的,spark通過連接到hive metastore來讀取表的schema信息,因此這里能直接操作SQL。

 

另外,除了上面的使用SQLContext操作普通文件(需要額外定義模式)和使用HiveContext操作Hive表數據(需要開啟hive metastore)之外,SQLContext還能操作JSON、PARQUET等文件,由於這兩種數據文件自己帶了模式信息,因此可以直接基於文件創建DataFrame,例如:

scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

scala> df.createOrReplaceTempView("people")

scala> spark.sql("select name,age from people where age>19").map(t=>"Name :" + t(0) + ", Age: " + t(1)).collect.foreach(println)
Name :Andy, Age: 30    

 

最后來看下DataFrame的另一種叫做DSL(Domain Specific Language)的用法。

scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]                

scala> df.show()
+----+-------+                                                                  
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> df.select("name").show()
+-------+                                                                       
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


scala> df.select(df("name"), df("age") + 1).show()
+-------+---------+                                                             
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


scala> df.filter(df("age") > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


scala> df.groupBy("age").count().show()
+----+-----+                                                                    
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+


scala>

以上是對Spark SQL的SQLContext和HiveContext基本用法的一些總結,都是采用spark-shell工具舉的例子。實際上由於spark-shell是運行scala程序片段的工具,上述例子完全可以改成獨立的應用程序。我將在下一篇博文當中嘗試使用Scala、Java和Python來編寫獨立的程序來操作上面的示例hive數據庫orderdb,可以適當使用一些較為復雜的SQL來統計分析數據。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM