Environment
Virtual machine: VMware 10
Linux version: CentOS-6.5-x86_64
Client: Xshell 4
FTP: Xftp 4
JDK 8
CM 5.4 (Cloudera Manager)
Similar product: Azkaban
I. Introduction
Oozie is an open-source workflow-engine framework that Cloudera contributed to Apache. It is a workflow scheduling engine for the Hadoop platform, used to manage Hadoop jobs. It is a web application made up of two components, the Oozie client and the Oozie server; the Oozie server is a web program that runs in a Java servlet container (Tomcat).
Features:
(1) Oozie is not limited to chaining MapReduce jobs; a workflow can mix many kinds of programs. For example, after one MR job (MR1) you might run a Java program, then a shell script, then a Hive script, then a Pig script, and finally a second MR job (MR2). Oozie makes this kind of heterogeneous workflow easy to build. If a task fails, the tasks after it are not scheduled.
(2) An Oozie workflow must be a directed acyclic graph (DAG). In effect, Oozie acts as a Hadoop client: when a user needs to run several dependent MR jobs, they write the execution order into workflow.xml and submit the job with Oozie, which then manages the whole task flow.
(3) Oozie defines control flow nodes and action nodes. Control flow nodes define where a flow starts and ends and control its execution path (decision, fork, join, etc.); action nodes cover Hadoop map-reduce, the Hadoop file system, Pig, SSH, HTTP, email, and Oozie sub-workflows.
Architecture:
- workflow: the flow itself, composed of the individual jobs to be processed, run as a stream of steps.
- coordinator: coordinates multiple workflows: several workflows can make up one coordinator, the output of earlier workflows can serve as input to a later one, and trigger conditions (such as time-based triggers) can be defined for a workflow.
- bundle: bundles a set of coordinators for collective handling; an abstraction over a group of coordinators (see the sketch below).
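A bundle is defined in a bundle.xml of its own; as a minimal sketch (the name, coordinator path, and kick-off time here are placeholders, not values from this article), it looks roughly like this:

<bundle-app name="demo-bundle" xmlns="uri:oozie:bundle:0.2">
    <controls>
        <!-- when the bundle starts materializing its coordinators -->
        <kick-off-time>2019-01-01T00:00Z</kick-off-time>
    </controls>
    <coordinator name="agg-coord">
        <app-path>${nameNode}/user/oozie/apps/aggregator/coordinator.xml</app-path>
        <configuration>
            <property>
                <name>start</name>
                <value>2019-01-01T01:00Z</value>
            </property>
        </configuration>
    </coordinator>
</bundle-app>

It is submitted like any other job, with oozie.bundle.application.path in job.properties pointing at this file.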
II. Installation and configuration
Install the Oozie service through CM, or install it manually.
1. Oozie web console unavailable (missing ExtJS)
Unzip ext-2.2 into /var/lib/oozie (the ExtJS library ships as ext-2.2.zip):
unzip ext-2.2.zip -d /var/lib/oozie
In the Oozie service configuration, enable the web console.
Save, then restart the Oozie service.
Oozie configuration
1. Node memory settings
2. oozie.service.CallableQueueService.callable.concurrency (concurrent callables per type)
3. oozie.service.CallableQueueService.queue.size (queue size)
4. oozie.service.ActionService.executor.ext.classes (action executor extensions)
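As a sketch of how the queue settings are applied (the values below are illustrative, not tuning advice), they go into oozie-site.xml, or into the equivalent safety valve in CM:

<configuration>
    <property>
        <!-- how many callables of the same type may run at once -->
        <name>oozie.service.CallableQueueService.callable.concurrency</name>
        <value>3</value>
    </property>
    <property>
        <!-- maximum number of queued callables -->
        <name>oozie.service.CallableQueueService.queue.size</name>
        <value>10000</value>
    </property>
</configuration>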
Oozie share library
– /user/oozie/share/lib
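Assuming the Oozie 4.x CLI that ships with CDH 5.4, the share library can be listed and refreshed without restarting the server (the server URL is illustrative):

# list the share libraries the server currently sees
oozie admin -oozie http://oozie_host_ip:11000/oozie -shareliblist
# rescan /user/oozie/share/lib after adding or replacing jars
oozie admin -oozie http://oozie_host_ip:11000/oozie -sharelibupdate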
Web UI addresses
Oozie's built-in web UI:
http://oozie_host_ip:11000/oozie/
Hue UI:
III. Common client commands
Oozie CLI commands
# Run a job (submit + start):
[root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -config job.properties -run
# Submit a job:
[root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -config job.properties -submit
# Start a (submitted) job:
[root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -start 0000003-150713234209387-oozie-oozi-W
# Kill a job:
[root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -kill 0000002-150713234209387-oozie-oozi-W
# Check job status:
[root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -info 0000003-150713234209387-oozie-oozi-W
Note: running a job actually combines two commands, submit and start, into one.
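Two related commands that are often useful, using the same CLI (URL and job ID as above):

# list recent jobs, optionally filtered by status
oozie jobs -oozie http://ip:11000/oozie/ -filter status=RUNNING
# fetch the log of a job
oozie job -oozie http://ip:11000/oozie/ -log 0000003-150713234209387-oozie-oozi-W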
IV. Oozie job configuration
1. Working with workflows in Hue
References:
"Running an MR job with an Oozie workflow in Hue"
"Submitting a scheduled Oozie job through Hue"
2. Using the configuration files directly
The two important configuration files are:
2.1 job.properties
2.2 workflow.xml
(1) Version information
– <workflow-app xmlns="uri:oozie:workflow:0.4" name="workflow name">
(2) EL functions
– Basic EL functions
•String firstNotNull(String value1, String value2)
•String concat(String s1, String s2)
•String replaceAll(String src, String regex, String replacement)
•String appendAll(String src, String append, String delimiter)
•String trim(String s)
•String urlEncode(String s)
•String timestamp()
•String toJsonStr(Map) (since Oozie 3.3)
•String toPropertiesStr(Map) (since Oozie 3.3)
•String toConfigurationStr(Map) (since Oozie 3.3)
– Workflow EL
•String wf:id() – returns the job ID of the current workflow
•String wf:name() – returns the app name of the current workflow
•String wf:appPath() – returns the application path of the current workflow
•String wf:conf(String name) – returns the value of the named property in the current workflow configuration
•String wf:user() – returns the user who started the current job
•String wf:callback(String stateVar) – returns the callback URL for the current action node, with stateVar as the exit state reported by the action
•int wf:run() – returns the run number of the workflow; 0 for the first run
•Map wf:actionData(String node) – returns the output data (key/value pairs) emitted by the given node on completion
•String wf:actionExternalStatus(String node) – returns the external status of the given action node
•String wf:lastErrorNode() – returns the name of the last node that exited with an ERROR state
•String wf:errorCode(String node) – returns the error code of the given node's job, or an empty string if there is none
•String wf:errorMessage(String message) – returns the error message of the given node's job, or an empty string if there is none
– HDFS EL
•boolean fs:exists(String path)
•boolean fs:isDir(String path)
•long fs:dirSize(String path) – for a directory, returns the total byte size of all files in it; otherwise -1
•long fs:fileSize(String path) – for a file, returns its size in bytes; otherwise -1
•long fs:blockSize(String path) – for a file, returns its block size in bytes; otherwise -1
(3) Nodes
– A. Control flow nodes
•start – marks the start of the workflow
•end – marks the end of the workflow
•decision – implements a switch
•sub-workflow – invokes a child workflow
•kill – kills the workflow
•fork – starts concurrent execution paths
•join – ends concurrent execution (used together with fork)
– B. Action nodes
•shell
•java
•fs
•MR
•hive
•sqoop
<decision name="[NODE-NAME]">
<switch>
<case to="[NODE_NAME]">[PREDICATE]</case>
...
<case to="[NODE_NAME]">[PREDICATE]</case>
<default to="[NODE_NAME]" />
</switch>
</decision>
<fork name="[FORK-NODE-NAME]">
<path start="[NODE-NAME]" />
...
<path start="[NODE-NAME]" />
</fork>
...
<join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
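As a concrete sketch tying the skeletons above to the HDFS EL functions (the path and node names here are hypothetical), a decision node can branch on whether input data exists:

<decision name="check-input">
    <switch>
        <!-- run the job only when the input directory exists -->
        <case to="mr-node">${fs:exists('/user/demo/input-data')}</case>
        <default to="end"/>
    </switch>
</decision>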
V. Examples
1. Oozie shell
(1) Write job.properties
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# HDFS directory containing workflow.xml
oozie.wf.application.path=${nameNode}/user/workflow/oozie/shell
Note: job.properties does not have to be uploaded to HDFS; the path given to `oozie job ... -config` is a local Linux path.
(2) Write workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.4" name="shell-wf"> <start to="shell-node"/> <action name="shell-node"> <shell xmlns="uri:oozie:shell-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <exec>echo</exec> <argument>my_output=Hello Oozie</argument> <capture-output/> </shell> <ok to="check-output"/> <error to="fail"/> </action> <decision name="check-output"> <switch> <case to="end"> ${wf:actionData('shell-node')['my_output'] eq 'Hello Oozie'} </case> <default to="fail-output"/> </switch> </decision> <kill name="fail"> <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <kill name="fail-output"> <message>Incorrect output, expected [Hello Oozie] but was [${wf:actionData('shell-node')['my_output']}]</message> </kill> <end name="end"/> </workflow-app>
Upload the file to the HDFS path hdfs://master:8020/user/workflow/oozie/shell, or create and edit workflow.xml directly in Hue's file browser.
(3) Run the start command from the CLI; it returns a job ID.
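For example (server URL as configured earlier; the returned ID is illustrative):

[root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -config job.properties -run
job: 0000003-150713234209387-oozie-oozi-W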
View the job in the web UI, click through for details, and inspect the job DAG (screenshots omitted).
2. Oozie fs
(1) Write job.properties
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share library
oozie.use.system.libpath=true
# location of workflow.xml on HDFS
oozie.wf.application.path=${nameNode}/user/examples/apps/fs/workflow.xml
(2) Write workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="fs"> <start to="fs-node"/> <action name="fs-node"> <fs> <delete path='/home/kongc/oozie'/> <mkdir path='/home/kongc/oozie1'/> <move source='/home/kongc/spark-application' target='/home/kongc/oozie1'/> </fs> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end"/> </workflow-app>
3. Oozie Sqoop
(1) Write job.properties
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share library
oozie.use.system.libpath=true
# HDFS directory containing workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/sqoop
Write the database configuration file (db.hsqldb.properties, referenced by the workflow below):
#HSQL Database Engine 1.8.0.5
#Tue Oct 05 11:20:19 SGT 2010
hsqldb.script_format=0
runtime.gc_interval=0
sql.enforce_strict_size=false
hsqldb.cache_size_scale=8
readonly=false
hsqldb.nio_data_file=true
hsqldb.cache_scale=14
version=1.8.0
hsqldb.default_table_type=memory
hsqldb.cache_file_scale=1
hsqldb.log_size=200
modified=no
hsqldb.cache_version=1.7.0
hsqldb.original_version=1.8.0
hsqldb.compatible_version=1.8.0
Write the database script (db.hsqldb.script):
CREATE SCHEMA PUBLIC AUTHORIZATION DBA
CREATE MEMORY TABLE TT(I INTEGER NOT NULL PRIMARY KEY,S VARCHAR(256))
CREATE USER SA PASSWORD ""
GRANT DBA TO SA
SET WRITE_DELAY 10
SET SCHEMA PUBLIC
INSERT INTO TT VALUES(1,'a')
INSERT INTO TT VALUES(2,'a')
INSERT INTO TT VALUES(3,'a')
(2) Write workflow.xml
<?xml version="1.0" encoding="UTF-8"?> <workflow-app xmlns="uri:oozie:workflow:0.2" name="sqoop-wf"> <start to="sqoop-node"/> <action name="sqoop-node"> <sqoop xmlns="uri:oozie:sqoop-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/sqoop"/> <mkdir path="${nameNode}/user/oozie/${examplesRoot}/output-data"/> </prepare> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <command>import --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir /user/oozie/${examplesRoot}/output-data/sqoop -m 1</command> <file>db.hsqldb.properties#db.hsqldb.properties</file> <file>db.hsqldb.script#db.hsqldb.script</file> </sqoop> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end"/> </workflow-app>
4. Oozie Java
(1) Write job.properties
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share library
oozie.use.system.libpath=true
# HDFS directory containing workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/java-main
(2) Write workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-kc"> <start to="java-node"/> <action name="java-node"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <main-class>org.apache.oozie.example.DemoJavaMain</main-class> <arg>Hello</arg> <arg>Oozie!</arg> </java> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end"/> </workflow-app>
5. Oozie Hive
(1) Write job.properties
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share library
oozie.use.system.libpath=true
# HDFS directory containing workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/hive
Note: the hive2 action below also expects a jdbcURL property, a HiveServer2 URL such as jdbc:hive2://hiveserver2-host:10000/default (the host here is illustrative); define it in this file or pass it with -D.
(2) Write workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive2-wf"> <start to="hive2-node"/> <action name="hive2-node"> <hive2 xmlns="uri:oozie:hive2-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/hive2"/> <mkdir path="${nameNode}/user/oozie/${examplesRoot}/output-data"/> </prepare> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <jdbc-url>${jdbcURL}</jdbc-url> <script>script.q</script> <param>INPUT=/user/oozie/${examplesRoot}/input-data/table</param> <param>OUTPUT=/user/oozie/${examplesRoot}/output-data/hive2</param> </hive2> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>Hive2 (Beeline) action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end"/> </workflow-app>
Write the Hive script (script.q); the INPUT and OUTPUT values passed via <param> above are available in the script as ${INPUT} and ${OUTPUT}:
INSERT OVERWRITE DIRECTORY '${OUTPUT}' SELECT * FROM test_machine;
6. Oozie Impala
(1) Write job.properties
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share library
oozie.use.system.libpath=true
# HDFS directory containing workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/impala
EXEC=impala.sh
(2) Write workflow.xml
<workflow-app name="shell-impala" xmlns="uri:oozie:workflow:0.4"> <start to="shell-impala-invalidate"/> <action name="shell-impala-invalidate"> <shell xmlns="uri:oozie:shell-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <exec>${EXEC}</exec> <file>${EXEC}#${EXEC}</file> </shell> <ok to="end"/> <error to="kill"/> </action> <kill name="kill"> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end"/> </workflow-app>
(3) impala.sh
#!/bin/bash
impala-shell -i slave2:21000 -q "select count(*) from test_machine"
echo 'Hello Shell'
7. Oozie MapReduce
(1) Write job.properties
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# location of workflow.xml on HDFS
oozie.wf.application.path=${nameNode}/user/examples/apps/map-reduce/workflow.xml
outputDir=map-reduce
(2) Write workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wyl"> <start to="mr-node"/> <action name="mr-node"> <map-reduce> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/${outputDir}"/> </prepare> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> <property> <name>mapred.mapper.class</name> <value>org.apache.oozie.example.SampleMapper</value> </property> <property> <name>mapred.reducer.class</name> <value>org.apache.oozie.example.SampleReducer</value> </property> <property> <name>mapred.map.tasks</name> <value>1</value> </property> <property> <name>mapred.input.dir</name> <value>/user/oozie/${examplesRoot}/input-data/text</value> </property> <property> <name>mapred.output.dir</name> <value>/user/oozie/${examplesRoot}/output-data/${outputDir}</value> </property> </configuration> </map-reduce> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end"/> </workflow-app>
8. Oozie Spark
(1) Write job.properties
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share library
oozie.use.system.libpath=true
# HDFS directory containing workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/spark
Note: the Spark action below also expects a master property (e.g. yarn-cluster, yarn-client, or local[*]); define it here.
(2) Write workflow.xml
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'>
    <start to='spark-node'/>
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/spark"/>
            </prepare>
            <master>${master}</master>
            <name>Spark-FileCopy</name>
            <class>org.apache.oozie.example.SparkFileCopy</class>
            <jar>${nameNode}/user/oozie/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>
            <arg>${nameNode}/user/oozie/${examplesRoot}/input-data/text/data.txt</arg>
            <arg>${nameNode}/user/oozie/${examplesRoot}/output-data/spark</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end'/>
</workflow-app>
9. Oozie scheduled jobs (coordinator)
(1) Write job.properties
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/aggregator/coordinator.xml
start=2019-01-01T01:00Z
end=2019-01-01T03:00Z
(2) Write coordinator.xml
<coordinator-app name="aggregator-coord" frequency="${coord:hours(1)}" start="${start}" end="${end}" timezone="UTC" xmlns="uri:oozie:coordinator:0.2"> <controls> <concurrency>1</concurrency> </controls> <datasets> <dataset name="raw-logs" frequency="${coord:minutes(20)}" initial-instance="2010-01-01T00:00Z" timezone="UTC"> <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template> </dataset> <dataset name="aggregated-logs" frequency="${coord:hours(1)}" initial-instance="2010-01-01T01:00Z" timezone="UTC"> <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/output-data/aggregator/aggregatedLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> </dataset> </datasets> <input-events> <data-in name="input" dataset="raw-logs"> <start-instance>${coord:current(-2)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <output-events> <data-out name="output" dataset="aggregated-logs"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> <app-path>${nameNode}/user/${coord:user()}/${examplesRoot}/apps/aggregator</app-path> <configuration> <property> <name>jobTracker</name> <value>${jobTracker}</value> </property> <property> <name>nameNode</name> <value>${nameNode}</value> </property> <property> <name>queueName</name> <value>${queueName}</value> </property> <property> <name>inputData</name> <value>${coord:dataIn('input')}</value> </property> <property> <name>outputData</name> <value>${coord:dataOut('output')}</value> </property> </configuration> </workflow> </action> </coordinator-app>
Notes:
- job.properties does not have to be uploaded to HDFS; the path given to `oozie job ... -config` is a local Linux path.
- workflow.xml must be uploaded to the HDFS directory that oozie.wf.application.path in job.properties points to.
- oozie.use.system.libpath=true in job.properties tells Oozie to use the system share library.
- oozie.libpath=${nameNode}/user/${user.name}/apps/mymr in job.properties can point at the location of the jar exported for your MR job; without it you may hit class-not-found errors (see the sketch below).
- When Oozie schedules a job, it launches a MapReduce launcher job of its own; the queue name set in workflow.xml applies to that launcher job. If you want the actual work to run in a specific queue, set the queue inside the MR or Hive configuration itself.
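A minimal job.properties sketch tying these notes together (all paths are illustrative):

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
# use the Oozie system share library
oozie.use.system.libpath=true
# extra application jars (e.g. the jar exported for your MR job)
oozie.libpath=${nameNode}/user/${user.name}/apps/mymr
# HDFS directory containing workflow.xml
oozie.wf.application.path=${nameNode}/user/${user.name}/apps/mymr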