最近又開始捅咕上oozie了,所以回頭還是翻譯一下oozie的文檔。文檔里面最重要就屬這一章了——工作流定義。
一提到工作流,首先想到的應該是工作流都支持哪些工作依賴關系,比如串式的執行,或者一對多,或者多對一,或者條件判斷等等。Oozie在這方面支持的很好,它把節點分為控制節點和操作節點兩種類型,控制節點用於控制工作流的計算流程,操作節點用於封裝計算單元。本篇就主要描述下它的控制節點...
背景
先看看oozie工作流里面的幾個定義:
- action,一個action是一個獨立的任務,比如mapreduce,pig,shell,sqoop,spark或者java程序,它也可能是引用了某個action節點。
- workflow,它是一組action的集合,內部控制了節點間的依賴關系,DAG(Direct Acyclic Graph),一個action依賴另一個action,就意味着只有前一個action運行完成,才能繼續運行下一個。
- worklfow definition,是可執行的workflow的描述
- workflow definition language,定義了workflow的語言
- workflow jon,是一個workflow的實例
- workflow engine,用來執行workflow的系統
在oozie里面,工作流就是一組操作的集合,他們之前包含了前后依賴的關系,比如hadoop,pig等等。工作流里面可以包含fork和join的節點,用於把任務水平拆分成多個,並行執行,然后再合並到一起。
在oozie中,工作流的狀態可以是:
PREP RUNNING SUSPENDED SUCCEEDED KILLED FAILED
當任務失敗時,oozie會通過參數控制進行重試,或者直接退出。
工作流定義
一個工作流的定義包含了 流控制節點(比如start,end,decision,fork,join,kill)以及action節點(比如map-reduce,spark,sqoop,java,shell等),節點直接都是通過有向箭頭相連。
注意:在oozie里面是不支持環路的,工作流必須是嚴格的單向DAG。
工作流節點
工作流節點的命名規則需要滿足=[a-zA-Z][\-_a-zA-Z0-0]*=
,並且長度在20個字符以內。
流控制節點
流控制節點一般都是定義在工作流開始或者結束的位置,比如start,end,kill等。以及提供工作流的執行路徑機制,如decision,fork,join等。
start
start節點是工作流的入口,workflow第一個action就需要是start。當工作流啟動后,會自動尋找start節點執行。每個工作流都需要有一個start節點。
例如:
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<start to="firstHadoopJob"/>
...
</workflow-app>
end
end節點是工作流執行成功的最后一個節點,當到達end節點后,工作流的狀態會變成SUCCEEDED.如果有多個action指向了end,那么當第一個action執行后就會直接跳轉到end節點,雖然后面的action都沒有執行,但是workflow也認為是成功執行了。
例如:
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<end name="end"/>
</workflow-app>
kill
kill節點允許工作流自動停止,當工作流執行到kill時,工作流的狀態將會被認為是KILLED。如果有一個或者多個節點指向了kill,那么工作流都會被停止。一個workflow可以聲明零個或者多個節點。
其中name屬性是kill節點的名稱,message指定了工作流退出的原因。
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<kill name="killBecauseNoInput">
<message>Input unavailable</message>
</kill>
...
</workflow-app>
decision
decision節點支持給工作流提供選擇,有點類似switch-case的語法。它使用JSP表達式語法,來進行條件判斷。
比如:
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
...
<decision name="mydecision">
<switch>
<case to="reconsolidatejob">
${fs:fileSize(secondjobOutputDir) gt 10 * GB}
</case> <case to="rexpandjob">
${fs:fileSize(secondjobOutputDir) lt 100 * MB}
</case>
<case to="recomputejob">
${ hadoop:counters('secondjob')[RECORDS][REDUCE_OUT] lt 1000000 }
</case>
<default to="end"/>
</switch>
</decision>
...
</workflow-app>
fork和join
fork節點把任務切分成多個並行任務,join則合並多個並行任務。fork和join節點必須是成對出現的。join節點合並的任務,必須是通一個fork出來的子任務才行。
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<fork name="forking">
<path start="firstparalleljob"/>
<path start="secondparalleljob"/>
</fork>
<action name="firstparallejob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<job-xml>job1.xml</job-xml>
</map-reduce>
<ok to="joining"/>
<error to="kill"/>
</action>
<action name="secondparalleljob">
<map-reduce>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<job-xml>job2.xml</job-xml>
</map-reduce>
<ok to="joining"/>
<error to="kill"/>
</action>
<join name="joining" to="nextaction"/>
...
</workflow-app>
在oozie里面,這種fork和join的機制是非常有用的,它可以把水平的任務並行執行,這樣能更有效的利用集群的資源,避免資源閑置浪費。
如果使用HUE圖形化界面的話,這些流控制節點基本上都是自動生成的,用戶可以不需要關注。但是為了能看懂實際的任務,最好還是了解一下他們的關系。