HIVE執行引擎TEZ學習以及實際使用


概述

 最近公司在使用Tez,今天寫一篇關於Tez的學習和使用隨筆。Tez是Apache最新的支持DAG作業的開源計算框架,它可以將多個有依賴的作業轉換為一個作業從而大幅提升DAG作業的性能。Tez並不直接面向最終用戶——事實上它允許開發者為最終用戶構建性能更快、擴展性更好的應用程序。Hadoop傳統上是一個大量數據批處理平台。但是,有很多用例需要近乎實時的查詢處理性能。還有一些工作則不太適合MapReduce,例如機器學習。Tez的目的就是幫助Hadoop處理這些用例場景。

Tez構建在YARN之上,后者是Hadoop所使用的新資源管理框架。Tez產生的主要原因是繞開MapReduce所施加的限制。除了必須要編寫Mapper和Reducer的限制之外,強制讓所有類型的計算都滿足這一范例還有效率低下的問題——例如使用HDFS存儲多個MR作業之間的臨時數據,這是一個負載。在Hive中,查詢需要對不相關的key進行多次shuffle操作的場景非常普遍,例如join - grp by - window function - order by。

Tez產生背景

MR性能差,資源消耗大,如:Hive作業之間的數據不是直接流動的,而是借助HDFS作為共享數據存儲系統,即一個作業將處理好的數據寫入HDFS,下一個作業再從HDFS重新讀取數據進行處理。很明顯更高效的方式是,第一個作業直接將數據傳遞給下游作業。

MR 默認了map和reduce階段,map會對中間結果進行分區、排序,reduce會進行合並排序,這一過程並不適用於所有場景。

引擎級別的Runtime優化:MR執行計划在編譯時已經確定,無法動態調整(?)。然而在執行ETL和Ad-hoc等任務時,根據實際處理的表大小,動態調整join策略、任務並行度將大大縮短任務執行時間。

下面給您展示一張Tez官方圖,您就可以簡單明白Tez和MapReduce的關系。

 

 

總的來說之前mapReduce在map和reduce階段都會產生I/O落盤,但是Tez就不要這一步驟了。目前hive使用了Tez(Hive是一個將用戶的SQL請求翻譯為MR任務,最終查詢HDFS的工具Tez采用了DAG(有向無環圖)來組織MR任務。 核心思想:將Map任務和Reduce任務進一步拆分,Map任務拆分為Input-Processor-Sort-Merge-Output,Reduce任務拆分為Input-Shuffer-Sort-Merge-Process-output,Tez將若干小任務靈活重組,形成一個大的DAG作業。 Tez與oozie不同:oozie只能以MR任務為整體來管理、組織,本質上仍然是多個MR任務的執行,不能解決上面提到的多個任務之間硬盤IO冗余的問題。 Tez只是一個Client,部署很方便。 目前Hive使用了Tez(Hive是一個將用戶的SQL請求翻譯為MR任務,最終查詢HDFS的工具)。

Tez原理

Tez包含的組件:

有向無環圖(DAG)——定義整體任務。一個DAG對象對應一個任務。

節點(Vertex)——定義用戶邏輯以及執行用戶邏輯所需的資源和環境。一個節點對應任務中的一個步驟。

邊(Edge)——定義生產者和消費者節點之間的連接。

邊需要分配屬性,對Tez而言這些屬性是必須的,有了它們才能在運行時將邏輯圖展開為能夠在集群上並行執行的物理任務集合。下面是一些這樣的屬性:

數據移動屬性,定義了數據如何從一個生產者移動到一個消費者。

調度(Scheduling)屬性(順序或者並行),幫助我們定義生產者和消費者任務之間應該在什么時候進行調度。

數據源屬性(持久的,可靠的或者暫時的),定義任務輸出內容的生命周期或者持久性,讓我們能夠決定何時終止。

該模型所有的輸入和輸出都是可插拔的。為了方便,Tez使用了一個基於事件的模型,目的是為了讓任務和系統之間、組件和組件之間能夠通信。事件用於將信息(例如任務失敗信息)傳遞給所需的組件,將輸出的數據流(例如生成的數據位置信息)傳送給輸入,以及在運行時對DAG執行計划做出改變等。Tez還提供了各種開箱即用的輸入和輸出處理器。這些富有表現力的API能夠讓更高級語言(例如Hive)的編寫者很優雅地將自己的查詢轉換成Tez任務。

TEZ技術:

Application Master Pool 初始化AM池。Tez先將作業提交到AMPoolServer服務上。AMPoolServer服務啟動時就申請多個AM,Tez提交作業會優先使用緩沖池資源

Container Pool AM啟動時會預先申請多個Container

Container重用

TEZ實現方法:

Tez對外提供了6種可編程組件,分別是:

Input:對輸入數據源的抽象,它解析輸入數據格式,並吐出一個個Key/value

Output:對輸出數據源的抽象,它將用戶程序產生的Key/value寫入文件系統

Paritioner:對數據進行分片,類似於MR中的Partitioner

Processor:對計算的抽象,它從一個Input中獲取數據,經處理后,通過Output輸出

Task:對任務的抽象,每個Task由一個Input、Ouput和Processor組成

Maser :管理各個Task的依賴關系,並按順依賴關系執行他們

除了以上6種組件,Tez還提供了兩種算子,分別是Sort(排序)和Shuffle(混洗),為了用戶使用方便,它還提供了多種Input、Output、Task和Sort的實現

DAG

Edge:定義了上下游Vertex之間的連接方式。

Edge相關屬性:

Data movement:定義了producer與consumer之間數據流動的方式。
One-To-One: 第i個producer產生的數據,發送給第i個consumer。這種上下游關系屬於Spark的窄依賴。
Broadcast: producer產生的數據路由都下游所有consumer。這種上下游關系也屬於Spark的窄依賴。
Scatter-Gather: producer將產生的數據分塊,將第i塊數據發送到第i個consumer。這種上下游關系屬於Spark的寬依賴。

Scheduling:定義了何時啟動consumer Task
Sequential: Consumer task 需要producer task結束后啟動,如:MR。
Concurrent: Consumer task 與producer task一起啟動,如:流計算。

Data source:定義了任務outp的生命周期與可靠性。
Persisted: 當任務退出后,該任務output依然存在,但經過一段時間后,可能會被刪除,如:Mapper輸出的中間結果。
Persisted-Reliable: 任務output總是存在,比如,MR中reducer的輸出結果,存在HDFS上。
Ephemeral: 任務輸出只有當該task在運行的時候,才存在,如:流計算的中間結果。

舉例——MapReduce在Tez的編程模型
一個DAG圖中只有兩個Vertex,Map Vertex與Reduce Vertex。連接Map Vertex與Reduce Vertex的Edge有以下屬性: Data movement: Scatter-Gather Scheduling:Sequential Data Source: Map Vertex的Data Source為Persisted-Reliable, reduce Vertex 的Data Source為Persisted

Tez Api實現WordCount

Runtime API——Input/Processor/Output

Task是Tez的最小執行單元,Vertex中task的數量與該vertex的並行度一致。以下是Input、Processor、Output均需要實現的接口:

List<Event> initialize(Tez*Context) -This is where I/P/O receive their corresponding context objects. They can, optionally, return a list of events.

handleEvents(List<Event> events) – Any events generated for the specific I/P/O will be passed in via this interface. Inputs receive DataMovementEvent(s) generated by corresponding Outputs on this interface – and will need to interpret them to retrieve data. At the moment, this can be ignored for Outputs and Processors.

List<Event> close() – Any cleanup or final commits will typically be implemented in the close method. This is generally a good place for Outputs to generate DataMovementEvent(s). More on these events later.

Input: 接收上游Output事件,獲取上游數據位置;從physical Edge中獲取實際數據;解析實際數據,為Processor提供統一的邏輯試圖;

Processor: 利用Input獲取實際數據,執行用戶邏輯,最后輸出;

Output: 將Processor提供的數據,進行分區;向下游Input發送事件;

Tez的事件驅動機制: Tez中各個組件通過不同類型的Event進行通信。

數據傳輸:Output通過ShuffleEvent傳遞上游數據位置,AM負責將Event路由到相應Input中。

容錯:Input當無法獲取到上游數據時,會通知框架重新調度上游任務,這也意味着任務成功完成后,仍然會被重新調度。

runtime執行計划優化:根據上游Map Stage產生的數據大小,動態reducer並行度。Output產生的事件路由到可拔插的Vertex/Edge management module,對應moudule就可以對runtime執行計划進行調整。

Runtime優化

任務運行時,程序知曉更多任務相關的信息,通過這些信息,我們可以動態修改修改執行計划,比如:修改mapper或reducer數量,決定何時啟動reducer等。在Tez中,不同組件通過不同事件類型,進行通信。

動態修改reducer並行度:MapTask通過VertexManager類型的事件向ShuffleVertextManager發送信息,比如:所處理的partition大小等。 ShuffleVertexManager通過所獲得的信息,可以估算出所有Task的輸出數據大小,最后來調整下游reduce Vertex的並行度,如下圖:

 

reducer"慢"啟動(預先啟動): 上游MapTask通過事件不斷向ShuffleVertexManager匯報任務完成情況,ShuffleVertexManager通過這些信息,可以判斷何時啟動下游reduceTask與需要啟動的reduceTask數量。

從邏輯執行計划到物理執行計划

從邏輯DAG到最后物理執行計划示意圖:

其他優化措施

Tez Session: 與數據庫session相似,在同一個Tez Session中,可串行執行多個Tez Dag。Tez Session避免了AM的多次啟動與銷毀,在有多個DAG圖的Tez作業(HQL任務)中大大減小了任務執行時間。

這也是為什么在Tez-UI中,一個HQL任務,只有一個Application,卻有多個DAG(MR中一個HQL任務,有多個Application)。

Tez相關參數:

 

Container復用

問題:

container的資源兼容? 被先后調度到同一個container的多個task所需要的資源,必須與container的資源相互兼容。也就是說,container擁有的資源,如:jar包,Memory,CPU等,需要是task所需資源的“超集”。

怎么調度? 進行container復用時,Tez對Task進行調度。Tez會依據:任務本地性、任務所需資源、pending任務的優先級等因素,進行任務調度。

優點:

減少作業執行過程中JVM的創建與銷毀帶來的開銷

減小對RM的請求壓力

運行在同一container上task之間的數據共享。比如,MapJoin中可以通過共享小表數據的方式,減少資源消耗。

相關參數:

Tez優缺點

優點:

避免中間數據寫回HDFS,減小任務執行時間

vertex management模塊使runtime動態修改執行計划變成可能

input/processor/output編程模型,大大提高了任務模型的靈活性

提供container復用機制與Tez Session,減少資源消耗

缺點:

出現數據重復問題等數據質量問題

Tez與Hive捆綁,在其他領域應用較少

社區不活躍

Tez安裝過程

1.下載tez src解壓,修改pom.xml, 將hadoop.version改為2.7.2,最好用非root用戶編譯
mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true
Github-refer:https://github.com/apache/tez
Download-refer:http://tez.apache.org/
2.編譯成功后,在tez-dist/target目錄下,能夠發現如下文件
archive-tmp maven-archiver tez-0.8.4 tez-0.8.4-minimal tez-0.8.4-minimal.tar.gz tez-0.8.4.tar.gz tez-dist-0.8.4-tests.jar
3.將tez-0.8.4-minimal上傳到hdfs上,本例中上傳到/tez目錄下 Hadoop fs –put tez-0.8.4-minimal.tar.gz /tez/
4, 將tez-0.8.4-minimal.tar.gz考到/usr/hive下,並解壓到./tez下
taz –zxvf tez-0.8.4-minimal.tar.gz –C ./tez
拷貝tez的lib下或者主目錄下jar包===》到hive主目錄下lib中去
5.在客戶端安裝tez-0.8.4-minimal,並且在conf目錄下建立tez-site.xml並正確配置
 <property>
   <name>tez.lib.uris</name>
   <value>${fs.defaultFS}/user/tez/tez-0.8.5-minimal.tar.gz</value>
 </property>
<property>
  <name>tez.use.cluster.hadoop-libs</name>
  <value>true</value>
</property>

 6.在hive的客戶端配置環境變量 (可以配置到/etc/profile.d/Hadoop.sh

export TEZ_HOME=/usr/hive/tez

for jar in `ls ${TEZ_HOME}|grep jar`;do
  export HADOOP_CLASSPATH=${TEZ_HOME}/$jar:${HADOOP_CLASSPATH}
done
for jar in `ls ${TEZ_HOME}/lib/|grep jar`;do
 export HADOOP_CLASSPATH=${TEZ_HOME}/lib/$jar:${HADOOP_CLASSPATH}
done

7.在hive-site.xml中配置參數。

<property>
   <name>hive.user.install.directory</name>
   <value>/user/</value>
   <description> If hive (in tez mode only) cannot find a usable hive jar in "hive.jar.directory", it will upload the hive jar to "hive.user.install.directory/user.name" and use it to run queries. </description>
 </property>
 <property>
   <name>hive.execution.engine</name>
   <value>tez</value>
   <description> Expects one of [mr, tez, spark]. Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR remains the default engine for historical reasons, it is itself a historical engine and is deprecated in Hive 2 line. It may be removed without further warning. </description>
 </property>

8.hive啟動后執行。

set hive.execution.engine=tez;  #即可運行hive on tez任務。

補充:hive2.x默認計算引擎為tez,編輯/usr/hive/conf/hive-site.xml

Tez實際使用

配置tez到hive中,安裝過程省略。

准備數據:

CREATE TABLE user_match_temp (
user_name string,
opponent string,
result int,
create_time timestamp)
row format delimited fields terminated by ','
stored as textfile
location '/project/rhett/user_match_temp/';

 執行查詢:

SELECT *,lag(opponent,1) over (partition by user_name order by create_time) as lag_opponent, lead(opponent,1) over   (partition by user_name order by create_time) as lead_opponent, first_value(opponent) over (partition by user_name order by create_time rows    between 3 preceding and 3 following) as first_opponent,  last_value(opponent) over (partition by user_name order by create_time rows   between 3 preceding and 3 following) as last_opponent From user_match_temp;

在tez界面可以查看對應的任務所有的DAG都在這里:

 在上圖可以查看到整個任務的執行調度情況。上圖顯示了執行結果,執行用戶,執行語句,執行在yarn的applicationID還有隊列。這里dag id也是惟一的id,點擊dag name就會顯示DAG的整個過程:

上圖顯示了 DAG的概述界面,運行進展,包含了2個Vertex,其中map vertex執行7個任務,reducer vertex執行2個任務,並且都是成功的,最下面是執行的完整sql語句。在最上面有一個下載的按鈕,這個下載之后是一個zip的壓縮包,壓縮包主要是5個json文件,包含運行的任務的情況,dag的情況,task_attempts情況,tasks情況,vertex情況。

執行的DAG圖形:

還可以查看所有的tez的所有任務:

 

總結

感謝大神分享:

https://zhuanlan.zhihu.com/p/63315907

https://blog.csdn.net/hqwang4/article/details/78090087

https://cloud.tencent.com/developer/article/1625679

https://cwiki.apache.org/confluence/display/TEZ/How+initial+task+parallelism+works

https://www.jianshu.com/p/167ba0464a44


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM