0、簡介
Presto 是 Facebook 推出的一個基於Java開發的大數據分布式 SQL 查詢引擎,可對從數 G 到數 P 的大數據進行交互式的查詢,查詢的速度達到商業數據倉庫的級別,據稱該引擎的性能是 Hive 的 10 倍以上。Presto 可以查詢包括 Hive、Cassandra 甚至是一些商業的數據存儲產品,單個 Presto 查詢可合並來自多個數據源的數據進行統一分析。Presto 的目標是在可期望的響應時間內返回查詢結果,Facebook 在內部多個數據存儲中使用 Presto 交互式查詢,包括 300PB 的數據倉庫,超過 1000 個 Facebook 員工每天在使用 Presto 運行超過 3 萬個查詢,每天掃描超過 1PB 的數據。
1、架構
Presto查詢引擎是一個Master-Slave的架構,由下面三部分組成:
- 一個Coordinator節點
- 負責解析SQL語句,生成執行計划,分發執行任務給Worker節點執行
- 一個Discovery Server節點
- 通常內嵌於Coordinator節點中
- 多個Worker節點
- 負責實際執行查詢任務,負責與HDFS交互讀取數據
Worker節點啟動后向Discovery Server服務注冊,Coordinator從Discovery Server獲得可以正常工作的Worker節點。如果配置了Hive Connector,需要配置一個Hive MetaStore服務為Presto提供Hive元信息。
更形象架構圖如下:
2、低延遲原理
- 完全基於內存的並行計算
- 流水線
- 本地化計算
- 動態編譯執行計划
- GC控制
3、執行過程
提交查詢
用戶使用Presto Cli提交一個查詢語句后,Cli使用HTTP協議與Coordinator通信,Coordinator收到查詢請求后調用SqlParser解析SQL語句得到Statement對象,並將Statement封裝成一個QueryStarter對象放入線程池中等待執行。
SQL編譯過程
Presto與Hive一樣,使用Antlr編寫SQL語法,語法規則定義在Statement.g和StatementBuilder.g兩個文件中。 如下圖中所示從SQL編譯為最終的物理執行計划大概分為5部,最終生成在每個Worker節點上運行的LocalExecutionPlan,這里不詳細介紹SQL解析為邏輯執行計划的過程,通過一個SQL語句來理解查詢計划生成之后的計算過程。
樣例SQL:
select c1.rank, count(*) from dim.city c1 join dim.city c2 on c1.id = c2.id where c1.id > 10 group by c1.rank limit 10;
上面的SQL語句生成的邏輯執行計划Plan如上圖所示。那么Presto是如何對上面的邏輯執行計划進行拆分以較高的並行度去執行完這個計划呢,我們來看看物理執行計划。
物理執行計划
邏輯執行計划圖中的虛線就是Presto對邏輯執行計划的切分點,邏輯計划Plan生成的SubPlan分為四個部分,每一個SubPlan都會提交到一個或者多個Worker節點上執行。
SubPlan有幾個重要的屬性planDistribution、outputPartitioning、partitionBy屬性。
- PlanDistribution表示一個查詢Stage的分發方式,邏輯執行計划圖中的4個SubPlan共有3種不同的PlanDistribution方式:Source表示這個SubPlan是數據源,Source類型的任務會按照數據源大小確定分配多少個節點進行執行;Fixed表示這個SubPlan會分配固定的節點數進行執行(Config配置中的query.initial-hash-partitions參數配置,默認是8);None表示這個SubPlan只分配到一個節點進行執行。在下面的執行計划中,SubPlan1和SubPlan0 PlanDistribution=Source,這兩個SubPlan都是提供數據源的節點,SubPlan1所有節點的讀取數據都會發向SubPlan0的每一個節點;SubPlan2分配8個節點執行最終的聚合操作;SubPlan3只負責輸出最后計算完成的數據。
- OutputPartitioning屬性只有兩個值HASH和NONE,表示這個SubPlan的輸出是否按照partitionBy的key值對數據進行Shuffle。在下面的執行計划中只有SubPlan0的OutputPartitioning=HASH,所以SubPlan2接收到的數據是按照rank字段Partition后的數據。
完全基於內存的並行計算
查詢的並行執行流程
Presto SQL的執行流程如下圖所示
- Cli通過HTTP協議提交SQL查詢之后,查詢請求封裝成一個SqlQueryExecution對象交給Coordinator的SqlQueryManager#queryExecutor線程池去執行
- 每個SqlQueryExecution線程(圖中Q-X線程)啟動后對查詢請求的SQL進行語法解析和優化並最終生成多個Stage的SqlStageExecution任務,每個SqlStageExecution任務仍然交給同樣的線程池去執行
- 每個SqlStageExecution線程(圖中S-X線程)啟動后每個Stage的任務按PlanDistribution屬性構造一個或者多個RemoteTask通過HTTP協議分配給遠端的Worker節點執行
- Worker節點接收到RemoteTask請求之后,啟動一個SqlTaskExecution線程(圖中T-X線程)將這個任務的每個Split包裝成一個PrioritizedSplitRunner任務(圖中SR-X)交給Worker節點的TaskExecutor#executor線程池去執行
上面的執行計划實際執行效果如下圖所示。
- Coordinator通過HTTP協議調用Worker節點的 /v1/task 接口將執行計划分配給所有Worker節點(圖中藍色箭頭)
- SubPlan1的每個節點讀取一個Split的數據並過濾后將數據分發給每個SubPlan0節點進行Join操作和Partial Aggr操作
- SubPlan1的每個節點計算完成后按GroupBy Key的Hash值將數據分發到不同的SubPlan2節點
- 所有SubPlan2節點計算完成后將數據分發到SubPlan3節點
- SubPlan3節點計算完成后通知Coordinator結束查詢,並將數據發送給Coordinator
源數據的並行讀取
在上面的執行計划中SubPlan1和SubPlan0都是Source節點,其實它們讀取HDFS文件數據的方式就是調用的HDFS InputSplit API,然后每個InputSplit分配一個Worker節點去執行,每個Worker節點分配的InputSplit數目上限是參數可配置的,Config中的query.max-pending-splits-per-node參數配置,默認是100。
分布式的Hash聚合
上面的執行計划在SubPlan0中會進行一次Partial的聚合計算,計算每個Worker節點讀取的部分數據的部分聚合結果,然后SubPlan0的輸出會按照group by字段的Hash值分配不同的計算節點,最后SubPlan3合並所有結果並輸出
流水線
數據模型
Presto中處理的最小數據單元是一個Page對象,Page對象的數據結構如下圖所示。一個Page對象包含多個Block對象,每個Block對象是一個字節數組,存儲一個字段的若干行。多個Block橫切的一行是真實的一行數據。一個Page最大1MB,最多16*1024行數據。
節點內部流水線計算
下圖是一個Worker節點內部的計算流程圖,左側是任務的執行流程圖。
Worker節點將最細粒度的任務封裝成一個PrioritizedSplitRunner對象,放入pending split優先級隊列中。每個
Worker節點啟動一定數目的線程進行計算,線程數task.shard.max-threads=availableProcessors() * 4,在config中配置。
每個空閑的線程從隊列中取出一個PrioritizedSplitRunner對象執行,如果執行完成一個周期,超過最大執行時間1秒鍾,判斷任務是否執行完成,如果完成,從allSplits隊列中刪除,如果沒有,則放回pendingSplits隊列中。
每個任務的執行流程如下圖右側,依次遍歷所有Operator,嘗試從上一個Operator取一個Page對象,如果取得的Page不為空,交給下一個Operator執行。
節點間流水線計算
下圖是ExchangeOperator的執行流程圖,ExchangeOperator為每一個Split啟動一個HttpPageBufferClient對象,主動向上一個Stage的Worker節點拉數據,數據的最小單位也是一個Page對象,取到數據后放入Pages隊列中
本地化計算
Presto在選擇Source任務計算節點的時候,對於每一個Split,按下面的策略選擇一些minCandidates
- 優先選擇與Split同一個Host的Worker節點
- 如果節點不夠優先選擇與Split同一個Rack的Worker節點
- 如果節點還不夠隨機選擇其他Rack的節點
對於所有Candidate節點,選擇assignedSplits最少的節點。
動態編譯執行計划
Presto會將執行計划中的ScanFilterAndProjectOperator和FilterAndProjectOperator動態編譯為Byte Code,並交給JIT去編譯為native代碼。Presto也使用了Google Guava提供的LoadingCache緩存生成的Byte Code。
上面的兩段代碼片段中,第一段為沒有動態編譯前的代碼,第二段代碼為動態編譯生成的Byte Code反編譯之后還原的優化代 碼,我們看到這里采用了循環展開的優化方法。
循環展開最常用來降低循環開銷,為具有多個功能單元的處理器提供指令級並行。也有利於指令流水線的調度。
引用
https://prestodb.io/overview.html