1. Oozie簡介
Yahoo開發工作流引擎Oozie(馭象者),用於管理Hadoop任務(支持MapReduce、Spark、Pig、Hive),把這些任務以DAG(有向無環圖)方式串接起來。Oozie任務流包括:coordinator、workflow;workflow描述任務執行順序的DAG,而coordinator則用於定時任務觸發,相當於workflow的定時管理器,其觸發條件包括兩類:
- 數據文件生成
- 時間條件
Oozie定義了一種基於XML的hPDL (Hadoop Process Definition Language)來描述workflow的DAG。在workflow中定義了
- 控制流節點(Control Flow Nodes)
- 動作節點(Action Nodes)
其中,控制流節點定義了流程的開始和結束(start、end),以及控制流程的執行路徑(Execution Path),如decision、fork、join等;而動作節點包括Hadoop任務、SSH、HTTP、eMail和Oozie子流程等。控制流節點示例如下:
<workflow-app xmlns='uri:oozie:workflow:0.2' name="ooziedemo-wf">
<start to="timeCheck"/>
...
<kill name="fail">
<message>Failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
</message>
</kill>
<end name="end"/>
</workflow-app>
<!-- or -->
<workflow-app xmlns='uri:oozie:workflow:0.2' name="ooziedemo-wf">
<start ../>
<fork name="forking">
<path start="sqoopMerge1"/>
<path start="sqoopMerge2"/>
</fork>
<join name="joining" to="end"/>
<end ../>
</workflow-app>
其中,fork、join是成對出現,表示了工作流的並發執行,最后匯聚到一個node。從Oozie的工作流調度機制可以看出,Oozie沒有能力表達復雜的DAG,比如:嵌套的依賴關系。此外,Oozie工作流可以參數化,比如:在工作流定義中使用像${inputDir}之類的變量,然后通過job.properties配置對應參數,在啟動時將這些配置參數傳入工作流:
oozie job -oozie http://<host>:11000/oozie/ -config job.properties -run
2. Workflow
Action Node定義了基本的工作任務節點。(以下介紹版本基於Oozie 4.1.0)
MapReduce
一般地,我用java action啟動MapReduce任務,對於任務的動態變化參數,在workflow的configuration進行配置,然后在job.properties指定參數值。
<action name="Data Clean">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.reduce.tasks</name>
<value>${reducerNum}</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>...</main-class>
<java-opts>-Xms256m -Xmx512m</java-opts>
<arg>..</arg>
<arg>${nameNode}/user/${wf:user()}/xx</arg>
...
<arg>${cleanDate}</arg>
<capture-output />
</java>
<ok to="end" />
<error to="fail" />
</action>
其中, ${wf:user()}
為workflow的內置參數,表示當前用戶名。一般地,使用該參數,為了保證寫權限(畢竟沒有寫文件到其他用戶文件夾的權限)。
Spark
Oozie支持Spark action,不過支持的不是特別好。提交spark任務時,需要加載spark-assembly jar。
<action name="Spark Data Clean">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>etl${cleanDate}</name>
<class>...</class>
<jar>/<hdfs>/<path>/lib/xxx.jar</jar>
<spark-opts>
--num-executors ${executors} --driver-memory 4g --executor-memory 4g --executor-cores 5 --queue=${queueName}
</spark-opts>
<arg>..</arg>
</spark>
<ok to="end" />
<error to="fail" />
</action>
Pig
Oozie內置pig action,其中<script>
為pig腳本所在的HDFS路徑,param
為pig腳本中的參數。Oozie調度pig任務略坑,先隨機指定一台機器,然后將pig腳本dist到該機器,然后執行。但是,因為集群中不同機器部署的pig版本可能不一致,而導致任務跑失敗。
<action name="Pig Data Clean">
<pig>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
</configuration>
<script>/<hdfs>/<path>/data-clean.pig</script>
<param>CLEANDATE=${cleanDate}</param>
</pig>
<ok to="end"/>
<error to="fail"/>
</action>
在pig腳本中,一般用$
+ 大寫字母表示輸入參數,示例如下:
A = load '/<hdfs>/<path>/$CLEANDATE' using OrcStorage();
...
E = ...
store E into '/<path>/$CLEANDATE';
實際上,在本地執行帶參數的pig腳本時,也是用-param命令:
pig -f test.pig -param CLEANDATE=2016-05-26
Hive
Oozie也可以調度Hive任務,一般使用hive2 action通過beeline連接Hive Server 2,然后執行HiveQL:
<action name="Hive2">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
</configuration>
<jdbc-url>jdbc:hive2://host:10000/db-name</jdbc-url>
<script>${NameNode}/<hdfs>/<path>/test.hql</script>
<param>DAYTIME=${dayTime}</param>
</hive2>
<ok to="end"/>
<error to="fail"/>
</action>
其中,param為HiveQL中的輸入參數,其對應hql為
alter table db.log_tb
add if not exists partition (day_time=date '${DAYTIME}')
location '${DAYTIME}';
hive命令執行本地hql通過--hivevar
傳入參數:
hive -f test.hql --hivevar DAYTIME=2016-05-17
此外,在執行hive2 action時需有如下依賴:
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-beeline</artifactId>
<version>${hive.version}</version>
</dependency>
在job.properties指定oozie.libpath
(對應於依賴jar的目錄)。