facebook Presto SQL分析引擎——本質上和spark無異,分解stage,task,MR計算


Presto 是由 Facebook 開源的大數據分布式 SQL 查詢引擎,適用於交互式分析查詢,可支持眾多的數據源,包括 HDFS,RDBMS,KAFKA 等,而且提供了非常友好的接口開發數據源連接器。

介紹

Presto是一個運行在多台服務器上的分布式系統。 完整安裝包括一個coordinator和多個worker。 由客戶端提交查詢,從Presto命令行CLI提交到coordinator。 coordinator進行解析,分析並執行查詢計划,然后分發處理隊列到worker。

Presto Installation Overview

完全基於內存的並行計算

查詢的並行執行流程

Presto SQL的執行流程如下圖所示

  1. Cli通過HTTP協議提交SQL查詢之后,查詢請求封裝成一個SqlQueryExecution對象交給Coordinator的SqlQueryManager#queryExecutor線程池去執行
  2. 每個SqlQueryExecution線程(圖中Q-X線程)啟動后對查詢請求的SQL進行語法解析和優化並最終生成多個Stage的SqlStageExecution任務,每個SqlStageExecution任務仍然交給同樣的線程池去執行
  3. 每個SqlStageExecution線程(圖中S-X線程)啟動后每個Stage的任務按PlanDistribution屬性構造一個或者多個RemoteTask通過HTTP協議分配給遠端的Worker節點執行
  4. Worker節點接收到RemoteTask請求之后,啟動一個SqlTaskExecution線程(圖中T-X線程)將這個任務的每個Split包裝成一個PrioritizedSplitRunner任務(圖中SR-X)交給Worker節點的TaskExecutor#executor線程池去執行

 查詢執行流程

上面的執行計划實際執行效果如下圖所示。

  1. Coordinator通過HTTP協議調用Worker節點的 /v1/task 接口將執行計划分配給所有Worker節點(圖中藍色箭頭)
  2. SubPlan1的每個節點讀取一個Split的數據並過濾后將數據分發給每個SubPlan0節點進行Join操作和Partial Aggr操作
  3. SubPlan1的每個節點計算完成后按GroupBy Key的Hash值將數據分發到不同的SubPlan2節點
  4. 所有SubPlan2節點計算完成后將數據分發到SubPlan3節點
  5. SubPlan3節點計算完成后通知Coordinator結束查詢,並將數據發送給Coordinator

 執行計划計算流程

源數據的並行讀取

在上面的執行計划中SubPlan1和SubPlan0都是Source節點,其實它們讀取HDFS文件數據的方式就是調用的HDFS InputSplit API,然后每個InputSplit分配一個Worker節點去執行,每個Worker節點分配的InputSplit數目上限是參數可配置的,Config中的query.max-pending-splits-per-node參數配置,默認是100。

分布式的Hash聚合

上面的執行計划在SubPlan0中會進行一次Partial的聚合計算,計算每個Worker節點讀取的部分數據的部分聚合結果,然后SubPlan0的輸出會按照group by字段的Hash值分配不同的計算節點,最后SubPlan3合並所有結果並輸出

流水線

數據模型

Presto中處理的最小數據單元是一個Page對象,Page對象的數據結構如下圖所示。一個Page對象包含多個Block對象,每個Block對象是一個字節數組,存儲一個字段的若干行。多個Block橫切的一行是真實的一行數據。一個Page最大1MB,最多16*1024行數據。

 數據模型

節點內部流水線計算

下圖是一個Worker節點內部的計算流程圖,左側是任務的執行流程圖。

Worker節點將最細粒度的任務封裝成一個PrioritizedSplitRunner對象,放入pending split優先級隊列中。每個

Worker節點啟動一定數目的線程進行計算,線程數task.shard.max-threads=availableProcessors() * 4,在config中配置。

每個空閑的線程從隊列中取出一個PrioritizedSplitRunner對象執行,如果執行完成一個周期,超過最大執行時間1秒鍾,判斷任務是否執行完成,如果完成,從allSplits隊列中刪除,如果沒有,則放回pendingSplits隊列中。

每個任務的執行流程如下圖右側,依次遍歷所有Operator,嘗試從上一個Operator取一個Page對象,如果取得的Page不為空,交給下一個Operator執行。

 節點內部流水線計算

節點間流水線計算

下圖是ExchangeOperator的執行流程圖,ExchangeOperator為每一個Split啟動一個HttpPageBufferClient對象,主動向上一個Stage的Worker節點拉數據,數據的最小單位也是一個Page對象,取到數據后放入Pages隊列中

 節點間流水線計算

本地化計算

Presto在選擇Source任務計算節點的時候,對於每一個Split,按下面的策略選擇一些minCandidates

  1. 優先選擇與Split同一個Host的Worker節點
  2. 如果節點不夠優先選擇與Split同一個Rack的Worker節點
  3. 如果節點還不夠隨機選擇其他Rack的節點

對於所有Candidate節點,選擇assignedSplits最少的節點。

動態編譯執行計划

Presto會將執行計划中的ScanFilterAndProjectOperator和FilterAndProjectOperator動態編譯為Byte Code,並交給JIT去編譯為native代碼。Presto也使用了Google Guava提供的LoadingCache緩存生成的Byte Code。

 動態編譯執行計划

 動態編譯執行計划

上面的兩段代碼片段中,第一段為沒有動態編譯前的代碼,第二段代碼為動態編譯生成的Byte Code反編譯之后還原的優化代
碼,我們看到這里采用了循環展開的優化方法。

循環展開最常用來降低循環開銷,為具有多個功能單元的處理器提供指令級並行。也有利於指令流水線的調度。

小心使用內存和數據結構

使用Slice進行內存操作,Slice使用Unsafe#copyMemory實現了高效的內存拷貝,Slice倉庫參考:https://github.com/airlift/slice

Facebook工程師在另一篇介紹ORCFile優化的文章中也提到使用Slice將ORCFile的寫性能提高了20%~30%,參考:https://code.facebook.com/posts/229861827208629/scaling-the-facebook-data-warehouse-to-300-pb/

類BlinkDB的近似查詢

為了加快avg、count distinct、percentile等聚合函數的查詢速度,Presto團隊與BlinkDB作者之一Sameer Agarwal合作引入了一些近似查詢函數approx_avg、approx_distinct、approx_percentile。approx_distinct使用HyperLogLog Counting算法實現。

GC控制

Presto團隊在使用hotspot java7時發現了一個JIT的BUG,當代碼緩存快要達到上限時,JIT可能會停止工作,從而無法將使用頻率高的代碼動態編譯為native代碼。

Presto團隊使用了一個比較Hack的方法去解決這個問題,增加一個線程在代碼緩存達到70%以上時進行顯式GC,使得已經加載的Class從perm中移除,避免JIT無法正常工作的BUG。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM