Presto 是由 Facebook 開源的大數據分布式 SQL 查詢引擎,適用於交互式分析查詢,可支持眾多的數據源,包括 HDFS,RDBMS,KAFKA 等,而且提供了非常友好的接口開發數據源連接器。
介紹
Presto是一個運行在多台服務器上的分布式系統。 完整安裝包括一個coordinator和多個worker。 由客戶端提交查詢,從Presto命令行CLI提交到coordinator。 coordinator進行解析,分析並執行查詢計划,然后分發處理隊列到worker。
完全基於內存的並行計算
查詢的並行執行流程
Presto SQL的執行流程如下圖所示
- Cli通過HTTP協議提交SQL查詢之后,查詢請求封裝成一個SqlQueryExecution對象交給Coordinator的SqlQueryManager#queryExecutor線程池去執行
- 每個SqlQueryExecution線程(圖中Q-X線程)啟動后對查詢請求的SQL進行語法解析和優化並最終生成多個Stage的SqlStageExecution任務,每個SqlStageExecution任務仍然交給同樣的線程池去執行
- 每個SqlStageExecution線程(圖中S-X線程)啟動后每個Stage的任務按PlanDistribution屬性構造一個或者多個RemoteTask通過HTTP協議分配給遠端的Worker節點執行
- Worker節點接收到RemoteTask請求之后,啟動一個SqlTaskExecution線程(圖中T-X線程)將這個任務的每個Split包裝成一個PrioritizedSplitRunner任務(圖中SR-X)交給Worker節點的TaskExecutor#executor線程池去執行
上面的執行計划實際執行效果如下圖所示。
- Coordinator通過HTTP協議調用Worker節點的 /v1/task 接口將執行計划分配給所有Worker節點(圖中藍色箭頭)
- SubPlan1的每個節點讀取一個Split的數據並過濾后將數據分發給每個SubPlan0節點進行Join操作和Partial Aggr操作
- SubPlan1的每個節點計算完成后按GroupBy Key的Hash值將數據分發到不同的SubPlan2節點
- 所有SubPlan2節點計算完成后將數據分發到SubPlan3節點
- SubPlan3節點計算完成后通知Coordinator結束查詢,並將數據發送給Coordinator
源數據的並行讀取
在上面的執行計划中SubPlan1和SubPlan0都是Source節點,其實它們讀取HDFS文件數據的方式就是調用的HDFS InputSplit API,然后每個InputSplit分配一個Worker節點去執行,每個Worker節點分配的InputSplit數目上限是參數可配置的,Config中的query.max-pending-splits-per-node參數配置,默認是100。
分布式的Hash聚合
上面的執行計划在SubPlan0中會進行一次Partial的聚合計算,計算每個Worker節點讀取的部分數據的部分聚合結果,然后SubPlan0的輸出會按照group by字段的Hash值分配不同的計算節點,最后SubPlan3合並所有結果並輸出
流水線
數據模型
Presto中處理的最小數據單元是一個Page對象,Page對象的數據結構如下圖所示。一個Page對象包含多個Block對象,每個Block對象是一個字節數組,存儲一個字段的若干行。多個Block橫切的一行是真實的一行數據。一個Page最大1MB,最多16*1024行數據。
節點內部流水線計算
下圖是一個Worker節點內部的計算流程圖,左側是任務的執行流程圖。
Worker節點將最細粒度的任務封裝成一個PrioritizedSplitRunner對象,放入pending split優先級隊列中。每個
Worker節點啟動一定數目的線程進行計算,線程數task.shard.max-threads=availableProcessors() * 4,在config中配置。
每個空閑的線程從隊列中取出一個PrioritizedSplitRunner對象執行,如果執行完成一個周期,超過最大執行時間1秒鍾,判斷任務是否執行完成,如果完成,從allSplits隊列中刪除,如果沒有,則放回pendingSplits隊列中。
每個任務的執行流程如下圖右側,依次遍歷所有Operator,嘗試從上一個Operator取一個Page對象,如果取得的Page不為空,交給下一個Operator執行。
節點間流水線計算
下圖是ExchangeOperator的執行流程圖,ExchangeOperator為每一個Split啟動一個HttpPageBufferClient對象,主動向上一個Stage的Worker節點拉數據,數據的最小單位也是一個Page對象,取到數據后放入Pages隊列中
本地化計算
Presto在選擇Source任務計算節點的時候,對於每一個Split,按下面的策略選擇一些minCandidates
- 優先選擇與Split同一個Host的Worker節點
- 如果節點不夠優先選擇與Split同一個Rack的Worker節點
- 如果節點還不夠隨機選擇其他Rack的節點
對於所有Candidate節點,選擇assignedSplits最少的節點。
動態編譯執行計划
Presto會將執行計划中的ScanFilterAndProjectOperator和FilterAndProjectOperator動態編譯為Byte Code,並交給JIT去編譯為native代碼。Presto也使用了Google Guava提供的LoadingCache緩存生成的Byte Code。
上面的兩段代碼片段中,第一段為沒有動態編譯前的代碼,第二段代碼為動態編譯生成的Byte Code反編譯之后還原的優化代
碼,我們看到這里采用了循環展開的優化方法。
循環展開最常用來降低循環開銷,為具有多個功能單元的處理器提供指令級並行。也有利於指令流水線的調度。
小心使用內存和數據結構
使用Slice進行內存操作,Slice使用Unsafe#copyMemory實現了高效的內存拷貝,Slice倉庫參考:https://github.com/airlift/slice
Facebook工程師在另一篇介紹ORCFile優化的文章中也提到使用Slice將ORCFile的寫性能提高了20%~30%,參考:https://code.facebook.com/posts/229861827208629/scaling-the-facebook-data-warehouse-to-300-pb/
類BlinkDB的近似查詢
為了加快avg、count distinct、percentile等聚合函數的查詢速度,Presto團隊與BlinkDB作者之一Sameer Agarwal合作引入了一些近似查詢函數approx_avg、approx_distinct、approx_percentile。approx_distinct使用HyperLogLog Counting算法實現。
GC控制
Presto團隊在使用hotspot java7時發現了一個JIT的BUG,當代碼緩存快要達到上限時,JIT可能會停止工作,從而無法將使用頻率高的代碼動態編譯為native代碼。
Presto團隊使用了一個比較Hack的方法去解決這個問題,增加一個線程在代碼緩存達到70%以上時進行顯式GC,使得已經加載的Class從perm中移除,避免JIT無法正常工作的BUG。