1 案例 2 def main(args: Array[String]): Unit = { 3 4 // 1.創建sparkconf 5 val conf = new SparkConf() 6 .setMaster("local") 7 .setAppName("test-sql") 8 9 10 // 2.創建sparksession 11 val session: SparkSession = SparkSession 12 .builder() 13 .config(conf) 14 .getOrCreate() 15 16 17 // 3.創建數據表並讀取數據 , 並創建了student的數據表(視圖) 18 // 讀取本地student.json 文件。 19 //{"id": 1 , "name" : "Kate" , "age" : 29} 20 //{"id": 2 , "name" : "Andy" , "age" : 39} 21 //{"id": 3 , "name" : "Tony" , "age" : 10} 22 session 23 .read 24 .json("D:\\daima\\work\\1011\\spark-test-zhonghuashishan\\src\\test\\file\\student.json") 25 .createOrReplaceTempView("student") 26 27 28 // SQL查詢 29 session.sql("select name from student where age > 18 ").show() 30 }
一般來講,對於sparkSQL系統,從SQL到spark中的RDD的執行需要經過兩個大的階段、
邏輯計划(LogicalPlan)
物理計划(PhysicalPlan)

SQL執行過程概覽
邏輯計划階段
會將用戶所寫的SQL語句轉換成樹型數據結構(邏輯算子樹),SQL語句中蘊含的邏輯映射到邏輯算子樹的不同節點,
邏輯計划階段生成的邏輯算子樹並不會直接提交執行,僅作為中間階段。
邏輯算子樹的生成過程經歷3個子階段
1.未解析的邏輯算子樹;僅僅是數據結構,不包含任何數據信息等
2.解析后的邏輯算子樹;節點中綁定各種信息
3.優化后的邏輯算子樹;應用各種優化規則對一些低效的邏輯計划進行轉換
物理計划階段
將上一步邏輯計划階段生成的邏輯算子樹進行進一步轉換,生成物理算子樹。
物理算子樹的節點會直接生成RDD或對RDD進行transformation操作(注:每個物理計划節點中都實現了對RDD進行轉換的execute方法)
物理計划階段的3個子階段
1.物理算子樹的列表;(注:同樣的邏輯算子樹可能對應多個物理算子樹)
2.最優物理算子樹;從算子樹列表中按照一定的策略選取最優的物理算子樹,然后對選取的物理算子樹進行提交前的准備工作;例如:確保分區操作正確,物理算子樹節點重用,執行代碼生成等
3.准備后的物理算子樹;對物理算子樹生成的RDD執行action操作,即可提交程序
SQL語句的解析一直到提交之前,整個轉換過程都在spark集群的Driver端進行不涉及分布式環境。
Catalyst
sparkSQL內部實現流程中平台無關部分的基礎框架稱為Catalyst,它主要包括InternalRow體系、TreeNode體系和Expression體系。
InternalRow體系
spark SQL 內部實現中,InternalRow就是用來表示一行行數據的類,物理算子樹節點產生和轉換的RDD類型即為RDD[InternalRow] 。 InternalRow中的每一列都是Catalyst內部定義的數據類型。
InternalRow作為一個抽象類,包含numFields和update方法,以及各列數據對應的get與set方法,InternalRow中都是根據下表來訪問和操作列元素的。
其具體的實現包括BaseGenericInternalRow、UnsafeRow和JoinedRow3個直接子類

InternalRow體系
BaseGenericInternalRow:同樣是一個抽象類,實現了InternalRow中定義的所有get類型方法,這些方法的實現都通過調用類中定義的genericGet虛函數進行,該函數的實現在下一級子類中(也就是GenericInternalRow 、 SpecificInternalRow 、 MutbaleUnsafeInternalRow類中)
JoinedRow:該類主要用於Join操作,將兩個InternalRow放在一起形成新的InternalRow。使用時需要注意構造參數的順序。
UnsafeRow:不采用java對象存儲的方式,避免了JVM中垃圾回收(GC)的代價。此外UnsafeRow對行數據進行了特定的編碼,使得存儲更加高效。
BaseGenericInternalRow也有3個子類,分別是GenericInternalRow、SpecificInternalRow和 MutableUnsafeRow類。
其中MutableUnsafeRow和UnsafeRow相關,用來支持對特定的列數據進行修改。
GenericInternalRow類源碼
//構造參數是Array[Any]類型,采用對象數組進行底層存儲、
// 注意:數組是非拷貝的,因此一但創建,就不允許通過set操作進行改變。

1 class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow { 2 /** No-arg constructor for serialization. */ 3 protected def this() = this(null) 4 5 6 def this(size: Int) = this(new Array[Any](size)) 7 8 // 也是直接根據下表訪問的 9 override protected def genericGet(ordinal: Int) = values(ordinal) 10 11 12 override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values.clone() 13 14 15 override def numFields: Int = values.length 16 17 18 override def setNullAt(i: Int): Unit = { values(i) = null} 19 20 21 override def update(i: Int, value: Any): Unit = { values(i) = value } 22 }
而SpecificInternalRow則是以Array[MutableValue]為構造參數的,允許通過set操作進行修改。
final class SpecificInternalRow(val values:
Array[MutableValue]) extends BaseGenericInternalRow {
TreeNode體系
無論是邏輯計划還是物理計划,都離不開中間數據結構,在Catalyst中,對應的是TreeNode體系,TreeNode類是Sparksql中所有樹結構的基類,TreeNode內部包含一個Seq[BaseType]類型的變量children來表示節點,TreeNode定義了foreach、map、collect等針對節點操作方法,以及transformUp和transformDown等遍歷節點並對匹配節點進行相應轉換。
TreeNode一直在內存里維護,不會dump到磁盤以文件形式存儲,且無論在映射邏輯執行計划階段還是優化邏輯執行計划階段,樹的修改都是以替換已有節點的方式進行。

TreeNode體系

TreeNode基本操作
除上述操作外,Catalyst中還提供了節點位置功能,即能夠根據TreeNode定位到對應的SQL字符串中的行數和起始位置,該功能在SQL解析發生異常時能夠方便用戶迅速找到出錯的地方

1 // 在TreeNode類中 2 3 case class Origin( 4 line: Option[Int] = None, // 行號 5 startPosition: Option[Int] = None) // 偏移量 6 7 object CurrentOrigin { 8 private val value = new ThreadLocal[Origin]() { 9 override def initialValue: Origin = Origin() 10 } 11 12 13 def get: Origin = value.get() 14 def set(o: Origin): Unit = value.set(o) 15 16 17 def reset(): Unit = value.set(Origin()) 18 19 20 def setPosition(line: Int, start: Int): Unit = { 21 value.set( 22 value.get.copy(line = Some(line), startPosition = Some(start))) 23 } 24 25 26 def withOrigin[A](o: Origin)(f: => A): A = { 27 set(o) 28 val ret = try f finally { reset() } 29 ret 30 } 31 }