dolphinscheduler_批量創建任務


方式一:API 調用

 第三方系統集成就需要通過調用 API 來管理項目、流程  
 12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
    01.創建 token
	02.使用 Token
	   Postman  ApiPost
	   進行API調試的Postman和編寫接口文檔的Swagger以及進行壓力測試的Jmeter等。

示例

   d8975b0e802dab417415ad24641c2751
   http://10.77.4.183:12345/dolphinscheduler
   01. JDK 的 java net包中已經提供了訪問 HTTP 協議的基本功能
   02. HttpClient  okhttp
  0. value, method;
  value:     指定請求的實際地址,指定的地址可以是URI Template 模式(后面將會說明);
  method:  指定請求的method類型, GET、POST、PUT、DELETE等;
 
 1、 params,headers;
  params: 指定request中必須包含某些參數值是,才讓該方法處理。
  headers: 指定request中必須包含某些指定的header值,才能讓該方法處理請求
        
  2、 consumes,produces;
  consumes: 指定處理請求的提交內容類型(Content-Type),例如application/json, text/html;
  produces:    指定返回的內容類型,僅當request請求頭中的(Accept)類型中包含該指定類型才返回;
   POST請求方式的編碼有3種,具體的編碼方式如下:
      A:application/x-www-form-urlencoded ==最常見的post提交數據的方式,以form表單形式提交數據
      B:application/json    ==以json格式提交數據
      C:multipart/form-data  ==一般使用來上傳文件(較少用)
	在通過requests.post()進行POST請求時,傳入報文的參數有兩個,一個是data,一個是json。
	 常見的form表單可以直接使用data參數進行報文提交,而data的對象則是python中的字典類型。  
	  01.通過給 data 參數傳遞一個 dict,我們的數據字典在發出請求時會被自動編碼為表單形式
	  02.發送編碼為 JSON 形式的數據
	   data=json.dumps(payload)  json=payload    files=files

示例代碼

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# ---------------------------

import requests
import json

# base info
headers = {'token':'d641c2751'}
host = "http://22.22.22.22:12345"

## interface
base_url = "/dolphinscheduler/projects/list"
need_url = host+base_url
r = requests.get(need_url,headers=headers, timeout=3)
print(r.text)

comb_url = host+ "/dolphinscheduler/projects"
print(comb_url)
payload = {'pageNo': 1, 'pageSize':1}
r = requests.get(comb_url,  headers=headers, params=payload,timeout=3)
print(r.text)


create_url = host+ "/dolphinscheduler/projects"
payload = {'description': "Data deal", 'projectName':"Dataflow"}
payload = json.dumps(payload)
print(payload)
r = requests.post(create_url,   headers=headers,params= payload)
print(r.url)
print(r.text)

方式二:01.WorkflowAsCode

 PythonGatewayServer, 這是一個 Workflow-as-code 的服務端,與 apiServer 等服務的啟動方式相同  
  Workflow-as-code 讓用戶可以通過 Python API 創建工作流,動態、批量地創建和更新工作流
  
##起服務 org.apache.dolphinscheduler.server.PythonGatewayServer
#安裝 Install
 $ pip install apache-dolphinscheduler
 
 # Verify installation is successful, it will show the version of apache-dolphinscheduler, here we use 0.1.0 as example
 $ pydolphinscheduler version
 0.1.0

02.Python API 動態、批量地創建和更新工作流

# 定義工作流屬性,包括名稱、調度周期、開始時間、使用租戶等信息
# from pydolphinscheduler.core.engine import ProgramType
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
# from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition
# from pydolphinscheduler.tasks.datax import CustomDataX, DataX
# from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType
# from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
# from pydolphinscheduler.tasks.map_reduce import 
# from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition

with ProcessDefinition(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2022-03-01",
    tenant="caic",
) as pd:
    # 定義4個任務,4個都是 shell 任務,shell 任務的必填參數為任務名、命令信息,這里都是 echo 的 shell 命令
    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
    task_union = Shell(name="task_union", command="echo union")

    # 定義任務間依賴關系
    # 這里將 task_child_one,task_child_two 先聲明成一個任務組,通過 python 的 list 聲明
    task_group = [task_child_one, task_child_two]
    # 使用 set_downstream 方法將任務組 task_group 聲明成 task_parent 的下游,如果想要聲明上游則使用 set_upstream
    task_parent.set_downstream(task_group)

    # 使用位操作符 << 將任務 task_union 聲明成 task_group 的下游,同時支持通過位操作符 >> 聲明
    task_union << task_group   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM