參數傳遞是調度字體工作流運行時非常重要的一部分,工作流的執行,單個作業的執行,多個工作流之間的依賴執行,歷史任務重算,都涉及到參數傳遞和同步。
1 參數類型綜述
azkaban的工作流中的參數可以分為如下幾個類型:
- Azkaban UI 頁面輸入參數
- 環境變量參數
- job作業文件中定義的參數
- 工作流的用戶定義的屬性文件,上游作業傳遞給下游的參數
- 工作流運行時產生的系統參數
- job的common參數
參數類型與其對應的參數范圍如下:
參數類型 | 作用域 |
UI 頁面輸入參數 ,即工作流參數 | flow全局有效 |
工作流ZIP壓縮包中的屬性文件(.properties結尾) | flow全局有效,zip文件目錄以及子目錄有效 |
工作流運行時參數 | flow全局有效 |
環境變量參數 | flow全局有效 |
job的common參數 | job內局部有效 |
JOB文件中定義的參數 | job內局部有效 |
上游作業傳遞給下游的參數 | job內局部有效 |
2. job 參數簡介commom參數
除了type,command,dependencies三個參數外,還有如下一些保留參數可以為每個job配置
參數 | 說明 |
retries | 失敗的job的自動重試的次數 |
retry.backoff | 重試的間隔(毫秒) |
working.dir | 指定命令被調用的目錄。默認的working目錄是executions/${execution_ID}目錄 |
env.property | 指定在命令執行前需設置的環境變量。Property定義環境變量的名稱,因此 env.VAR_NAME=VALUE就創建了一個$VAR_NAME環境變量,並且指定了它的VALUE |
failure.emails | job失敗時發送的郵箱,用逗號隔開 |
success.emails | job成功時發送的郵箱,用逗號隔開 |
notify.emails | job成功或失敗都發送的郵箱,用逗號隔開 |
一個flow的email屬性,只會取最后一個job的配置,其他的job的email配置將會被忽略。
3. job之間的參數傳遞
先看官網的描述:
Parameter Passing
There is often a desire to pass these parameters to the executing job code. The method of passing these parameters is dependent on the jobtype that is run, but usually Azkaban writes these parameters to a temporary file that is readable by the job.
The path of the file is set in JOB_PROP_FILE environment variable. The format is the same key value pair property files. Certain built-in job types do this automatically for you. The java type, for instance, will invoke your Runnable and given a proper constructor, Azkaban can pass parameters to your code automatically.
Parameter Output
Properties can be exported to be passed to its dependencies. A second environment variable JOB_OUTPUT_PROP_FILE is set by Azkaban. If a job writes a file to that path, Azkaban will read this file and then pass the output to the next jobs in the flow.
The output file should be in json format. Certain built-in job types can handle this automatically, such as the java type.
意思是:JOB_OUTPUT_PROP_FILE和JOB_PROP_FILE都是一個環境變量,指向文件路徑。
參數傳入:
上游節點把需要輸出的值以json的格式寫入JOB_OUTPUT_PROP_FILE文件,azkaban以job執行過程中,上游job傳遞進來的臨時參數,運行時參數,項目中配置文件的參數,job定義中參數等 都保存在 ${JOB_PROP_FILE}文件中,保存格式為key=value。執行job的中shell命令時,可以作為參數傳遞。
參數傳出:
一個azkaban job執行結束,可以將一些參數寫入到${JOB_OUTPUT_PROP_FILE}文件 中,azkaban會將這些參數傳遞到下游依賴的的job的參數文件${JOB_PROP_FILE}文件中,供下游job引用。寫入到${JOB_OUTPUT_PROP_FILE}文件中參數需要是json格式的,否則會報json解析錯。下游節點就可以在JOB_PROP_FILE中看到key-value形式的輸出,用${key}的方式使用變量。
舉例:
baseflow.flow
#baseflow.flow
nodes:
- name: jobB
type: command
dependsOn:
- jobA
config:
command: sh commandB.sh "${firstName}"
- name: jobA
type: command
config:
command: sh commandA.sh
commandA.sh
#!/bin/bash echo '{ "firstName":"John" , "lastName":"Doe" }' >> ${JOB_OUTPUT_PROP_FILE}
commandB.sh
#!/bin/bash cat ${JOB_PROP_FILE} >> /root/azkaban.txt echo $1 >> /root/azkaban.txt
jobB依賴JobA,jobA執行完成后,會一串json內容到${JOB_OUTPUT_PROP_FILE}指向的文件中,JobA執行完成后,jobB才可以執行,等job執行時,會將jobA輸出的內容寫入到/root/azkaban.txt,並追加參數中的firstName寫入到文件中,注意第一個參數只能通過shell調用的方式來傳遞。
4 job參數之runtime屬性
runtime屬性是在job運行期間自動被添加的
參數 |
說明 |
azkaban.job.attempt |
job重試次數,從0開始增加 |
azkaban.job.id |
運行的job name |
azkaban.flow.flowid |
運行的job的flow name |
azkaban.flow.execid |
flow的執行id |
azkaban.flow.projectid |
工程id |
azkaban.flow.projectversion |
project上傳的版本 |
azkaban.flow.uuid |
flow uuid |
azkaban.flow.start.timestamp |
flow start的時間戳 |
azkaban.flow.start.year |
flow start的年份 |
azkaban.flow.start.month |
flow start 的月份 |
azkaban.flow.start.day |
flow start 的天 |
azkaban.flow.start.hour |
flow start的小時 |
azkaban.flow.start.minute |
start 分鍾 |
azkaban.flow.start.second |
start 秒 |
azkaban.flow.start.millseconds |
start的毫秒 |
azkaban.flow.start.timezone |
start 的時區 |
5 job參數之參數繼承
后綴為.properties的文件將會作為參數文件加載,並且為flow中每個job所共享,屬性文件通過目錄分層結構繼承。
比如,在zip包中有以下結構
system.properties baz.job myflow/myflow.properties myflow/myflow2.properties myflow/foo.job myflow/bar.job
system.properties是全局的屬性,將會被baz.job和myflow目錄下的foo.job和bar.job使用,但是baz.job不會繼承myflow.properties和myflow2.properties的屬性,因為是它的下層.
6 job參數之參數替換
azkaban支持參數替換;替換參數樣式: azkaban會替換{}中的參數。無論${parameterName} 在job file中或者在參數文件中或者運行時參數發現,都可以被替換為對應的值。
shared.properties
# shared.properties
replaceparameter=bar
myjob.job
# myjob.job param1=mytest foo=${replaceparameter} #${replaceparameter}會替換為bar param2=${param1} # ${param1} 會被替換成mytest。
前面這個例子,在myjob 作業運行前,foo 會被賦值為bar , param2會被賦值為mytest.
注意:參數名不能有空格,標點符號等。
7 shell動態傳參
azkaban中的shell 作業,如何接收從webUI傳遞的參數?
7.1 UI頁面輸入參數定義
ui_test=test111111111
7.2 在job文件myjob.job指定
##作業定義文件UI輸入參數接收: job_param4=${ui_test} ##作業定義文件腳本命令行引用UI輸入參數: sh test_azkaban_job.sh "${job_param4}"
7.3 shell test_azkaban_job.sh 的內容
vim test_azkaban_job.sh
echo "inputparamter:$1" #接收job文件中傳遞的參數。
FAQ1:在頁面手動執行前面的job時,如果UI參數ui_test在job執行沒有輸入,會執行失敗。異常信息如下:
hello ERROR - Failed to build job executor for job hello Could not find variable substitution for variable(s) [param4->ui_test ]
在定時調度任務指定時,需要指定工作流參數flowParameters :ui_test,避免該錯誤。
7.4 shell中使用參數的注意事項
在UI頁面重新輸入運行時參數時,可以覆蓋系統默認生成的參數值。運行時參數,和UI輸入的參數,都可以認為是全局參數,在整個工作流的作業配置中,都可以通過 ${參數名} 的方式引用使用。
- 在shell 中直接引用 公共參數,運行時系統參數,UI輸入參數,是無效的。
- 在shell中只能直接使用環境變量;
- 公共參數,運行時系統參數,UI輸入參數能只通過shell的腳本參數的方式傳遞進來。
- job文件中定義的環境變量參數,可以在shell腳本中直接引用,但只對當前job有效。
8 reference
1. https://www.cnblogs.com/chenmingjun/p/10506488.html