Spark開發-SparkSQL執行過程和SQL相關


SparkSQL執行的場景

1.一種是直接寫sql語句,這個需要有元數據庫支持,例如Hive等庫
 2.另一種是通過Dataset/DataFrame編寫Spark應用程序,即 通過通過Dataset/DataFrame提供的APIs進行計算

Spark的執行過程

0.用戶在client端提交作業后,會由Driver運行main方法並創建spark context上下文
1.SparkContext  向集群資源管理器注冊並申請資源,資源管理器分配資源,並啟動Executor
2.SparkContext 根據Job構建基於Stage的DAG,DAG根據血緣的寬窄依賴划分Stage,並提交Stage給任務管理器TaskSchedule,生成由Task 
3.Executor經過SparkContext向TaskScheduler申請Task,並執行Task。Task執行結束后,釋放資源
4.SparkContext收集結果給Driver,運行結束后

SparkSQL執行過程

SQL 轉換為 RDD  
  1.邏輯計划階段
   邏輯計划階段會將用戶所寫的 SQL 語句轉換成 邏輯算子樹 
   分別對應未解析的邏輯算子樹( Unresolved LogicalPlan ,僅僅是數據結構,不包含任何數據信息等)、
          解析后的邏輯算子樹( Analyzed LogicalPlan ,節點中綁定各種信息)和
	      優化后的邏輯算子樹( Optimized LogicalPlan ,應用各種優化規則對一些低效的邏輯計划進行轉換) 。
  2.物理計划階段也包含 3 個子階段:
     首先,根據邏輯算子樹,生成物理算子樹的列表 Iterator[PhysicalPlan] (同樣的邏輯算子樹可能對應多個物理算子樹);
     然后,從列表中按照一定的策略選取最優的物理算子樹( SparkPlan);
     最后,對選取的物理算子樹進行提交前的准備工作,例如,確保分區操作正確、物理算子樹節點
     重用、執行代碼生成等,得到“准備后”的物理算子樹( Prepared SparkPlan )
  注意: SQL解析到執行之前的步驟,都是在Driver端完成
    邏輯計划階段的詳情
     1.:parser 基於Antlr 4 框架對 sql解析,生成抽象語法樹。AST是源代碼語法結構的一種抽象表示。它以樹狀的形式表現編程語言的語法結
     然后將antlr的tree轉成LogicPlan 邏輯計划
 2.analyzer;通過分析器,結合Catalog,把logical plan和實際的數據綁定起來
 3.Optimizer:logical plan優化,基於規則的優化;優化規則參考Optimizer,優化執行器RuleExecutor

Spark編譯過程

 邏輯計划
      Unresolved Logical Plan
      resolved logical plan
      Optimizated logical plan
 物理計划

Spark SQL 核心類

  上述不同階段對應的
  SparkSglParser類、Analyzer類、Optimizer類和SparkPlanner類等,
  最后封裝成一個QueryExecution對象
解析:源碼結構
- Catalyst (sql/catalyst) - An implementation-agnostic framework for
  manipulating trees of relational operators and expressions.
   實現詞法分析器Lexer 以及語法分析器Parser
   Antlr4         SQL-- AST
   AstBuilder  -- Unsolved  LogicalPlan
                  Analyzed  LogicalPlan
                  Optimized LogicalPlan
1- Execution (sql/core) - A query planner / execution engine for  translating Catalyst's logical query plans into Spark RDDs.  
   This component also includes a new public interface, SQLContext, that allows users to execute SQL or LINQ statements against existing RDDs and Parquet files.
2- Hive Support (sql/hive) - Includes an extension of SQLContext called HiveContext that 
allows users to write queries using a subset of HiveQL and access data from a Hive Metastore using Hive SerDes.
There are also wrappers that allows users to run queries that include Hive UDFs, UDAFs, and UDTFs.
3- HiveServer and CLI support (sql/hive-thriftserver) - Includes support for the SQL CLI (bin/spark-sql) and a HiveServer2 (for JDBC/ODBC) compatible server.
  ANTLR4自動生成Spark SQL語法解析器Java代碼SqlBaseParser類 SqlBaseLexer和SqlBaseParser均是使用ANTLR 4自動生成的Java類。 使用這兩個解析器將SQL語句解
  然后在parsePlan中,使用 AstBuilder( AstBuilder.scala)將ANTLR 4語法樹結構轉換成catalyst表達式邏輯計划logical plan

Catalyst

 Catalyst中涉及的重要概念和數據結構,主要包括InternalRow體系、TreeNode體系和Expression體系
  InternalRow 表示一行行的數據
  TreeNode體系:
     Expression: 表達式體系即是子類也是一個體系
     QueryPlan: LogicalPlan SparkPlan
  數據類型體系

logicalplan和SparkPlan

logical plan 分為BinaryNode LeafNode UnaryNode 以及其他
  UnaryNode: 一元節點,即只有一個子節點
  BinaryNode:二元節點,即有左右子節點的二叉節點
  LeafNode:  葉子節點,沒有子節點的節點。
SparkPlan 根據操作的子節點類型主要分為三種trait: BinaryExecNode/LeafExecNode/UnaryExecNode 和其他類型plan
元數據和指標系統
whole stage codegen : 通過全流式代碼生成技術在運行時動態生成代碼,並盡量將所有的操作打包到一個函數中

SparkSQL Join類型

1.SparkSQL語法:
   Join的表情況:是否有重復數據,過濾等
   Join大致包括三個要素:  Join方式、Join條件 以及過濾條件
2.了解底層常用的Join算法以及各自的適用場景
SparkSQL支持三種Join算法-shuffle hash join、broadcast hash join以及sort merge join
   BroadcastHashJoin
   shuffleHashJoin
   SortMergeJoin
   其他: BroadcastNestedLoopJoin  CartesianProductJOin
hash join算法就來自於傳統數據庫,而shuffle和broadcast是分布式情況下的場景
     shuffle    :分別將兩個表按照join key進行分區,將相同join key的記錄重分布到同一節點,兩張表的數據會被重分布到集群中所有節點
     broadcast  :將小表廣播分發到大表所在的所有主機
SparkSQL對兩張大表join采用了全新的算法-  sort-merge join
   shuffle- sort -merge
     spark的shuffle實現都適用sort-based shuffle算法,因此在經過shuffle之后partition數據都是按照key排序的

SparkSQL配置

  Spark.sql.antoBroadcastJoinThreshold
  spark.sql.join.preferSortMergeJOin
   ……
 讀取數據:
   DataSource表 即Spark的表
    spark.sql.files.maxPartitionBytes
   Hive 表情況
    spark.sql.combine.hive.input.splites.enbale=true
    mapreduce.input.fileinputformart.split.maxsize
  落表:
     spark.sql.adaptive.repaartition.enable=true
  說明: limit運行反而很慢
        可以通過查看執行計划,這種情況多發生在分區表中,多是執行了在每個分區取數,然后在這些分區中再取限制的數據
      分布式的情況,有時候和理解的不一致,這時候通過分析執行計划,可以有效的減少 想當然和 實際上的分歧和錯誤

參考

 SparkSQL – 有必要坐下來聊聊Join http://hbasefly.com/2017/03/19/sparksql-basic-join/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM