Oozie支持Java Action,因此可以自定義很多的功能。本篇就從理論和實踐兩方面介紹下Java Action的妙用,另外還涉及到oozie中action之間的參數傳遞。
本文大致分為以下幾個部分:
- Java Action教程文檔
- 自定義Java Action實踐
- 從源碼的角度講解Java Action與Shell Action的參數傳遞。
如果你即將或者想要使用oozie,那么本篇的文章將會為你提供很多參考的價值。
Java Action文檔
java action
會自動執行提供的java class
的public static void main
方法, 並且會在hadoop集群啟動一個單獨的map-reduce的map任務來執行的。因此,如果你自定義了一個java程序,它會提交到集群的某一個節點執行,不會每個節點都執行一遍。
workflow任務會等待java程序執行完繼續執行下一個action。當java類正確執行退出后,將會進入ok控制流;當發生異常時,將會進入error控制流。Java程序絕對不能使用System.exit(int n)
將會導致action進入error控制流。
在action的配置中,也支持EL表達式。並且使用<capture-output>
也可以把數據輸出出來,然后后面的action就可以基於EL表達式使用了。
語法規則
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
...
<action name="[NODE-NAME]">
<java>
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[JOB-XML]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<main-class>[MAIN-CLASS]</main-class>
<java-opts>[JAVA-STARTUP-OPTS]</java-opts>
<arg>ARGUMENT</arg>
...
<file>[FILE-PATH]</file>
...
<archive>[FILE-PATH]</archive>
...
<capture-output />
</java>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>
prepare
元素,支持創建或者刪除指定的文件內容。在delete
時,支持通配的方式指定特定的路徑。java-opts
以及java-opt
參數提供了執行java應用時分配的JVM。
舉個例子:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirstjavajob">
<java>
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>org.apache.oozie.MyFirstMainClass</main-class>
<java-opts>-Dblah</java-opts>
<arg>argument1</arg>
<arg>argument2</arg>
</java>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>
覆蓋Main方法
oozie中的很多action都支持這個功能,在configure
中指定classpath下的一個類方法,它會覆蓋當前action的main方法。這在不想重新編譯jar包,而想替換程序時,非常有用。
自定義Java action程序以及部署
Java程序可以任意定義,比如寫一個最簡單的hellword,然后打包成lib。
然后需要定義oozie腳本:
<action name="java-7cbb">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
</configuration>
<main-class>a.b.c.Main</main-class>
<arg>arg1</arg>
<arg>arg2</arg>
<file>/oozie/lib/ojdbc7.jar#ojdbc7.jar</file>
<capture-output/>
</java>
<ok to="end"/>
<error to="Kill"/>
</action>
其中幾個比較重要的屬性,千萬不能拉下:
- 1 需要指定Map-reduce的隊列:
mapred.job.queue.name
- 2 指定Main class
<main-class>
- 3 如果依賴其他的jar,需要添加
<file>
- 4 如果想要捕獲輸出,需要設置
<capture-output>
如果使用HUE圖形化配置,就比較簡單了:
點擊右上角的齒輪,配置其他的屬性信息:
基於源碼分析參數傳遞
先從表象來說一下shell action如何傳遞參數:
你只需要定義一個普通的shell,在里面使用echo把屬性輸出出來即可,后面的action自動就可以基於EL表達式使用。
test='test123'
echo "test=$test"
這樣后面的action就可以直接使用了:
${wf:actionData('action-name').test}或者${wf:actionData('action-name')['test']}
很簡單是吧!
在Java里面就沒這么容易了:
無論是 System.out.println() 還是 logger.info/error,都無法捕獲到數據
從中抄了一段代碼:
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
...
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();
props.setProperty(propKey0, propVal0);
props.setProperty(propKey1, propVal1);
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES + " System property not defined");
果然就好用了....
為了理解其中的緣由,我們來看看代碼。首先在shell action中發現一句話:
<<< Invocation of Main class completed <<<
Oozie Launcher, capturing output data:
=======================
於是全局搜索,果然找到對應的代碼,在org.apache.oozie.action.hadoop.LuancherMapper.java
中,line275開始:
if (errorMessage == null) {
handleActionData();
if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
System.out.println();
System.out.println("Oozie Launcher, capturing output data:");
System.out.println("=======================");
System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
System.out.println();
System.out.println("=======================");
System.out.println();
}
。。。
}
這里的actionData其實就是個普通的MAP
private Map<String,String> actionData;
public LauncherMapper() {
actionData = new HashMap<String,String>();
}
Map里面保存了很多屬性值,其中就包括我們想要捕獲的輸出內容:
static final String ACTION_PREFIX = "oozie.action.";
static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";
...
String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS);
if (outputProp != null) {
File actionOutputData = new File(outputProp);
if (actionOutputData.exists()) {
int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
actionData.put(ACTION_DATA_OUTPUT_PROPS,
getLocalFileContentStr(actionOutputData, "Output", maxOutputData));
}
}
....
public static String getLocalFileContentStr(File file, String type, int maxLen) throws LauncherException, IOException {
StringBuffer sb = new StringBuffer();
FileReader reader = new FileReader(file);
char[] buffer = new char[2048];
int read;
int count = 0;
while ((read = reader.read(buffer)) > -1) {
count += read;
if (maxLen > -1 && count > maxLen) {
throw new LauncherException(type + " data exceeds its limit ["+ maxLen + "]");
}
sb.append(buffer, 0, read);
}
reader.close();
return sb.toString();
}
可以看到其實就是從oozie.action.output.properties指定的目錄里面去讀內容,然后輸出出來,后面的action就可以用了。這就是為什么上面抄的那段代碼可以使用的原因。
那么問題是,shell為什么直接echo就行,java里面卻要這么費勁?
別急,先來看看java action的啟動邏輯:
public static void main(String[] args) throws Exception {
run(JavaMain.class, args);
}
@Override
protected void run(String[] args) throws Exception {
...
Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
...
Method mainMethod = klass.getMethod("main", String[].class);
try {
mainMethod.invoke(null, (Object) args);
} catch(InvocationTargetException ex) {
// Get rid of the InvocationTargetException and wrap the Throwable
throw new JavaMainException(ex.getCause());
}
}
它什么也沒做,就是啟動了目標類的main方法而已。
再來看看shell:
private int execute(Configuration actionConf) throws Exception {
...
//判斷是否要捕獲輸出
boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false);
//執行命令
Process p = builder.start();
//處理進程
Thread[] thrArray = handleShellOutput(p, captureOutput);
...
return exitValue;
}
protected Thread[] handleShellOutput(Process p, boolean captureOutput)
throws IOException {
BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream()));
// 捕獲標准輸出
OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput);
thrStdout.setDaemon(true);
thrStdout.start();
OutputWriteThread thrStderr = new OutputWriteThread(error, false, false);
thrStderr.setDaemon(true);
thrStderr.start();
return new Thread[]{ thrStdout, thrStderr };
}
class OutputWriteThread extends Thread {
...
@Override
public void run() {
String line;
BufferedWriter os = null;
//讀取數據保存在目標文件中
try {
if (needCaptured) {
File file = new File(System.getProperty(LauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
os = new BufferedWriter(new FileWriter(file));
}
while ((line = reader.readLine()) != null) {
if (isStdout) { // For stdout
// 1. Writing to LM STDOUT
System.out.println("Stdoutput " + line);
// 2. Writing for capture output
if (os != null) {
if (Shell.WINDOWS) {
line = line.replace("\\u", "\\\\u");
}
os.write(line);
os.newLine();
}
}
else {
System.err.println(line); // 1. Writing to LM STDERR
}
}
}
catch (IOException e) {
...
}finally {
...
}
}
}
這樣就很清晰了,shell自動幫我們把輸出的內容寫入了oozie.action.output.properties文件中。而在java中則需要用戶自己來定義寫入的過程。