【注】該系列文章以及使用到安裝包/測試數據 可以在《傾情大奉送--Spark入門實戰系列》獲取
1、SparkSQL的發展歷程
1.1 Hive and Shark
SparkSQL的前身是Shark,給熟悉RDBMS但又不理解MapReduce的技術人員提供快速上手的工具,Hive應運而生,它是當時唯一運行在Hadoop上的SQL-on-Hadoop工具。但是MapReduce計算過程中大量的中間磁盤落地過程消耗了大量的I/O,降低的運行效率,為了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始產生,其中表現較為突出的是:
l MapR的Drill
l Cloudera的Impala
l Shark
其中Shark是伯克利實驗室Spark生態環境的組件之一,它修改了下圖所示的右下角的內存管理、物理計划、執行三個模塊,並使之能運行在Spark引擎上,從而使得SQL查詢的速度得到10-100倍的提升。
1.2 Shark和SparkSQL
但是,隨着Spark的發展,對於野心勃勃的Spark團隊來說,Shark對於Hive的太多依賴(如采用Hive的語法解析器、查詢優化器等等),制約了Spark的One Stack Rule Them All的既定方針,制約了Spark各個組件的相互集成,所以提出了SparkSQL項目。SparkSQL拋棄原有Shark的代碼,汲取了Shark的一些優點,如內存列存儲(In-Memory Columnar Storage)、Hive兼容性等,重新開發了SparkSQL代碼;由於擺脫了對Hive的依賴性,SparkSQL無論在數據兼容、性能優化、組件擴展方面都得到了極大的方便,真可謂“退一步,海闊天空”。
l數據兼容方面 不但兼容Hive,還可以從RDD、parquet文件、JSON文件中獲取數據,未來版本甚至支持獲取RDBMS數據以及cassandra等NOSQL數據;
l性能優化方面 除了采取In-Memory Columnar Storage、byte-code generation等優化技術外、將會引進Cost Model對查詢進行動態評估、獲取最佳物理計划等等;
l組件擴展方面 無論是SQL的語法解析器、分析器還是優化器都可以重新定義,進行擴展。
2014年6月1日Shark項目和SparkSQL項目的主持人Reynold Xin宣布:停止對Shark的開發,團隊將所有資源放SparkSQL項目上,至此,Shark的發展畫上了句話,但也因此發展出兩個直線:SparkSQL和Hive on Spark。
其中SparkSQL作為Spark生態的一員繼續發展,而不再受限於Hive,只是兼容Hive;而Hive on Spark是一個Hive的發展計划,該計划將Spark作為Hive的底層引擎之一,也就是說,Hive將不再受限於一個引擎,可以采用Map-Reduce、Tez、Spark等引擎。
1.3 SparkSQL的性能
Shark的出現,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高:
那么,擺脫了Hive的限制,SparkSQL的性能又有怎么樣的表現呢?雖然沒有Shark相對於Hive那樣矚目地性能提升,但也表現得非常優異:
為什么SparkSQL的性能會得到怎么大的提升呢?主要SparkSQL在下面幾點做了優化:
A:內存列存儲(In-Memory Columnar Storage)
SparkSQL的表數據在內存中存儲不是采用原生態的JVM對象存儲方式,而是采用內存列存儲,如下圖所示。
該存儲方式無論在空間占用量和讀取吞吐率上都占有很大優勢。
對於原生態的JVM對象存儲方式,每個對象通常要增加12-16字節的額外開銷,對於一個270MB的TPC-H lineitem table數據,使用這種方式讀入內存,要使用970MB左右的內存空間(通常是2~5倍於原生數據空間);另外,使用這種方式,每個數據記錄產生一個JVM對象,如果是大小為200B的數據記錄,32G的堆棧將產生1.6億個對象,這么多的對象,對於GC來說,可能要消耗幾分鍾的時間來處理(JVM的垃圾收集時間與堆棧中的對象數量呈線性相關)。顯然這種內存存儲方式對於基於內存計算的Spark來說,很昂貴也負擔不起。
對於內存列存儲來說,將所有原生數據類型的列采用原生數組來存儲,將Hive支持的復雜數據類型(如array、map等)先序化后並接成一個字節數組來存儲。這樣,每個列創建一個JVM對象,從而導致可以快速的GC和緊湊的數據存儲;額外的,還可以使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)降低內存開銷;更有趣的是,對於分析查詢中頻繁使用的聚合特定列,性能會得到很大的提高,原因就是這些列的數據放在一起,更容易讀入內存進行計算。
B:字節碼生成技術(bytecode generation,即CG)
在數據庫查詢中有一個昂貴的操作是查詢語句中的表達式,主要是由於JVM的內存模型引起的。比如如下一個查詢:
SELECT a + b FROM table
在這個查詢里,如果采用通用的SQL語法途徑去處理,會先生成一個表達式樹(有兩個節點的Add樹,參考后面章節),在物理處理這個表達式樹的時候,將會如圖所示的7個步驟:
1. 調用虛函數Add.eval(),需要確認Add兩邊的數據類型
2. 調用虛函數a.eval(),需要確認a的數據類型
3. 確定a的數據類型是Int,裝箱
4. 調用虛函數b.eval(),需要確認b的數據類型
5. 確定b的數據類型是Int,裝箱
6. 調用Int類型的Add
7. 返回裝箱后的計算結果
其中多次涉及到虛函數的調用,虛函數的調用會打斷CPU的正常流水線處理,減緩執行。
Spark1.1.0在catalyst模塊的expressions增加了codegen模塊,如果使用動態字節碼生成技術(配置spark.sql.codegen參數),SparkSQL在執行物理計划的時候,對匹配的表達式采用特定的代碼,動態編譯,然后運行。如上例子,匹配到Add方法:
然后,通過調用,最終調用:
最終實現效果類似如下偽代碼:
val a: Int = inputRow.getInt(0)
val b: Int = inputRow.getInt(1)
val result: Int = a + b
resultRow.setInt(0, result)
對於Spark1.1.0,對SQL表達式都作了CG優化,具體可以參看codegen模塊。CG優化的實現主要還是依靠scala2.10的運行時放射機制(runtime reflection)。對於SQL查詢的CG優化,可以簡單地用下圖來表示:
C:Scala代碼優化
另外,SparkSQL在使用Scala編寫代碼的時候,盡量避免低效的、容易GC的代碼;盡管增加了編寫代碼的難度,但對於用戶來說,還是使用統一的接口,沒受到使用上的困難。下圖是一個Scala代碼優化的示意圖:
2、 SparkSQL運行架構
類似於關系型數據庫,SparkSQL也是語句也是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)組成,分別對應sql查詢過程中的Result、Data Source、Operation,也就是說SQL語句按Result-->Data Source-->Operation的次序來描述的。
當執行SparkSQL語句的順序為:
1.對讀入的SQL語句進行解析(Parse),分辨出SQL語句中哪些詞是關鍵詞(如SELECT、FROM、WHERE),哪些是表達式、哪些是Projection、哪些是Data Source等,從而判斷SQL語句是否規范;
2.將SQL語句和數據庫的數據字典(列、表、視圖等等)進行綁定(Bind),如果相關的Projection、Data Source等都是存在的話,就表示這個SQL語句是可以執行的;
3.一般的數據庫會提供幾個執行計划,這些計划一般都有運行統計數據,數據庫會在這些計划中選擇一個最優計划(Optimize);
4.計划執行(Execute),按Operation-->Data Source-->Result的次序來進行的,在執行過程有時候甚至不需要讀取物理表就可以返回結果,比如重新運行剛運行過的SQL語句,可能直接從數據庫的緩沖池中獲取返回結果。
2.1 Tree和Rule
SparkSQL對SQL語句的處理和關系型數據庫對SQL語句的處理采用了類似的方法,首先會將SQL語句進行解析(Parse),然后形成一個Tree,在后續的如綁定、優化等處理過程都是對Tree的操作,而操作的方法是采用Rule,通過模式匹配,對不同類型的節點采用不同的操作。在整個sql語句的處理過程中,Tree和Rule相互配合,完成了解析、綁定(在SparkSQL中稱為Analysis)、優化、物理計划等過程,最終生成可以執行的物理計划。
2.1.1 Tree
l Tree的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees
l Logical Plans、Expressions、Physical Operators都可以使用Tree表示
l Tree的具體操作是通過TreeNode來實現的
Ø SparkSQL定義了catalyst.trees的日志,通過這個日志可以形象的表示出樹的結構
Ø TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)進行操作
Ø 有了TreeNode,通過Tree中各個TreeNode之間的關系,可以對Tree進行遍歷操作,如使用transformDown、transformUp將Rule應用到給定的樹段,然后用結果替代舊的樹段;也可以使用transformChildrenDown、transformChildrenUp對一個給定的節點進行操作,通過迭代將Rule應用到該節點以及子節點。
l TreeNode可以細分成三種類型的Node:
Ø UnaryNode 一元節點,即只有一個子節點。如Limit、Filter操作
Ø BinaryNode 二元節點,即有左右子節點的二叉節點。如Jion、Union操作
Ø LeafNode 葉子節點,沒有子節點的節點。主要用戶命令類操作,如SetCommand
2.1.2 Rule
l Rule的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules
l Rule在SparkSQL的Analyzer、Optimizer、SparkPlan等各個組件中都有應用到
l Rule是一個抽象類,具體的Rule實現是通過RuleExecutor完成
l Rule通過定義batch和batchs,可以簡便的、模塊化地對Tree進行transform操作
l Rule通過定義Once和FixedPoint,可以對Tree進行一次操作或多次操作(如對某些Tree進行多次迭代操作的時候,達到FixedPoint次數迭代或達到前后兩次的樹結構沒變化才停止操作,具體參看RuleExecutor.apply)
2.2 sqlContext和hiveContext的運行過程
SparkSQL有兩個分支,sqlContext和hiveContext,sqlContext現在只支持SQL語法解析器(SQL-92語法);hiveContext現在支持SQL語法解析器和hivesql語法解析器,默認為hiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運行hiveSQL不支持的語法,
2.2.1 sqlContext的運行過程
sqlContext總的一個過程如下圖所示:
1.SQL語句經過SqlParse解析成UnresolvedLogicalPlan;
2.使用analyzer結合數據數據字典(catalog)進行綁定,生成resolvedLogicalPlan;
3.使用optimizer對resolvedLogicalPlan進行優化,生成optimizedLogicalPlan;
4.使用SparkPlan將LogicalPlan轉換成PhysicalPlan;
5.使用prepareForExecution()將PhysicalPlan轉換成可執行物理計划;
6.使用execute()執行可執行物理計划;
7.生成SchemaRDD。
在整個運行過程中涉及到多個SparkSQL的組件,如SqlParse、analyzer、optimizer、SparkPlan等等
2.2.2hiveContext的運行過程
hiveContext總的一個過程如下圖所示:
1.SQL語句經過HiveQl.parseSql解析成Unresolved LogicalPlan,在這個解析過程中對hiveql語句使用getAst()獲取AST樹,然后再進行解析;
2.使用analyzer結合數據hive源數據Metastore(新的catalog)進行綁定,生成resolved LogicalPlan;
3.使用optimizer對resolved LogicalPlan進行優化,生成optimized LogicalPlan,優化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))進行預處理;
4.使用hivePlanner將LogicalPlan轉換成PhysicalPlan;
5.使用prepareForExecution()將PhysicalPlan轉換成可執行物理計划;
6.使用execute()執行可執行物理計划;
7.執行后,使用map(_.copy)將結果導入SchemaRDD。
2.3 catalyst優化器
SparkSQL1.1總體上由四個模塊組成:core、catalyst、hive、hive-Thriftserver:
l core處理數據的輸入輸出,從不同的數據源獲取數據(RDD、Parquet、json等),將查詢結果輸出成schemaRDD;
l catalyst處理查詢語句的整個處理過程,包括解析、綁定、優化、物理計划等,說其是優化器,還不如說是查詢引擎;
l hive對hive數據的處理
l hive-ThriftServer提供CLI和JDBC/ODBC接口
在這四個模塊中,catalyst處於最核心的部分,其性能優劣將影響整體的性能。由於發展時間尚短,還有很多不足的地方,但其插件式的設計,為未來的發展留下了很大的空間。下面是catalyst的一個設計圖:
其中虛線部分是以后版本要實現的功能,實線部分是已經實現的功能。從上圖看,catalyst主要的實現組件有:
lsqlParse,完成sql語句的語法解析功能,目前只提供了一個簡單的sql解析器;
lAnalyzer,主要完成綁定工作,將不同來源的Unresolved LogicalPlan和數據元數據(如hive metastore、Schema catalog)進行綁定,生成resolved LogicalPlan;
loptimizer對resolved LogicalPlan進行優化,生成optimized LogicalPlan;
l Planner將LogicalPlan轉換成PhysicalPlan;
l CostModel,主要根據過去的性能統計數據,選擇最佳的物理執行計划
這些組件的基本實現方法:
l 先將sql語句通過解析生成Tree,然后在不同階段使用不同的Rule應用到Tree上,通過轉換完成各個組件的功能。
l Analyzer使用Analysis Rules,配合數據元數據(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的屬性而轉換成resolved LogicalPlan;
l optimizer使用Optimization Rules,對resolved LogicalPlan進行合並、列裁剪、過濾器下推等優化作業而轉換成optimized LogicalPlan;
l Planner使用Planning Strategies,對optimized LogicalPlan
3、SparkSQL CLI
CLI(Command-Line Interface,命令行界面)是指可在用戶提示符下鍵入可執行指令的界面,它通常不支持鼠標,用戶通過鍵盤輸入指令,計算機接收到指令后予以執行。Spark CLI指的是使用命令界面直接輸入SQL命令,然后發送到Spark集群進行執行,在界面中顯示運行過程和最終的結果。
Spark1.1相較於Spark1.0最大的差別就在於Spark1.1增加了Spark SQL CLI和ThriftServer,使得Hive用戶還有用慣了命令行的RDBMS數據庫管理員較容易地上手,真正意義上進入了SQL時代。
【注】Spark CLI和Spark Thrift Server實驗環境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建
3.1 運行環境說明
3.1.1 硬軟件環境
l 主機操作系統:Windows 64位,雙核4線程,主頻2.2G,10G內存
l 虛擬軟件:VMware® Workstation 9.0.0 build-812388
l 虛擬機操作系統:CentOS 64位,單核
l 虛擬機運行環境:
Ø JDK:1.7.0_55 64位
Ø Hadoop:2.2.0(需要編譯為64位)
Ø Scala:2.11.4
Ø Spark:1.1.0(需要編譯)
Ø Hive:0.13.1
3.1.2 機器網絡環境
集群包含三個節點,節點之間可以免密碼SSH訪問,節點IP地址和主機名分布如下:
序號 |
機器名 |
類型 |
核數/內存 |
用戶名 |
目錄 |
|
1 |
192.168.0.61 |
hadoop1 |
NN/DN/RM Master/Worker |
1核/3G |
hadoop |
/app 程序所在路徑 /app/scala-... /app/hadoop /app/complied |
2 |
192.168.0.62 |
hadoop2 |
DN/NM/Worker |
1核/2G |
hadoop |
|
3 |
192.168.0.63 |
hadoop3 |
DN/NM/Worker |
1核/2G |
hadoop |
3.2 配置並啟動
3.2.1 創建並配置hive-site.xml
在運行Spark SQL CLI中需要使用到Hive Metastore,故需要在Spark中添加其uris。具體方法是在SPARK_HOME/conf目錄下創建hive-site.xml文件,然后在該配置文件中,添加hive.metastore.uris屬性,具體如下:
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop1:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
</configuration>
3.2.2 啟動Hive
在使用Spark SQL CLI之前需要啟動Hive Metastore(如果數據存放在HDFS文件系統,還需要啟動Hadoop的HDFS),使用如下命令可以使Hive Metastore啟動后運行在后台,可以通過jobs查詢:
$nohup hive --service metastore > metastore.log 2>&1 &
3.2.3 啟動Spark集群和Spark SQL CLI
通過如下命令啟動Spark集群和Spark SQL CLI:
$cd /app/hadoop/spark-1.1.0
$sbin/start-all.sh
$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g
在集群監控頁面可以看到啟動了SparkSQL應用程序:
這時就可以使用HQL語句對Hive數據進行查詢,另外可以使用COMMAND,如使用set進行設置參數:默認情況下,SparkSQL Shuffle的時候是200個partition,可以使用如下命令修改該參數:
SET spark.sql.shuffle.partitions=20;
運行同一個查詢語句,參數改變后,Task(partition)的數量就由200變成了20。
3.2.4 命令參數
通過bin/spark-sql --help可以查看CLI命令參數:
其中[options] 是CLI啟動一個SparkSQL應用程序的參數,如果不設置--master的話,將在啟動spark-sql的機器以local方式運行,只能通過http://機器名:4040進行監控;這部分參數,可以參照Spark1.0.0 應用程序部署工具spark-submit 的參數。
[cli option]是CLI的參數,通過這些參數CLI可以直接運行SQL文件、進入命令行運行SQL命令等等,類似以前的Shark的用法。需要注意的是CLI不是使用JDBC連接,所以不能連接到ThriftServer;但可以配置conf/hive-site.xml連接到Hive的Metastore,然后對Hive數據進行查詢。
3.3 實戰Spark SQL CLI
3.3.1 獲取訂單每年的銷售單數、銷售總額
第一步 設置任務個數,在這里修改為20個
spark-sql>SET spark.sql.shuffle.partitions=20;
第二步 運行SQL語句
spark-sql>use hive;
spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;
第三步 查看運行結果
3.3.2 計算所有訂單每年的總金額
第一步 執行SQL語句
spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;
第二步 執行結果
使用CLI執行結果如下:
3.3.3 計算所有訂單每年最大金額訂單的銷售額
spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear;
使用CLI執行結果如下:
4、Spark Thrift Server
ThriftServer是一個JDBC/ODBC接口,用戶可以通過JDBC/ODBC連接ThriftServer來訪問SparkSQL的數據。ThriftServer在啟動的時候,會啟動了一個SparkSQL的應用程序,而通過JDBC/ODBC連接進來的客戶端共同分享這個SparkSQL應用程序的資源,也就是說不同的用戶之間可以共享數據;ThriftServer啟動時還開啟一個偵聽器,等待JDBC客戶端的連接和提交查詢。所以,在配置ThriftServer的時候,至少要配置ThriftServer的主機名和端口,如果要使用Hive數據的話,還要提供Hive Metastore的uris。
【注】Spark CLI和Spark Thrift Server實驗環境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建
4.1 配置並啟動
4.1.1 創建並配置hive-site.xml
第一步 創建hive-site.xml配置文件
在$SPARK_HOME/conf目錄下修改hive-site.xml配置文件(如果在Spark SQL CLI中已經添加,可以省略):
$cd /app/hadoop/spark-1.1.0/conf
$sudo vi hive-site.xml
第二步 修改配置文件
設置hadoop1為Metastore服務器,hadoop2為Thrift Server服務器,配置內容如下:
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop1:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
<name>hive.server2.thrift.min.worker.threads</name>
<value>5</value>
<description>Minimum number of Thrift worker threads</description>
</property>
<property>
<name>hive.server2.thrift.max.worker.threads</name>
<value>500</value>
<description>Maximum number of Thrift worker threads</description>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
<description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hadoop2</value>
<description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description>
</property>
</configuration>
4.1.2 啟動Hive
在hadoop1節點中,在后台啟動Hive Metastore(如果數據存放在HDFS文件系統,還需要啟動Hadoop的HDFS):
$nohup hive --service metastore > metastore.log 2>&1 &
4.1.3 啟動Spark集群和Thrift Server
在hadoop1節點啟動Spark集群
$cd /app/hadoop/spark-1.1.0/sbin
$./start-all.sh
在hadoop2節點上進入SPARK_HOME/sbin目錄,使用如下命令啟動Thrift Server
$cd /app/hadoop/spark-1.1.0/sbin
$./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g
注意:Thrift Server需要按照配置在hadoop2啟動!
在集群監控頁面可以看到啟動了SparkSQL應用程序:
4.1.4 命令參數
使用sbin/start-thriftserver.sh --help可以查看ThriftServer的命令參數:
$sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options]
Thrift server options: Use value for given property
其中[options] 是Thrift Server啟動一個SparkSQL應用程序的參數,如果不設置--master的話,將在啟動Thrift Server的機器以local方式運行,只能通過http://機器名:4040進行監控;這部分參數,可以參照Spark1.0.0 應用程序部署工具spark-submit 的參數。在集群中提供Thrift Server的話,一定要配置master、executor-memory等參數。
[thrift server options]是Thrift Server的參數,可以使用-dproperty=value的格式來定義;在實際應用上,因為參數比較多,通常使用conf/hive-site.xml配置。
4.2 實戰Thrift Server
4.2.1 遠程客戶端連接
可以在任意節點啟動bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,因為沒有采用權限管理,所以用戶名用運行bin/beeline的用戶hadoop,密碼為空:
$cd /app/hadoop/spark-1.1.0/bin
$./beeline
beeline>!connect jdbc:hive2://hadoop2:10000
4.2.2 基本操作
第一步 顯示hive數據庫所有表
beeline>show database;
beeline>use hive;
beeline>show tables;
第二步 創建表testThrift
beeline>create table testThrift(field1 String , field2 Int);
beeline>show tables;
第三步 把tbStockDetail表中金額大於3000插入到testThrift表中
beeline>insert into table testThrift select ordernumber,amount from tbStockDetail where amount>3000;
beeline>select * from testThrift;
第四步 重新創建testThrift表中,把年度最大訂單插入該表中
beeline>drop table testThrift;
beeline>create table testThrift (field1 String , field2 Int);
beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear;
beeline>select * from testThrift;
4.2.3 計算所有訂單每年的訂單數
spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;
Stage監控頁面:
查看Details for Stage 28
4.2.4 計算所有訂單月銷售額前十名
spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit 10;
Stage監控頁面:
在其第一個Task中,從本地讀入數據
在后面的Task是從內存中獲取數據
4.2.5 緩存表數據
第一步 緩存數據
beeline>cache table tbStock;
beeline>select count(*) from tbStock;
第二步 運行4.2.4中的“計算所有訂單月銷售額前十名”
beeline>select count(*) from tbStock;
本次計算划給11.233秒,查看webUI,數據已經緩存,緩存率為100%:
第三步 在另外節點再次運行
在hadoop3節點啟動bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,然后直接運行對tbStock計數(注意沒有進行數據庫的切換):
用時0.343秒,再查看webUI中的stage:
Locality Level是PROCESS,顯然是使用了緩存表。
從上可以看出,ThriftServer可以連接多個JDBC/ODBC客戶端,並相互之間可以共享數據。順便提一句,ThriftServer啟動后處於監聽狀態,用戶可以使用ctrl+c退出ThriftServer;而beeline的退出使用!q命令。
4.2.6 在IDEA中JDBC訪問
有了ThriftServer,開發人員可以非常方便的使用JDBC/ODBC來訪問SparkSQL。下面是一個scala代碼,查詢表tbStockDetail,返回amount>3000的單據號和交易金額:
第一步 在IDEA創建class6包和類JDBCofSparkSQL
參見《Spark編程模型(下)--IDEA搭建及實戰》在IDEA中創建class6包並新建類JDBCofSparkSQL。該類中查詢tbStockDetail金額大於3000的訂單:
package class6
import java.sql.DriverManager
object JDBCofSparkSQL {
def main(args: Array[String]) {
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "")
try {
val statement = conn.createStatement
val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail where amount>3000")
while (rs.next) {
val ordernumber = rs.getString("ordernumber")
val amount = rs.getString("amount")
println("ordernumber = %s, amount = %s".format(ordernumber, amount))
}
} catch {
case e: Exception => e.printStackTrace
}
conn.close
}
}
第二步 查看運行結果
在IDEA中可以觀察到,在運行日志窗口中沒有運行過程的日志,只顯示查詢結果
第三步 查看監控結果
從Spark監控界面中觀察到,該Job有一個編號為6的Stage,該Stage有2個Task,分別運行在hadoop1和hadoop2節點,獲取數據為NODE_LOCAL方式。
在hadoop2中觀察Thrift Server運行日志如下: