oozie支持使用EL(expression language)表達式。
基本的EL常量
- KB
- MB
- GB
- TB
- PB
基本EL函數
string firstNotNull(String value1,String value2)
返回第一個不為空的值,如果都為null,則返回null
string concat(String s1,String s2)
拼接兩個字符串,如果一個為null,拼接的字符串為空
string replaceAll(String src,String regex,String replacement)
替換正則表達式匹配的位置。如果regex為null,則什么也不做。如果replacement為null,則替換為空串
string appendAll(String src,String append,String delimeter)
把append字符串添加到切分后的字符串中。比如appendAll("a,b,c","123",",")
將會返回a123,b123,c123
。append為null代表返回空串,delimiter為null,代表什么也不做。
string trim(String s)
給指定的字符串去除空格
String urlEncode(String s)
URL解碼
String timestamp()
返回當前的時間戳,並格式化為yyyy-MM-ddTHH:mmZ
,到分鍾粒度。
String toJsonStr(Map)
把Map返回成json,這在獲取前一個action的輸出內容時比較有用。比如wf:actionData(String actionName)格式化為json
String toPropertiesStr(Map)
把Map返回成Java Properties
String toConfigurationStr(Map)
把Map返回成Configuration
工作流EL函數
String wf:id()
獲取當前工作流節點的id
String wf:name()
獲取當前工作流的名稱
String wf:appPath()
獲取當前工作流workflow.xml所在的目錄
String wf:conf(String name)
返回當前工作流的屬性值
String wf:user()
返回啟動當前工作流的用戶
String wf:group()
返回當前工作流的組
String wf:callback(String stateVar)
返回當前工作流的回調,stateVar可以指定成某個狀態,也可以傳一個參數可以在遠程進行替換
String wf:transition(String node)
返回工作流的狀態
String wf:lastErrorNode()
返回當前工作流退出的狀態
String wf:errorCode(String node)
返回特定node的錯誤代碼
String wf:errorMessage(String message)
返回出錯的主要信息
int wf:run()
返回當前工作流任務的標志,0代表正常
Map wf:actionData(String node)
返回指定節點輸出的內容,需要配合<capture-output>
標簽使用
int wf:actionExternalId(String node)
返回節點的外部id
int wf:actionTrakerUri(String node)
返回當前節點的uri
int wf:actionExternalStatus(String node)
返回指定節點的外部狀態
Hadoop EL常量
- RECORDS
- MAP_IN
- MAP_OUT
- REDUCE_IN
- REDUCE_OUT
- GROUPS
Hadoop 任務EL函數的例子
<workflow-app xmlns="uri:oozie:workflow:0.2" name="pig-wf">
<start to="pig-node"/>
<action name="pig-node">
<pig>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/pig"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<script>id.pig</script>
<param>INPUT=/user/${wf:user()}/${examplesRoot}/input-data/text</param>
<param>OUTPUT=/user/${wf:user()}/${examplesRoot}/output-data/pig</param>
</pig>
<ok to="java1"/>
<error to="fail"/>
</action>
<action name="java1">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>MyTest</main-class>
<arg> ${wf:actionData("pig-node")["hadoopJobs"]}</arg>
<capture-output/>
</java>
<ok to="end" />
<error to="fail" />
</action>
<kill name="fail">
<message>Pig failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
hdfs el方法
boolean fs:exists(String path)
判斷指定的URI是否存在
boolean fs:isDir(String path)
判斷是否是目錄
long fs:dirSize(String path)
返回指定目錄下的所有文件的大小。如果不是目錄,返回-1。它不支持嵌套,只能返回下面一層的文件大小
long fs:fileSize(String path)
返回指定文件的大小,如果不是文件,返回-1
long fs:blockSize(String path)
返回指定文件占用的block大小。如果不是file,返回-1