0. 說明
DataSet 介紹 && Spark SQL 訪問 JSON 文件 && Spark SQL 訪問 Parquet 文件 && Spark SQL 訪問 JDBC 數據庫 && Spark SQL 作為分布式查詢引擎
1. DataSet 介紹
強類型集合,可以轉換成並行計算。
Dataset 上可以執行的操作分為 Transfermation 和 Action ,類似於 RDD。
Transfermation 生成新的 DataSet,Action 執行計算並返回結果。
DataSet 是延遲計算,只有當調用 Action 時才會觸發執行。內部表現為邏輯計划。
Action 調用時,Spark 的查詢優化器對邏輯計划進行優化,生成物理計划,用於分布式行為下高效的執行。
具體的執行計划可以通過 explain函數 來查看,方式如下:
scala> spark.sql("explain select name,class,score from tb_student").show(1000,false)
結果如圖所示,show(1000 , false) 表示顯式 1000行數據,結果不截斷顯式。

2. Spark SQL 訪問 JSON 文件
【保存 JSON 文件】
# 創建 DataFrame scala> val df = spark.sql("select * from orders") # 輸出 JSON 文件 scala> df.write.json("file:///home/centos/json")

【讀取 JSON 文件】
scala> val df = spark.read.json("file:///home/centos/json")
scala> df.show

3. Spark SQL 訪問 Parquet 文件
【保存】
# 創建 DataFrame scala> val df = spark.sql("select * from orders") # 保存成 parquet 文件 scala> df.write.parquet("file:///home/centos/par")
【讀取】
# 創建 DataFrame scala> val df = spark.read.parquet("file:///home/centos/par") # 讀取 Parquet 文件 scala> df.show

4. Spark SQL 訪問 JDBC 數據庫
【4.1 處理第三方 jar】
spark SQL 是分布式數據庫訪問,需要將驅動程序分發到所有 worker 節點或者通過 --jars 命令指定附件
分發 jar 到所有節點 ,third.jar 為第三方 jar 包
xsync /soft/spark/jars/third.jar
通過--jars 命令指定
spark-shell --master spark://s101:7077 --jars /soft/spark/jars/third.jar
【4.2 讀取 MySQL 數據】
val prop = new java.util.Properties() prop.put("driver" , "com.mysql.jdbc.Driver") prop.put("user" , "root") prop.put("password" , "root") # 讀取 val df = spark.read.jdbc("jdbc:mysql://192.168.23.101:3306/big12" , "music" ,prop) ; # 顯示 df.show
【4.3 保存數據到 MySQL 表(表不能存在)】
val prop = new java.util.Properties() prop.put("driver" , "com.mysql.jdbc.Driver") prop.put("user" , "root") prop.put("password" , "root") # 保存 dataframe.write.jdbc("jdbc:mysql://192.168.231.1:3306/mydb" , "emp" ,prop ) ;
5. Spark SQL 作為分布式查詢引擎
【5.1 說明】
終端用戶或應用程序可以直接同 Spark SQL 交互,而不需要寫其他代碼。

【5.2 啟動 Spark的 thrift-server 進程】
在 spark/sbin 目錄下執行以下操作
[centos@s101 /soft/spark/sbin]$ start-thriftserver.sh --master spark://s101:7077
【5.3 驗證】
查看 Spark WebUI,訪問 http://s101:8080
端口檢查,檢查10000端口是否啟動
netstat -anop | grep 10000
【5.4 使用 Spark 的 beeline 程序測試】
在 spark/bin 目錄下執行以下操作
# 進入 Spark 的 beeline [centos@s101 /soft/spark/bin]$ ./beeline # 連接 Hive !connect jdbc:hive2://localhost:10000/big12;auth=noSasl # 查看表 0: jdbc:hive2://localhost:10000/big12> show tables;
【5.5 編寫客戶端 Java 程序與 Spark 分布式查詢引擎交互】
[添加依賴]
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0</version> </dependency>
[代碼編寫]
package com.share.sparksql; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; /** * 使用 Spark SQL 分布式查詢引擎 */ public class ThriftServerDemo { public static void main(String[] args) { try { Class.forName("org.apache.hive.jdbc.HiveDriver"); Connection connection = DriverManager.getConnection("jdbc:hive2://s101:10000/big12;auth=noSasl"); ResultSet rs = connection.createStatement().executeQuery("select * from orders"); while (rs.next()) { System.out.printf("%d / %s\r\n", rs.getInt(1), rs.getString(2)); } rs.close(); } catch (Exception e) { e.printStackTrace(); } } }
[特別說明]
以上黃色部分為 HiveServer2 的驗證模式,如果未添加以上黃色部分則會報錯,報錯如下:

