引言
目前數據平台使用Hadoop構建,為了方便數據分析師的工作,使用Hive對Hadoop MapReduce任務進行封裝,我們面對的不再是一個個的MR任務,而是一條條的SQL語句。數據平台內部通過類似JDBC的接口與HiveServer進行交互,僅僅能夠感知到一條SQL的開始與結束,而中間的這個過程通常是漫長的(兩個因素:數據量、SQL復雜度),某些場景下用戶需要了解這條SQL語句的執行進度,從而為我們引入以下幾個問題:
(1)通過JDBC接口執行一條SQL語句時,這條SQL語句被轉換成幾個MR任務,每個MR任務的JobId是多少,如何維護這條SQL語句與MR任務的對應關系?
(2)如何獲取MR任務的運行狀態,通過JobClient?
(3)通過HiveServer是否可以獲取到上述信息?
思路
當我們在終端下執行命令“hive”后,會看到有如下輸出:
Hive有會話(Session)的概念,而
這次會話中的所有日志消息將會輸出到這個日志文件中,包含SQL語句的執行日志,查看這個日志文件可以看到以下信息:
QueryStart行日志包含QUERY_STRING、QUERY_ID。
TaskStart行日志包含TASK_ID、QUERY_ID。
TaskProgress行日志包含TASK_HADOOP_PROGRESS、TASK_ID、QUERY_ID、TASK_HADOOP_ID,其中TASK_HADOOP_PROGRESS中可以獲取到map、reduce進度。
TaskEnd行日志包含TASK_HADOOP_PROGRESS、TASK_ID、QUERY_ID、TASK_HADOOP_ID。
QueryEnd行日志包含QUERY_STRING、QUERY_ID。
由上可知,QueryStart、TaskStart、TaskProgress、TaskEnd(一個復雜的Query可能會產生多個Task)、QueryEnd覆蓋整個查詢的執行過程,通過對這些行日志的解析,我們就可以獲取到Hive SQL的執行狀態。
此外,還有SessionStart、SessionEnd,由於使用過程中發現SessionEnd日志有時不被輸出,因此沒有使用這兩個狀態。
會話的日志文件存儲在HiveServer的本地磁盤中,而實際應用中我們有多台HiveServer提供服務,因此我們需要能夠統一收集所有HiveServer的會話日志。
通過對Hive源碼的分析發現,每次Hive執行語句時都會執行一些“Hook”(PreHook),代碼如下:
通過會話日志、PreHook,我們基本可以整理出以下思路:
在PreHook中啟動線程監聽會話日志的輸出(類型Linux的tailf),將這些日志信息統一收集到某一服務中,統一處理后做進度展示。
實現
我們構建了一個Rest API服務,一部分用於接收由PreHook發送的會話日志信息,另一部分用於對外提供進度展示。
PreHook要求實現接口ExecuteWithHookContext,如下:
通過hookContext我們可以獲取到以下信息:
QueryId:
QueryStr:
HadoopJobName:
Jobs:
HistFileName:
為了保證后續對會話日志的接收,我們需要在查詢執行伊始就將上述信息發送給Rest API服務,如下:
然后就是對會話日志的輸出監聽(即tailer),我們使用Apache Commons IO中的Tailer完成些功能,如下:
Tailer實際上啟動一個后台線程,並通過listener完成數據行的處理,而一次會話中可能執行多條查詢語句,而每一次執行查詢語句時都會導致PreHook的執行,因此我們需要避免同一會話中對histFileName多次“tailf”,需要維護已被“tailf”的文件,而且Tailer實例是需要被“stop”的,多數時候無法獲取到SessionEnd數據行,需要通過其它方式能夠終止會話已經消失的Tailer線程。為此專門設計了TailerTracker(單例,即TAILER_TRACKER)。
TailerTracker維護着一個記錄列表:
維護着成對的tailer與listener實例,其中listener實例中維護着對應tailer實例中最后一次新數據產生的時間,如果tailer實例在設定的時間內都沒有新數據產生,則應該對其執行stop,核心代碼如下:
判斷某一個會話文件是否已經被“tailer”,代碼如下:
標記一個會話文件已經被“tailer”,代碼如下:
會話日志數據行的輸出實際由FileTailerListener(繼承自TailerListenerAdapter)完成,代碼如下:
每處理一行數據,都要更新一下時間戳lastHandleTime,而QueryStart、QueryEnd、TaskStart、TaskProgress、TaskEnd的數據行會通過不同的Rest API Post。
至此,HiveServer的會話日志收集過程完畢,而Rest服務則需要通過這些收集到的數據完成Hive SQL進度跟蹤。
我們在通過JDBC接口與HiveServer交互時,是無法獲取到QueryId的,但是我們可以通過屬性mapred.job.name設置Hive SQL執行時的MR JobName,JobName代表查詢名稱,需要唯一,同時我們需要維護JobName與QueryId的對應關系。
在Rest服務內部設計實現ProgressController,用以維護JobName與QueryId的對應關系,同時使用QueryId跟蹤Hive SQL執行進度,核心變量如下:
目前Hive SQL的進度記錄僅僅在內存里維護(超過一定時間后,這些進度信息便不再有價值),因此需要控制內存中進度記錄的數量,這一點是通過記錄每一條SQL相關進度信息的最后更新時間(lastUpdateTime)來實現的,過期即被清除。
lastUpdateTime:維護JobName(即某個查詢)記錄最后更新時間;
jobNameToQueryId:維護JobName與QueryId的對應關系;
querys:維護QueryId與Hive SQL執行進度(QueryProgress)的對應關系。
QueryProgress內部結構如下:
queryId:查詢ID;
sql:查詢語句;
jobs:查詢被轉換成MapRecude Job的數量;
taskProgresses:維護TaskId與MapReduce的執行進度的對應關系;
startTime:查詢的起始時間;
stopTime:查詢的終止時間;
state:查詢狀態。
TaskProgress內部結構如下:
taskId:TaskId(Stage-1、Stage-2、...);
taskHadoopId:Task對應的Hadoop MapReduce Job Id;
map:Hadoop MapReduce map進度百分比值;
reduce:Hadoop MapReduce reduce進度百分比值;
startTime:Task起始時間;
stopTime:Task截止時間;
state:Task運行狀態。
當收到query/init的請求時,執行ProgressController queryInit方法,代碼如下:
當收到query/start的請求時,執行ProgressController queryStart方法,代碼如下:
當收到task/start的請求時,執行ProgressController taskStart方法,代碼如下:
當收到task/progress的請求時,執行ProgressController taskProgress方法,代碼如下:
當收到task/end的請求時,執行ProgressController taskEnd方法,代碼如下:
當收到query/end的請求時,執行ProgressController queryEnd方法,代碼如下:
其中ProgressController還承擔着定時清理的工作,代碼如下:
進度示例
不足
Hive SQL執行進度數據維護在內存中,而且Rest服務為單點。
