本文章經授權轉載,原文鏈接:
https://blog.csdn.net/MiaoSO/article/details/104770720
目錄
6. 任務節點類型和參數設置
6.1 Shell節點
6.2 子流程節點
6.3 存儲過程節點
6.4 SQL節點
-
6.4.1 Mysql
6.4.2 Hive
6.4.3 Other
6.5 SPARK節點
6.6 Flink節點
6.7 MapReduce(MR)節點
-
6.7.1 Java 程序
6.7.2 Python 程序
6.8 Python節點
6.9 依賴(DEPENDENT)節點
6.10 HTTP節點
6. 任務節點類型和參數設置
6.1 Shell 節點
運行說明:shell 節點,在 worker 執行的時候,會生成一個臨時 shell 腳本,使用租戶同名的 linux 用戶執行這個腳本。
參數說明:
節點名稱:一個工作流定義中的節點名稱是唯一的
運行標志:標識這個節點是否能正常調度,如果不需要執行,可以打開禁止執行開關
描述信息:描述該節點的功能
任務優先級:級別高的任務在執行隊列中會優先執行,相同優先級的任務按照先進先出的順序執行
Worker分組:指定任務運行的機器列表
失敗重試次數:任務失敗重新提交的次數,支持下拉和手填
失敗重試間隔:任務失敗重新提交任務的時間間隔,支持下拉和手填
超時告警:當任務執行時間超過超時時長可以告警並且超時失敗
腳本:用戶開發的 SHELL 程序
資源:是指腳本中需要調用的資源文件列表
自定義參數:是 SHELL 局部的用戶自定義參數,會替換腳本中以${變量}的內容
例:
項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
拖拽 "SHELL" 節點到畫布,新增一個 Shell 任務。
節點名稱:Test_shell_01
運行標志:正常
描述:
任務優先級:MEDIUM
Worker分組:Default
失敗重試次數:0
失敗重試間隔:1
超時告警:off
腳本:
#!/bin/sh
for i in {1..10};do echo $i;done
資源:
自定義參數:
-> 確認添加
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_shell
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行
6.2 子流程節點
運行說明:子流程節點,就是把外部的某個工作流定義當做一個任務節點去執行。
參數說明:
節點名稱:一個工作流定義中的節點名稱是唯一的
運行標志:標識這個節點是否能正常調度
描述信息:描述該節點的功能
超時告警:勾選超時告警、超時失敗,當任務超過"超時時長"后,會發送告警郵件並且任務執行失敗
子節點:是選擇子流程的工作流定義,右上角進入該子節點可以跳轉到所選子流程的工作流定義
例:
項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
Task 1:拖拽 SHELL 節點到畫布,新增一個 Shell 任務
節點名稱:Test_subprocess_01
... ...
腳本:
#!/bin/sh
for i in {1..10};do echo $i;done
-> 確認添加
Task 2:拖拽 SUB_PROCESS 節點到畫布,新增一個 SUB_PROCESS 任務
節點名稱:Test_subprocess_02
... ...
子節點:Test_shell
-> 確認添加
------------------------------------------------------
串聯任務節點 Task1 和 Task2
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_subprocess
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行
6.3 存儲過程節點
運行說明:根據選擇的數據源,執行存儲過程。
參數說明:
數據源:存儲過程的數據源類型支持 MySQL、POSTGRESQL、CLICKHOUSE、ORACLE、SQLSERVER 等,選擇對應的數據源
方法:是存儲過程的方法名稱
自定義參數:存儲過程的自定義參數類型支持 IN、OUT 兩種,數據類型支持 VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN 九種數據類型
例:
Test_procedure(略)
6.4 SQL 節點
參數說明:
數據源:選擇對應的數據源
sql 類型:支持查詢和非查詢兩種,查詢是 select 類型的查詢,是有結果集返回的,可以指定郵件通知為 表格、附件 或 表格與附件 三種模板。非查詢是沒有結果集返回的,是針對 update、delete、insert 三種類型的操作
主題、收件人、抄送人:郵件相關配置
sql 參數:輸入參數格式為 key1=value1;key2=value2…
sql 語句:SQL 語句
UDF 函數:對於 HIVE 類型的數據源,可以引用資源中心中創建的 UDF 函數,其他類型的數據源暫不支持 UDF 函數
自定義參數:SQL 任務類型自定義參數會替換 sql 語句中 ${變量}。而存儲過程是通過自定義參數給方法參數設置值,自定義參數類型和數據類型同存儲過程任務類型一樣。
前置 sql:執行 “sql語句” 前的操作
后置 sql:執行 “sql語句” 后的操作
6.4.1 Mysql
例:
項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
Task 1:拖拽 SQL 節點到畫布,新增一個 SQL 任務
節點名稱:Test_sql_mysql_01
... ...
數據源:MYSQL test01_mysql
sql類型:查詢 表格:√ 附件:√
主題:Test MySQL
收件人:tourist@sohh.cn
sql語句:
select * from test_table where score=${i};
自定義參數:
i -> IN -> INTEGER -> 97
前置sql:
INSERT INTO test_table values(null, 'Dog',97)
后置sql:
-> 確認添加
Task 2:拖拽 SQL 節點到畫布,新增一個 SQL 任務
節點名稱:Test_sql_mysql_02
... ...
數據源:MYSQL test01_mysql
sql類型:非查詢
sql語句:
create table test_table2 as select * from test_table;
自定義參數:
前置 sql:
后置 sql:
-> 確認添加
------------------------------------------------------
串聯任務節點 Test_sql_mysql_01、Test_sql_mysql_02
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_sql_mysql
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行
6.4.2 Hive
例:
項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
Task 1:拖拽 SQL 節點到畫布,新增一個 SQL 任務
節點名稱:Test_sql_hive_01
... ...
數據源:Hive test_hiveserver2
sql類型:查詢 表格:√ 附件:√
主題:Test Hive
收件人:tourist@sohh.cn
sql語句(結尾不要加分號):
select * from test_table where score=${i}
自定義參數:
i -> IN -> INTEGER -> 97
前置sql:
INSERT INTO test_table values(null, 'Dog',97)
后置sql:
-> 確認添加
Task 2:拖拽 SQL 節點到畫布,新增一個 SQL 任務
節點名稱:Test_sql_hive_02
... ...
數據源:Hive test_hiveserver2_ha
sql類型:非查詢
sql語句(結尾不要加分號):
create table test_table2 as select * from test_table
自定義參數:
前置sql:
后置sql:
-> 確認添加
------------------------------------------------------
串聯任務節點 Test_sql_hive_01、 Test_sql_hive_02
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_sql_hive
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行
6.4.3 Other
POSTGRESQL、SPARK、CLICKHOUSE、ORACLE、SQLSERVER(略)
6.5 SPARK 節點
執行說明:通過 SPARK 節點,可以直接直接執行 SPARK 程序,對於 spark 節點,worker 會使用 spark-submit 方式提交任務 參數說明:
程序類型:支持 JAVA、Scala 和 Python 三種語言
主函數的 class:是 Spark 程序的入口 Main Class 的全路徑
主 jar 包:是 Spark 的 jar 包
部署方式:支持 yarn-cluster、yarn-client、和 local 三種模式
Driver:設置 Driver 內核數 及 內存數
Executor:設置 Executor 數量、Executor 內存數、Executor 內核數
命令行參數:是設置 Spark 程序的輸入參數,支持自定義參數變量的替換。
其他參數:支持 --jars、--files、--archives、--conf 格式
資源:如果其他參數中引用了資源文件,需要在資源中選擇指定
自定義參數:是 MR 局部的用戶自定義參數,會替換腳本中以${變量}的內容
注意:JAVA 和 Scala 只是用來標識,沒有區別,如果是 Python 開發的 Spark 則沒有主函數的 class ,其他都是一樣
例:略
6.6 Flink 節點
參數說明:
程序類型:支持 JAVA、Scala和 Python 三種語言
主函數的 class:是 Flink 程序的入口 Main Class 的全路徑
主 jar 包:是 Flink 的 jar 包
部署方式:支持 cluster、local 三種模式
slot 數量:可以設置slot數
taskManage 數量:可以設置 taskManage 數
jobManager 內存數:可以設置 jobManager 內存數
taskManager 內存數:可以設置 taskManager 內存數
命令行參數:是設置Spark程序的輸入參數,支持自定義參數變量的替換。
其他參數:支持 --jars、--files、--archives、--conf 格式
資源:如果其他參數中引用了資源文件,需要在資源中選擇指定
自定義參數:是 Flink 局部的用戶自定義參數,會替換腳本中以${變量}的內容
注意:JAVA 和 Scala 只是用來標識,沒有區別,如果是 Python 開發的 Flink 則沒有主函數的class,其他都是一樣
例:略
6.7 MapReduce(MR) 節點
執行說明:使用 MR 節點,可以直接執行 MR 程序。對於 MR 節點,worker 會使用 hadoop jar 方式提交任務
6.7.1 Java 程序
參數說明:
程序類型:JAVA
主函數的 class:是 MR 程序的入口 Main Class 的全路徑
主jar包:是 MR 的 jar 包
命令行參數:是設置 MR 程序的輸入參數,支持自定義參數變量的替換
其他參數:支持 –D、-files、-libjars、-archives格式
資源:如果其他參數中引用了資源文件,需要在資源中選擇指定
自定義參數:是MR局部的用戶自定義參數,會替換腳本中以${變量}的內容
例:
# 將 MR 的示例 jar 包上傳到 資源中心;並創建測試文本上傳到 HDFS 目錄
# CDH 版本 Jar 包位置:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
拖拽 MR 節點到畫布,新增一個 MR 任務
節點名稱:Test_mr_java_01
... ...
程序類型:JAVA
主函數的class:wordcount
主jar包:hadoop-mapreduce-examples.jar
命令行參數:/tmp/test.txt /tmp/output
其他參數:
資源:
自定義參數:
-> 確認添加
------------------------------------------------------
保存 ->
設置DAG圖名稱:Test_mr_java
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行(運行MR的權限問題此處不再描述)
------------------------------------------------------
查看結果:
sudo -u hdfs hadoop fs -cat /tmp/output/*
6.7.2 Python 程序
參數說明:
程序類型:Python
主jar包:運行 MR 的 Python jar包
其他參數:支持 –D、-mapper、-reducer、-input -output格式,這里可以設置用戶自定義參數的輸入,比如:-mapper "mapper.py 1" -file mapper.py -reducer reducer.py -file reducer.py –input /journey/words.txt -output /journey/out/mr/${currentTimeMillis} 其中 -mapper 后的 mapper.py 1是兩個參數,第一個參數是mapper.py,第二個參數是1
資源:如果其他參數中引用了資源文件,需要在資源中選擇指定
自定義參數:是 MR 局部的用戶自定義參數,會替換腳本中以${變量}的內容
6.8 Python節點
運行說明:使用 python 節點,可以直接執行 python 腳本,對於 python 節點,worker會使用 python ** 方式提交任務。參數說明:腳本:用戶開發的 Python 程序 資源:是指腳本中需要調用的資源文件列表 自定義參數:是 Python 局部的用戶自定義參數,會替換腳本中以 ${變量} 的內容
例:
項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
拖拽 Python 節點到畫布,新增一個 Python 任務
節點名稱:Test_python_01
... ...
腳本:
#!/user/bin/python
# -*- coding: UTF-8 -*-
for num in range(0, 10): print 'Round %d ...' % num
資源:
自定義參數:
-> 確認添加
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_python
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行
6.9 依賴(DEPENDENT)節點
運行說明:依賴節點,就是依賴檢查節點。比如 A 流程依賴昨天的 B 流程執行成功,依賴節點會去檢查 B 流程在昨天是否有執行成功的實例。
例如,A 流程為周報任務,B、C 流程為天任務,A 任務需要 B、C 任務在上周的每一天都執行成功,如圖示:
假如,周報 A 同時還需要自身在上周二執行成功:
6.10 HTTP節點
參數說明:
節點名稱:一個工作流定義中的節點名稱是唯一的。
運行標志:標識這個節點是否能正常調度,如果不需要執行,可以打開禁止執行開關。
描述信息:描述該節點的功能。
任務優先級:worker 線程數不足時,根據優先級從高到低依次執行,優先級一樣時根據先進先出原則執行。
Worker分組:任務分配給worker組的機器機執行,選擇Default,會隨機選擇一台worker機執行。
失敗重試次數:任務失敗重新提交的次數,支持下拉和手填。
失敗重試間隔:任務失敗重新提交任務的時間間隔,支持下拉和手填。
超時告警:勾選超時告警、超時失敗,當任務超過"超時時長"后,會發送告警郵件並且任務執行失敗.
請求地址:http 請求 URL。
請求類型:支持 GET、POSt、HEAD、PUT、DELETE。
請求參數:支持 Parameter、Body、Headers。
校驗條件:支持默認響應碼、自定義響應碼、內容包含、內容不包含。
校驗內容:當校驗條件選擇自定義響應碼、內容包含、內容不包含時,需填寫校驗內容。
自定義參數:是 http 局部的用戶自定義參數,會替換腳本中以${變量}的內容。
例:略
文章目錄:
DS 1.2.0 使用文檔(1/8):架構及名詞解釋
DS 1.2.0 使用文檔(2-3/8):集群規划及環境准備
DS 1.2.0 使用文檔(4/8):軟件部署
DS 1.2.0 使用文檔(5/8):使用與測試
DS 1.2.0 使用文檔(6/8):任務節點類型與任務參數設置
DS 1.2.0 使用文檔(7/8):系統參數及自定義參數
DS 1.2.0 使用文檔(8/8):附錄