[Spark SQL_3] Spark SQL 高級操作



 

0. 說明

   DataSet 介紹 && Spark SQL 訪問 JSON 文件 && Spark SQL 訪問 Parquet 文件 && Spark SQL 訪問 JDBC 數據庫 && Spark SQL 作為分布式查詢引擎

 

 


 

1. DataSet 介紹

  強類型集合,可以轉換成並行計算。

  Dataset 上可以執行的操作分為 Transfermation 和 Action ,類似於 RDD。

  Transfermation 生成新的 DataSet,Action 執行計算並返回結果。

  DataSet 是延遲計算,只有當調用 Action 時才會觸發執行。內部表現為邏輯計划。

  Action 調用時,Spark 的查詢優化器對邏輯計划進行優化,生成物理計划,用於分布式行為下高效的執行。

  具體的執行計划可以通過 explain函數 來查看,方式如下:

scala> spark.sql("explain select name,class,score from tb_student").show(1000,false)

 

  結果如圖所示,show(1000 , false) 表示顯式 1000行數據,結果不截斷顯式。

  

 

 


 

2. Spark SQL 訪問 JSON 文件

  【保存 JSON 文件】

# 創建 DataFrame
scala> val df = spark.sql("select * from orders")

# 輸出 JSON 文件
scala> df.write.json("file:///home/centos/json")

  

 

  【讀取 JSON 文件】

 

scala> val df = spark.read.json("file:///home/centos/json")

scala> df.show

  

 

 


 

3. Spark SQL 訪問 Parquet 文件 

  【保存】

# 創建 DataFrame
scala> val df = spark.sql("select * from orders")

# 保存成 parquet 文件
scala> df.write.parquet("file:///home/centos/par")

 

  【讀取】

# 創建 DataFrame
scala> val df = spark.read.parquet("file:///home/centos/par")

# 讀取 Parquet 文件
scala> df.show

  

 


 4. Spark SQL 訪問 JDBC 數據庫

  【4.1 處理第三方 jar】

  spark SQL 是分布式數據庫訪問,需要將驅動程序分發到所有 worker 節點或者通過 --jars 命令指定附件

  分發 jar 到所有節點 ,third.jar 為第三方 jar 包

xsync /soft/spark/jars/third.jar

 

  通過--jars 命令指定

 

spark-shell --master spark://s101:7077 --jars /soft/spark/jars/third.jar

 

 

  【4.2 讀取 MySQL 數據】

val prop = new java.util.Properties()
prop.put("driver" , "com.mysql.jdbc.Driver")
prop.put("user" , "root")
prop.put("password" , "root")
# 讀取
val df = spark.read.jdbc("jdbc:mysql://192.168.23.101:3306/big12" , "music" ,prop) ;
# 顯示
df.show

 

  【4.3 保存數據到 MySQL 表(表不能存在)】

val prop = new java.util.Properties()
prop.put("driver" , "com.mysql.jdbc.Driver")
prop.put("user" , "root")
prop.put("password" , "root")
# 保存
dataframe.write.jdbc("jdbc:mysql://192.168.231.1:3306/mydb" , "emp" ,prop ) ;

 


 

5. Spark SQL 作為分布式查詢引擎

  【5.1 說明】
  終端用戶或應用程序可以直接同 Spark SQL 交互,而不需要寫其他代碼。

  

 

 

  【5.2 啟動 Spark的 thrift-server 進程】

  在 spark/sbin 目錄下執行以下操作

[centos@s101 /soft/spark/sbin]$ start-thriftserver.sh --master spark://s101:7077

 

  【5.3 驗證】

  查看 Spark WebUI,訪問 http://s101:8080

  端口檢查,檢查10000端口是否啟動

netstat -anop | grep 10000

 

  【5.4 使用 Spark 的 beeline 程序測試】

  在 spark/bin 目錄下執行以下操作

 

# 進入 Spark 的 beeline
[centos@s101 /soft/spark/bin]$ ./beeline

# 連接 Hive
!connect jdbc:hive2://localhost:10000/big12;auth=noSasl

# 查看表
0: jdbc:hive2://localhost:10000/big12> show tables;

 

   【5.5 編寫客戶端 Java 程序與 Spark 分布式查詢引擎交互】

    [添加依賴]

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>

 

    [代碼編寫]

package com.share.sparksql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

/**
 * 使用 Spark SQL 分布式查詢引擎
 */
public class ThriftServerDemo {
    public static void main(String[] args) {
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Connection connection = DriverManager.getConnection("jdbc:hive2://s101:10000/big12;auth=noSasl");
            ResultSet rs = connection.createStatement().executeQuery("select * from orders");
            while (rs.next()) {
                System.out.printf("%d / %s\r\n", rs.getInt(1), rs.getString(2));
            }
            rs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

     [特別說明]

    以上黃色部分為 HiveServer2 的驗證模式,如果未添加以上黃色部分則會報錯,報錯如下:

  

 

 

 

 

 


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM