第一章、Azkaban 概論
1.1 為什么需要工作流調度系統
1)一個完整的數據分析系統通常都是由大量任務單元組成:
Shell 腳本程序,Java 程序,MapReduce 程序、Hive 腳本等
2)各任務單元之間存在時間先后及前后依賴關系
3)為了很好地組織起這樣的復雜執行計划,需要一個工作流調度系統來調度執行;
1.2 常見工作流調度系統
1)簡單的任務調度:直接使用 Linux 的 Crontab 來定義;
2)復雜的任務調度:開發調度平台或使用現成的開源調度系統,比如 Ooize、Azkaban、 Airflow、DolphinScheduler 等。
1.3 Azkaban 與 Oozie 對比
總體來說,Ooize 相比 Azkaban 是一個重量級的任務調度系統,功能全面,但配置使用
也更復雜。如果可以不在意某些功能的缺失,輕量級調度器 Azkaban 是很不錯的候選對象。
第二章、Azkaban 入門
2.1 集群模式安裝
上傳 tar 包
1)將 azkaban-db-3.84.4.tar.gz,azkaban-exec-server-3.84.4.tar.gz,azkaban-webserver-3.84.4.tar.gz 上傳到 hadoop102 的/opt/software 路徑
[atguigu@hadoop102 software]$ ll
總用量 35572
-rw-r--r--. 1 atguigu atguigu 6433 4 月 18 17:24 azkaban-db-
3.84.4.tar.gz
-rw-r--r--. 1 atguigu atguigu 16175002 4 月 18 17:26 azkaban-execserver-3.84.4.tar.gz
-rw-r--r--. 1 atguigu atguigu 20239974 4 月 18 17:26 azkaban-webserver-3.84.4.tar.gz
2)新建/opt/module/azkaban 目錄,並將所有 tar 包解壓到這個目錄下
[atguigu@hadoop102 software]$ mkdir /opt/module/azkaban
3)解壓 azkaban-db-3.84.4.tar.gz、 azkaban-exec-server-3.84.4.tar.gz 和 azkabanweb-server-3.84.4.tar.gz 到/opt/module/azkaban 目錄下
[atguigu@hadoop102 software]$ tar -zxvf azkaban-db-3.84.4.tar.gz -C /opt/module/azkaban/
[atguigu@hadoop102 software]$ tar -zxvf azkaban-exec-server-3.84.4.tar.gz -C /opt/module/azkaban/
[atguigu@hadoop102 software]$ tar -zxvf azkaban-web-server-3.84.4.tar.gz -C /opt/module/azkaban/
4)進入到/opt/module/azkaban 目錄,依次修改名稱
[atguigu@hadoop102 azkaban]$ mv azkaban-exec-server-3.84.4/ azkaban-exec
[atguigu@hadoop102 azkaban]$ mv azkaban-web-server-3.84.4/ azkaban-web
配置 MySQL
1)正常安裝 MySQL
2)啟動 MySQL
[atguigu@hadoop102 azkaban]$ mysql -uroot -p000000
3)登陸 MySQL,創建 Azkaban 數據庫
mysql> create database azkaban;
4)創建 azkaban 用戶並賦予權限
設置密碼有效長度 4 位及以上
mysql> set global validate_password_length=4;
設置密碼策略最低級別
mysql> set global validate_password_policy=0;
創建 Azkaban 用戶,任何主機都可以訪問 Azkaban,密碼是 000000
mysql> CREATE USER 'azkaban'@'%' IDENTIFIED BY '000000';
賦予 Azkaban 用戶增刪改查權限
mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;
5)創建 Azkaban 表,完成后退出 MySQL
mysql> use azkaban;
mysql> source /opt/module/azkaban/azkaban-db-3.84.4/create-allsql-3.84.4.sql
mysql> quit;
6)更改 MySQL 包大小;
防止 Azkaban 連接 MySQL 阻塞
[atguigu@hadoop102 software]$ sudo vim /etc/my.cnf
在[mysqld]下面加一行 max_allowed_packet=1024M
[mysqld]
max_allowed_packet=1024M
8)重啟 MySQL
[atguigu@hadoop102 software]$ sudo systemctl restart mysqld
配置 Executor Server
Azkaban Executor Server 處理工作流和作業的實際執行。
1)編輯 azkaban.properties
[atguigu@hadoop102 azkaban]$ vim /opt/module/azkaban/azkabanexec/conf/azkaban.properties
修改如下標紅的屬性
#...
default.timezone.id=Asia/Shanghai
#...
azkaban.webserver.url=http://hadoop102:8081
executor.port=12321
#...
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
2)同步 azkaban-exec 到所有節點
[atguigu@hadoop102 azkaban]$ xsync /opt/module/azkaban/azkabanexec
3)必須進入到/opt/module/azkaban/azkaban-exec 路徑,分別在三台機器上,啟動 executor server
[atguigu@hadoop102 azkaban-exec]$ bin/start-exec.sh
[atguigu@hadoop103 azkaban-exec]$ bin/start-exec.sh
[atguigu@hadoop104 azkaban-exec]$ bin/start-exec.sh
注意:如果在/opt/module/azkaban/azkaban-exec 目錄下出現 executor.port 文件,說明啟動成功
4)下面激活 executor,需要
[atguigu@hadoop102 azkaban-exec]$ curl -G "hadoop102:12321/executor?action=activate" && echo
[atguigu@hadoop103 azkaban-exec]$ curl -G "hadoop103:12321/executor?action=activate" && echo
[atguigu@hadoop104 azkaban-exec]$ curl -G "hadoop104:12321/executor?action=activate" && echo
如果三台機器都出現如下提示,則表示激活成功
{"status":"success"}
配置 Web Server
Azkaban Web Server 處理項目管理,身份驗證,計划和執行觸發。
1)編輯 azkaban.properties
[atguigu@hadoop102 azkaban]$ vim /opt/module/azkaban/azkabanweb/conf/azkaban.properties
修改如下屬性
...
default.timezone.id=Asia/Shanghai
...
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=azkaban
mysql.password=000000
mysql.numconnections=100
...
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
說明:
#StaticRemainingFlowSize:正在排隊的任務數;
#CpuStatus:CPU 占用情況
#MinimumFreeMemory:內存占用情況。測試環境,必須將 MinimumFreeMemory 刪除掉,否則它會認為集群資源不夠,不執行。
2)修改 azkaban-users.xml 文件,添加 atguigu 用戶
[atguigu@hadoop102 azkaban-web]$ vim /opt/module/azkaban/azkabanweb/conf/azkaban-users.xml
<azkaban-users>
<user groups="azkaban" password="azkaban" roles="admin"
username="azkaban"/>
<user password="metrics" roles="metrics" username="metrics"/>
<user password="atguigu" roles="admin" username="atguigu"/>
<role name="admin" permissions="ADMIN"/>
<role name="metrics" permissions="METRICS"/>
</azkaban-users>
3)必須進入到 hadoop102 的/opt/module/azkaban/azkaban-web 路徑,啟動 web server
[atguigu@hadoop102 azkaban-web]$ bin/start-web.sh
4)訪問 http://hadoop102:8081,並用 atguigu 用戶登陸
2.2 Work Flow 案例實操
HelloWorld 案例
1)在 windows 環境,新建 azkaban.project 文件,編輯內容如下
azkaban-flow-version: 2.0
注意:該文件作用,是采用新的 Flow-API 方式解析 flow 文件。
2)新建 basic.flow 文件,內容如下
nodes:
- name: jobA
type: command
config:
command: echo "Hello World"
(1)Name:job 名稱
(2)Type:job 類型。command 表示你要執行作業的方式為命令
(3)Config:job 配置
flow是工作流程配置文件,使用YAML語言,https://www.runoob.com/w3cnote/yaml-intro.html
YAML 是 "YAML Ain't a Markup Language"(YAML 不是一種標記語言)的遞歸縮寫。在開發的這種語言時,YAML 的意思其實是:"Yet Another Markup Language"(仍是一種標記語言)。
YAML 的語法和其他高級語言類似,並且可以簡單表達清單、散列表,標量等數據形態。它使用空白符號縮進和大量依賴外觀的特色,特別適合用來表達或編輯數據結構、各種配置文件、傾印調試內容、文件大綱(例如:許多電子郵件標題格式和YAML非常接近)。
YAML 的配置文件后綴為 .yml,如:runoob.yml 。
3)將 azkaban.project、basic.flow 文件壓縮到一個 zip 文件,文件名稱必須是英文。
4)在 WebServer 新建項目:http://hadoop102:8081/index
5)給項目名稱命名和添加項目描述
6)first.zip 文件上傳
7)選擇上傳的文件
8)執行任務流
9)在日志中,查看運行結
作業依賴案例
需求:JobA 和 JobB 執行完了,才能執行 JobC
具體步驟:
1)修改 basic.flow 為如下內容
nodes:
- name: jobA
type: command
config:
command: echo "I’m JobA"
- name: jobB
type: command
config:
command: echo "I’m JobB"
- name: jobC
type: command
# jobC 依賴 JobA 和 JobB
dependsOn:
- jobA
- jobB
config:
command: echo "I’m JobC"
dependsOn:作業依賴,后面案例中演示
2)將修改后的 basic.flow 和 azkaban.project 壓縮成 second.zip 文件
3)重復 上面 HelloWorld 后續步驟。
自動失敗重試案例
需求:如果執行任務失敗,需要重試 3 次,重試的時間間隔 10000ms
具體步驟:
1)編譯配置流
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
retries: 3
retry.backoff: 10000
參數說明:
retries:重試次數
retry.backoff:重試的時間間隔
2)將修改后的 basic.flow 和 azkaban.project 壓縮成 four.zip 文件
3)重復上面的后續步驟。
4)執行並觀察到一次失敗+三次重試(總共四次)
5)也可以點擊上圖中的 Log,在任務日志中看到,總共執行了 4 次。
6)也可以在 Flow 全局配置中添加任務失敗重試配置,此時重試配置會應用到所有 Job。
案例如下:
config:
retries: 3
retry.backoff: 10000
nodes:
- name: JobA
type: command
config:
command: sh /not_exists.sh
手動失敗重試案例
需求:JobA=》JobB(依賴於 A)=》JobC=》JobD=》JobE=》JobF。六個工作單元 生產環境,任何 Job 都有可能掛掉,可以根據需求執行想要執行的 Job。
具體步驟:
1)編譯配置流
nodes:
- name: JobA
type: command
config:
command: echo "This is JobA."
- name: JobB
type: command
dependsOn:
- JobA
config:
command: echo "This is JobB."
- name: JobC
type: command
dependsOn:
- JobB
config:
command: echo "This is JobC."
- name: JobD
type: command
dependsOn:
- JobC
config:
command: echo "This is JobD."
- name: JobE
type: command
dependsOn:
- JobD
config:
command: echo "This is JobE."
- name: JobF
type: command
dependsOn:
- JobE
config:
command: echo "This is JobF."
2)將修改后的 basic.flow 和 azkaban.project 壓縮成 five.zip 文件
3)重復上面的后續步驟。
跳過C 繼續執行
Enable 和 Disable 下面都分別有如下參數:
- Parents:該作業的上一個任務
- Ancestors:該作業前的所有任務
- Children:該作業后的一個任務
- Descendents:該作業后的所有任務
- Enable All:所有的任務
4)可以根據需求選擇性執行對應的任務。
第三章、Azkaban 進階
3.1 JavaProcess 作業類型案例
JavaProcess 類型可以運行一個自定義主類方法,type 類型為 javaprocess,可用的配置為:
- Xms:最小堆
- Xmx:最大堆
- classpath:類路徑
- java.class:要運行的 Java 對象,其中必須包含 Main 方法
- main.args:main 方法的參數
案例:
1)新建一個 azkaban 的 maven 工程
2)創建包名:com.atguigu
3)創建 AzTest 類
package com.atguigu;
public class AzTest {
public static void main(String[] args) {
System.out.println("This is for testing!");
}
}
4)打包成 jar 包 azkaban-1.0-SNAPSHOT.jar
5)新建 testJava.flow,內容如下
nodes:
- name: test_java
type: javaprocess
config:
Xms: 96M
Xmx: 200M
java.class: com.atguigu.AzTest
6)將 Jar 包、flow 文件和 project 文件打包成 javatest.zip
7)創建項目=》上傳 javatest.zip =》執行作業=》觀察結果
3.2 條件工作流案例
條件工作流功能允許用戶自定義執行條件來決定是否運行某些Job。條件可以由當前Job的父 Job 輸出的運行時參數構成,也可以使用預定義宏。在這些條件下,用戶可以在確定 Job執行邏輯時獲得更大的靈活性,例如,只要父 Job 之一成功,就可以運行當前 Job。
運行時參數案例
1)基本原理
(1)父 Job 將參數寫入 JOB_OUTPUT_PROP_FILE 環境變量所指向的文件
(2)子 Job 使用 ${jobName:param}來獲取父 Job 輸出的參數並定義執行條件
2)支持的條件運算符:
(1)== 等於
(2)!= 不等於
(3)> 大於
(4)>= 大於等於
(5)< 小於
(6)<= 小於等於
(7)&& 與 (8)|| 或 (9)! 非 3)案例:
需求:
JobA 執行一個 shell 腳本。
JobB 執行一個 shell 腳本,但 JobB 不需要每天都執行,而只需要每個周一執行。
(1)新建 JobA.sh
#!/bin/bash
echo "do JobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
date +%w 返回周幾
一個任務運行結束,可以將一些參數寫入到\({JOB_OUTPUT_PROP_FILE}文件 中,azkaban會將這些參數傳遞到下游依賴的的job的參數文件\){JOB_PROP_FILE}文件中,供下游job引用。
寫如到${JOB_OUTPUT_PROP_FILE}文件中參數需要是json格式
的,否則會報json解析錯。
JOB_OUTPUT_PROP_FILE也是一個環境變量參數。
(2)新建 JobB.sh
#!/bin/bash
echo "do JobB"
(3)新建 condition.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: sh JobB.sh
condition: ${JobA:wk} == 1
周一才去執行
(4)將 JobA.sh、JobB.sh、condition.flow 和 azkaban.project 打包成 condition.zip
(5)創建 condition 項目=》上傳 condition.zip 文件=》執行作業=》觀察結果
(6)按照我們設定的條件,JobB 會根據當日日期決定是否執行。
預定義宏案例
Azkaban 中預置了幾個特殊的判斷條件,稱為預定義宏。
預定義宏會根據所有父 Job 的完成情況進行判斷,再決定是否執行。可用的預定義宏如下:
(1)all_success: 表示父 Job 全部成功才執行(默認)
(2)all_done:表示父 Job 全部完成才執行
(3)all_failed:表示父 Job 全部失敗才執行
(4)one_success:表示父 Job 至少一個成功才執行
(5)one_failed:表示父 Job 至少一個失敗才執行
1)案例
需求:
JobA 執行一個 shell 腳本
JobB 執行一個 shell 腳本
JobC 執行一個 shell 腳本,要求 JobA、JobB 中有一個成功即可執行
(1)新建 JobA.sh
#!/bin/bash
echo "do JobA"
(2)新建 JobC.sh
#!/bin/bash
echo "do JobC"
(3)新建 macro.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
config:
command: sh JobB.sh
- name: JobC
type: command
dependsOn:
- JobA
- JobB
config:
command: sh JobC.sh
condition: one_success
(4)JobA.sh、JobC.sh、macro.flow、azkaban.project 文件,打包成 macro.zip。
注意:沒有 JobB.sh。
(5)創建 macro 項目=》上傳 macro.zip 文件=》執行作業=》觀察結果
3.3 定時執行案例
需求:JobA 每間隔 1 分鍾執行一次;
具體步驟:
1)Azkaban 可以定時執行工作流。在執行工作流時候,選擇左下角 Schedule
2)右上角注意時區是上海,然后在左面填寫具體執行事件,填寫的方法和 crontab 配置定時任務規則一致。
每分鍾執行一次
3)觀察結果
4)刪除定時調度
點擊 remove Schedule 即可刪除當前任務的調度規則。
3.4 郵件報警案例
注冊郵箱
1)申請注冊一個 郵箱
這里我用QQ郵箱
2)點擊郵箱設置=》賬戶
3)開啟 SMTP 服務
4)一定要記住授權碼
要發送短信認證。
默認郵件報警案例
Azkaban 默認支持通過郵件對失敗的任務進行報警,配置方法如下:
1 ) 在 azkaban-web 節 點 hadoop102 上 , 編 輯 /opt/module/azkaban/azkabanweb/conf/azkaban.properties,修改如下內容:
[atguigu@hadoop102 azkaban-web]$ vim /opt/module/azkaban/azkabanweb/conf/azkaban.properties
添加如下內容:
#這里設置郵件發送服務器,需要 申請郵箱,切開通 stmp 服務,以下只是例子
mail.sender=atguigu@126.com
mail.host=smtp.126.com
mail.user=atguigu@126.com
mail.password=用郵箱的授權碼
2)保存並重啟 web-server。
[atguigu@hadoop102 azkaban-web]$ bin/shutdown-web.sh
[atguigu@hadoop102 azkaban-web]$ bin/start-web.sh
3)編輯 basic.flow
nodes:
- name: jobA
type: command
config:
command: echo "This is an email test."
4)將 azkaban.project 和 basic.flow 壓縮成 email.zip
5)創建工程=》上傳文件=》執行作業=》查看結果
6)觀察郵箱,發現執行成功或者失敗的郵件
3.5 電話報警案例
第三方告警平台集成
有時任務執行失敗后郵件報警接收不及時,因此可能需要其他報警方式,比如電話報警。
如有類似需求,可與第三方告警平台進行集成,例如睿象雲。
1)進入睿象雲官網注冊賬號並登錄
2)集成告警平台,使用 Email 集成
3)獲取郵箱地址,后邊需將報警信息發送至該郵箱
4)配置分派策略
5)配置通知策略
測試
執行上一個郵件通知的案例,將通知對象改為剛剛集成第三方平台時獲取的郵箱。
3.6 Azkaban 多 Executor 模式注意事項
Azkaban 多 Executor 模式是指,在集群中多個節點部署 Executor。在這種模式下,Azkaban web Server 會根據策略,選取其中一個 Executor 去執行任務。
為確保所選的 Executor 能夠准確的執行任務,我們須在以下兩種方案任選其一,推薦使用方案二。
方案一:指定特定的 Executor(hadoop102)去執行任務。
1)在 MySQL 中 azkaban 數據庫 executors 表中,查詢 hadoop102 上的 Executor 的 id。
mysql> use azkaban;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from executors;
+----+-----------+-------+--------+
| id | host | port | active |
+----+-----------+-------+--------+
| 1 | hadoop103 | 35985 | 1 |
| 2 | hadoop104 | 36363 | 1 |
| 3 | hadoop102 | 12321 | 1 |
+----+-----------+-------+--------+
3 rows in set (0.00 sec)
2)在執行工作流程時加入 useExecutor 屬性,如下
方案二:在 Executor 所在所有節點部署任務所需腳本和應用。