小記--------sparksql執行全過程


 1 案例
 2 def main(args: Array[String]): Unit = {
 3  
 4     // 1.創建sparkconf
 5   val conf = new SparkConf()
 6     .setMaster("local")
 7     .setAppName("test-sql")
 8  
 9  
10   // 2.創建sparksession
11   val session: SparkSession = SparkSession
12     .builder()
13     .config(conf)
14     .getOrCreate()
15  
16  
17   // 3.創建數據表並讀取數據 , 並創建了student的數據表(視圖)
18     // 讀取本地student.json 文件。
19   //{"id": 1 , "name" : "Kate" , "age" : 29}
20   //{"id": 2 , "name" : "Andy" , "age" : 39}
21   //{"id": 3 , "name" : "Tony" , "age" : 10}
22   session
23     .read
24     .json("D:\\daima\\work\\1011\\spark-test-zhonghuashishan\\src\\test\\file\\student.json")
25     .createOrReplaceTempView("student")
26  
27  
28   // SQL查詢
29   session.sql("select name from student where age > 18 ").show()
30 }

 

 
一般來講,對於sparkSQL系統,從SQL到spark中的RDD的執行需要經過兩個大的階段、
    邏輯計划(LogicalPlan)
    物理計划(PhysicalPlan)
SQL執行過程概覽
 
邏輯計划階段
    會將用戶所寫的SQL語句轉換成樹型數據結構(邏輯算子樹),SQL語句中蘊含的邏輯映射到邏輯算子樹的不同節點,
    邏輯計划階段生成的邏輯算子樹並不會直接提交執行,僅作為中間階段。
 
    邏輯算子樹的生成過程經歷3個子階段
        1.未解析的邏輯算子樹;僅僅是數據結構,不包含任何數據信息等
        2.解析后的邏輯算子樹;節點中綁定各種信息
        3.優化后的邏輯算子樹;應用各種優化規則對一些低效的邏輯計划進行轉換
 
物理計划階段
    將上一步邏輯計划階段生成的邏輯算子樹進行進一步轉換,生成物理算子樹。
    物理算子樹的節點會直接生成RDD或對RDD進行transformation操作(注:每個物理計划節點中都實現了對RDD進行轉換的execute方法)
 
    物理計划階段的3個子階段
        1.物理算子樹的列表;(注:同樣的邏輯算子樹可能對應多個物理算子樹)
        2.最優物理算子樹;從算子樹列表中按照一定的策略選取最優的物理算子樹,然后對選取的物理算子樹進行提交前的准備工作;例如:確保分區操作正確,物理算子樹節點重用,執行代碼生成等
        3.准備后的物理算子樹;對物理算子樹生成的RDD執行action操作,即可提交程序
 
SQL語句的解析一直到提交之前,整個轉換過程都在spark集群的Driver端進行不涉及分布式環境。
 
 
Catalyst
sparkSQL內部實現流程中平台無關部分的基礎框架稱為Catalyst,它主要包括InternalRow體系、TreeNode體系和Expression體系。
 
InternalRow體系
    spark SQL 內部實現中,InternalRow就是用來表示一行行數據的類,物理算子樹節點產生和轉換的RDD類型即為RDD[InternalRow] 。 InternalRow中的每一列都是Catalyst內部定義的數據類型。
 
InternalRow作為一個抽象類,包含numFields和update方法,以及各列數據對應的get與set方法,InternalRow中都是根據下表來訪問和操作列元素的。
    其具體的實現包括BaseGenericInternalRow、UnsafeRow和JoinedRow3個直接子類
 
InternalRow體系
 
BaseGenericInternalRow:同樣是一個抽象類,實現了InternalRow中定義的所有get類型方法,這些方法的實現都通過調用類中定義的genericGet虛函數進行,該函數的實現在下一級子類中(也就是GenericInternalRow 、 SpecificInternalRow 、 MutbaleUnsafeInternalRow類中)
 
JoinedRow:該類主要用於Join操作,將兩個InternalRow放在一起形成新的InternalRow。使用時需要注意構造參數的順序。
 
UnsafeRow:不采用java對象存儲的方式,避免了JVM中垃圾回收(GC)的代價。此外UnsafeRow對行數據進行了特定的編碼,使得存儲更加高效。
 
BaseGenericInternalRow也有3個子類,分別是GenericInternalRow、SpecificInternalRow和 MutableUnsafeRow類。
    其中MutableUnsafeRow和UnsafeRow相關,用來支持對特定的列數據進行修改。
 
GenericInternalRow類源碼
//構造參數是Array[Any]類型,采用對象數組進行底層存儲、
// 注意:數組是非拷貝的,因此一但創建,就不允許通過set操作進行改變。
 1 class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow {
 2   /** No-arg constructor for serialization. */
 3   protected def this() = this(null)
 4  
 5  
 6   def this(size: Int) = this(new Array[Any](size))
 7  
 8 // 也是直接根據下表訪問的
 9   override protected def genericGet(ordinal: Int) = values(ordinal)
10  
11  
12   override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values.clone()
13  
14  
15   override def numFields: Int = values.length
16  
17  
18   override def setNullAt(i: Int): Unit = { values(i) = null}
19  
20  
21   override def update(i: Int, value: Any): Unit = { values(i) = value }
22 }
View Code

 

 
而SpecificInternalRow則是以Array[MutableValue]為構造參數的,允許通過set操作進行修改。
final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {
 
 
TreeNode體系
    無論是邏輯計划還是物理計划,都離不開中間數據結構,在Catalyst中,對應的是TreeNode體系,TreeNode類是Sparksql中所有樹結構的基類,TreeNode內部包含一個Seq[BaseType]類型的變量children來表示節點,TreeNode定義了foreach、map、collect等針對節點操作方法,以及transformUp和transformDown等遍歷節點並對匹配節點進行相應轉換。
    TreeNode一直在內存里維護,不會dump到磁盤以文件形式存儲,且無論在映射邏輯執行計划階段還是優化邏輯執行計划階段,樹的修改都是以替換已有節點的方式進行。
 
TreeNode體系
 
TreeNode基本操作
除上述操作外,Catalyst中還提供了節點位置功能,即能夠根據TreeNode定位到對應的SQL字符串中的行數和起始位置,該功能在SQL解析發生異常時能夠方便用戶迅速找到出錯的地方
 1 // 在TreeNode類中
 2  
 3 case class Origin(
 4   line: Option[Int] = None,    // 行號
 5   startPosition: Option[Int] = None)   // 偏移量
 6  
 7 object CurrentOrigin {
 8   private val value = new ThreadLocal[Origin]() {
 9     override def initialValue: Origin = Origin()
10   }
11  
12  
13   def get: Origin = value.get()
14   def set(o: Origin): Unit = value.set(o)
15  
16  
17   def reset(): Unit = value.set(Origin())
18  
19  
20   def setPosition(line: Int, start: Int): Unit = {
21     value.set(
22       value.get.copy(line = Some(line), startPosition = Some(start)))
23   }
24  
25  
26   def withOrigin[A](o: Origin)(f: => A): A = {
27     set(o)
28     val ret = try f finally { reset() }
29     ret
30   }
31 }
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM