Apache DolphinScheduler 使用文檔(6/8):任務節點類型與任務參數設置


本文章經授權轉載,原文鏈接:

https://blog.csdn.net/MiaoSO/article/details/104770720

目錄

6. 任務節點類型和參數設置

  • 6.1 Shell節點

  • 6.2 子流程節點

  • 6.3 存儲過程節點

  • 6.4 SQL節點

    • 6.4.1 Mysql

    • 6.4.2 Hive

    • 6.4.3 Other

  • 6.5 SPARK節點

  • 6.6 Flink節點

  • 6.7 MapReduce(MR)節點

    • 6.7.1 Java 程序

    • 6.7.2 Python 程序

  • 6.8 Python節點

  • 6.9 依賴(DEPENDENT)節點

  • 6.10 HTTP節點


6. 任務節點類型和參數設置

6.1 Shell 節點

運行說明:shell 節點,在 worker 執行的時候,會生成一個臨時 shell 腳本,使用租戶同名的 linux 用戶執行這個腳本。
參數說明:

  • 節點名稱:一個工作流定義中的節點名稱是唯一的

  • 運行標志:標識這個節點是否能正常調度,如果不需要執行,可以打開禁止執行開關

  • 描述信息:描述該節點的功能

  • 任務優先級:級別高的任務在執行隊列中會優先執行,相同優先級的任務按照先進先出的順序執行

  • Worker分組:指定任務運行的機器列表

  • 失敗重試次數:任務失敗重新提交的次數,支持下拉和手填

  • 失敗重試間隔:任務失敗重新提交任務的時間間隔,支持下拉和手填

  • 超時告警:當任務執行時間超過超時時長可以告警並且超時失敗

  • 腳本:用戶開發的 SHELL 程序

  • 資源:是指腳本中需要調用的資源文件列表

  • 自定義參數:是 SHELL 局部的用戶自定義參數,會替換腳本中以${變量}的內容

例:

項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
拖拽 "SHELL" 節點到畫布,新增一個 Shell 任務。
節點名稱:Test_shell_01
運行標志:正常
描述:
任務優先級:MEDIUM
Worker分組:Default
失敗重試次數:0
失敗重試間隔:1
超時告警:off
腳本:
    #!/bin/sh
    for i in {1..10};do echo $i;done
資源:
自定義參數:
-> 確認添加
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_shell
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行


6.2 子流程節點

運行說明:子流程節點,就是把外部的某個工作流定義當做一個任務節點去執行。
參數說明:

  • 節點名稱:一個工作流定義中的節點名稱是唯一的

  • 運行標志:標識這個節點是否能正常調度

  • 描述信息:描述該節點的功能

  • 超時告警:勾選超時告警、超時失敗,當任務超過"超時時長"后,會發送告警郵件並且任務執行失敗

  • 子節點:是選擇子流程的工作流定義,右上角進入該子節點可以跳轉到所選子流程的工作流定義

例:

項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
Task 1:拖拽 SHELL 節點到畫布,新增一個 Shell 任務
節點名稱:Test_subprocess_01
... ...
腳本:
    #!/bin/sh
    for i in {1..10};do echo $i;done
-> 確認添加
Task 2:拖拽 SUB_PROCESS 節點到畫布,新增一個 SUB_PROCESS 任務
節點名稱:Test_subprocess_02
... ...
子節點:Test_shell
-> 確認添加
------------------------------------------------------
串聯任務節點 Task1 和 Task2
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_subprocess
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行


6.3 存儲過程節點

運行說明:根據選擇的數據源,執行存儲過程。
參數說明:

  • 數據源:存儲過程的數據源類型支持 MySQL、POSTGRESQL、CLICKHOUSE、ORACLE、SQLSERVER 等,選擇對應的數據源

  • 方法:是存儲過程的方法名稱

  • 自定義參數:存儲過程的自定義參數類型支持 IN、OUT 兩種,數據類型支持 VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN 九種數據類型

例:

Test_procedure(略)


6.4 SQL 節點

參數說明:

  • 數據源:選擇對應的數據源

  • sql 類型:支持查詢和非查詢兩種,查詢是 select  類型的查詢,是有結果集返回的,可以指定郵件通知為 表格附件 或 表格與附件 三種模板。非查詢是沒有結果集返回的,是針對 update、delete、insert 三種類型的操作

  • 主題、收件人、抄送人:郵件相關配置

  • sql 參數:輸入參數格式為 key1=value1;key2=value2…

  • sql 語句:SQL 語句

  • UDF 函數:對於 HIVE 類型的數據源,可以引用資源中心中創建的 UDF 函數,其他類型的數據源暫不支持 UDF 函數

  • 自定義參數:SQL 任務類型自定義參數會替換 sql 語句中 ${變量}。而存儲過程是通過自定義參數給方法參數設置值,自定義參數類型和數據類型同存儲過程任務類型一樣。

  • 前置 sql:執行 “sql語句” 前的操作

  • 后置 sql:執行 “sql語句” 后的操作

6.4.1 Mysql

例:

項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
Task 1:拖拽 SQL 節點到畫布,新增一個 SQL 任務
節點名稱:Test_sql_mysql_01
... ...
數據源:MYSQL   test01_mysql
sql類型:查詢   表格:√ 附件:√
主題:Test MySQL
收件人:tourist@sohh.cn
sql語句:
    select * from test_table where score=${i};
自定義參數:
    i -> IN -> INTEGER -> 97
前置sql:
    INSERT INTO test_table values(null, 'Dog',97)
后置sql:
-> 確認添加
Task 2:拖拽 SQL 節點到畫布,新增一個 SQL 任務
節點名稱:Test_sql_mysql_02
... ...
數據源:MYSQL   test01_mysql
sql類型:非查詢
sql語句:
    create table test_table2 as select * from test_table;
自定義參數:
前置 sql:
后置 sql:
-> 確認添加
------------------------------------------------------
串聯任務節點 Test_sql_mysql_01、Test_sql_mysql_02
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_sql_mysql
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行


6.4.2 Hive

例:

項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
Task 1:拖拽 SQL 節點到畫布,新增一個 SQL 任務
節點名稱:Test_sql_hive_01
... ...
數據源:Hive  test_hiveserver2
sql類型:查詢   表格:√ 附件:√
主題:Test Hive
收件人:tourist@sohh.cn
sql語句(結尾不要加分號):
    select * from test_table where score=${i}
自定義參數:
    i -> IN -> INTEGER -> 97
前置sql:
    INSERT INTO test_table values(null, 'Dog',97)
后置sql:
-> 確認添加
Task 2:拖拽 SQL 節點到畫布,新增一個 SQL 任務
節點名稱:Test_sql_hive_02
... ...
數據源:Hive  test_hiveserver2_ha
sql類型:非查詢
sql語句(結尾不要加分號):
    create table test_table2 as select * from test_table
自定義參數:
前置sql:
后置sql:
-> 確認添加
------------------------------------------------------
串聯任務節點 Test_sql_hive_01、 Test_sql_hive_02
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_sql_hive
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行


6.4.3 Other

POSTGRESQL、SPARK、CLICKHOUSE、ORACLE、SQLSERVER(略)


6.5 SPARK 節點

執行說明:通過 SPARK 節點,可以直接直接執行 SPARK 程序,對於 spark 節點,worker 會使用 spark-submit 方式提交任務 參數說明:

  • 程序類型:支持 JAVA、Scala 和 Python 三種語言

  • 主函數的 class:是 Spark 程序的入口 Main Class 的全路徑

  • 主 jar 包:是 Spark 的 jar 包

  • 部署方式:支持 yarn-cluster、yarn-client、和 local 三種模式

  • Driver:設置 Driver 內核數 及 內存數

  • Executor:設置 Executor 數量、Executor 內存數、Executor 內核數

  • 命令行參數:是設置 Spark 程序的輸入參數,支持自定義參數變量的替換。

  • 其他參數:支持 --jars、--files、--archives、--conf 格式

  • 資源:如果其他參數中引用了資源文件,需要在資源中選擇指定

  • 自定義參數:是 MR 局部的用戶自定義參數,會替換腳本中以${變量}的內容

注意:JAVA 和 Scala 只是用來標識,沒有區別,如果是 Python 開發的 Spark 則沒有主函數的 class ,其他都是一樣

例:略


6.6 Flink 節點

參數說明:

  • 程序類型:支持 JAVA、Scala和 Python 三種語言

  • 主函數的 class:是 Flink 程序的入口 Main Class 的全路徑

  • 主 jar 包:是 Flink 的 jar 包

  • 部署方式:支持 cluster、local 三種模式

  • slot 數量:可以設置slot數

  • taskManage 數量:可以設置 taskManage 數

  • jobManager 內存數:可以設置 jobManager 內存數

  • taskManager 內存數:可以設置 taskManager 內存數

  • 命令行參數:是設置Spark程序的輸入參數,支持自定義參數變量的替換。

  • 其他參數:支持 --jars、--files、--archives、--conf 格式

  • 資源:如果其他參數中引用了資源文件,需要在資源中選擇指定

  • 自定義參數:是 Flink 局部的用戶自定義參數,會替換腳本中以${變量}的內容

注意:JAVA 和 Scala 只是用來標識,沒有區別,如果是 Python 開發的 Flink 則沒有主函數的class,其他都是一樣

例:略


6.7 MapReduce(MR) 節點

執行說明:使用 MR 節點,可以直接執行 MR 程序。對於 MR 節點,worker 會使用 hadoop jar 方式提交任務


6.7.1 Java 程序

參數說明:

  • 程序類型:JAVA

  • 主函數的 class:是 MR 程序的入口 Main Class 的全路徑

  • 主jar包:是 MR 的 jar 包

  • 命令行參數:是設置 MR 程序的輸入參數,支持自定義參數變量的替換

  • 其他參數:支持 –D、-files、-libjars、-archives格式

  • 資源:如果其他參數中引用了資源文件,需要在資源中選擇指定

  • 自定義參數:是MR局部的用戶自定義參數,會替換腳本中以${變量}的內容

例:

# 將 MR 的示例 jar 包上傳到 資源中心;並創建測試文本上傳到 HDFS 目錄
# CDH 版本 Jar 包位置:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
拖拽 MR 節點到畫布,新增一個 MR 任務
節點名稱:Test_mr_java_01
... ...
程序類型:JAVA
主函數的class:wordcount
主jar包:hadoop-mapreduce-examples.jar
命令行參數:/tmp/test.txt /tmp/output
其他參數:
資源:
自定義參數:
-> 確認添加
------------------------------------------------------
保存 ->
設置DAG圖名稱:Test_mr_java
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行(運行MR的權限問題此處不再描述)
------------------------------------------------------
查看結果:
sudo -u hdfs hadoop fs -cat /tmp/output/*


6.7.2 Python 程序

參數說明:

  • 程序類型:Python

  • 主jar包:運行 MR 的 Python jar包

  • 其他參數:支持 –D、-mapper、-reducer、-input -output格式,這里可以設置用戶自定義參數的輸入,比如:-mapper "mapper.py 1" -file mapper.py -reducer reducer.py -file reducer.py –input /journey/words.txt -output /journey/out/mr/${currentTimeMillis} 其中 -mapper 后的 mapper.py 1是兩個參數,第一個參數是mapper.py,第二個參數是1

  • 資源:如果其他參數中引用了資源文件,需要在資源中選擇指定

  • 自定義參數:是 MR 局部的用戶自定義參數,會替換腳本中以${變量}的內容


6.8 Python節點

運行說明:使用 python 節點,可以直接執行 python 腳本,對於 python 節點,worker會使用 python ** 方式提交任務。參數說明:腳本:用戶開發的 Python 程序 資源:是指腳本中需要調用的資源文件列表 自定義參數:是 Python 局部的用戶自定義參數,會替換腳本中以 ${變量} 的內容

例:

項目管理 -> 工作流 -> 工作流定義 -> 創建工作流
------------------------------------------------------
拖拽 Python 節點到畫布,新增一個 Python 任務
節點名稱:Test_python_01
... ...
腳本:
    #!/user/bin/python
    # -*- coding: UTF-8 -*-
    for num in range(0, 10): print 'Round %d ...' % num
資源:
自定義參數:
-> 確認添加
------------------------------------------------------
保存 ->
設置 DAG 圖名稱:Test_python
選擇租戶:Default
超時告警:off
設置全局:
------------------------------------------------------
添加 -> 上線 -> 運行


6.9 依賴(DEPENDENT)節點

運行說明:依賴節點,就是依賴檢查節點。比如 A 流程依賴昨天的 B 流程執行成功,依賴節點會去檢查 B 流程在昨天是否有執行成功的實例。

例如,A 流程為周報任務,B、C 流程為天任務,A 任務需要 B、C 任務在上周的每一天都執行成功,如圖示:

假如,周報 A 同時還需要自身在上周二執行成功:

6.10 HTTP節點

參數說明:

  • 節點名稱:一個工作流定義中的節點名稱是唯一的。

  • 運行標志:標識這個節點是否能正常調度,如果不需要執行,可以打開禁止執行開關。

  • 描述信息:描述該節點的功能。

  • 任務優先級:worker 線程數不足時,根據優先級從高到低依次執行,優先級一樣時根據先進先出原則執行。

  • Worker分組:任務分配給worker組的機器機執行,選擇Default,會隨機選擇一台worker機執行。

  • 失敗重試次數:任務失敗重新提交的次數,支持下拉和手填。

  • 失敗重試間隔:任務失敗重新提交任務的時間間隔,支持下拉和手填。

  • 超時告警:勾選超時告警、超時失敗,當任務超過"超時時長"后,會發送告警郵件並且任務執行失敗.

  • 請求地址:http 請求 URL。

  • 請求類型:支持 GET、POSt、HEAD、PUT、DELETE。

  • 請求參數:支持 Parameter、Body、Headers。

  • 校驗條件:支持默認響應碼、自定義響應碼、內容包含、內容不包含。

  • 校驗內容:當校驗條件選擇自定義響應碼、內容包含、內容不包含時,需填寫校驗內容。

  • 自定義參數:是 http 局部的用戶自定義參數,會替換腳本中以${變量}的內容。

例:略


文章目錄:
DS 1.2.0 使用文檔(1/8):架構及名詞解釋
DS 1.2.0 使用文檔(2-3/8):集群規划及環境准備
DS 1.2.0 使用文檔(4/8):軟件部署
DS 1.2.0 使用文檔(5/8):使用與測試
DS 1.2.0 使用文檔(6/8):任務節點類型與任務參數設置
DS 1.2.0 使用文檔(7/8):系統參數及自定義參數
DS 1.2.0 使用文檔(8/8):附錄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM