airflow trigger a DAG run with REST API


REST API

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Overview

為了利於管理, 支持了REST API。

To facilitate management, Apache Airflow supports a range of REST API endpoints across its objects. This section provides an overview of the API design, methods, and supported use cases.

Most of the endpoints accept JSON as input and return JSON responses. This means that you must usually add the following headers to your request:

Content-type: application/json Accept: application/json

 

Open Authentication

https://airflow.apache.org/docs/apache-airflow/stable/security/api.html#basic-authentication

默認API是關閉的, 需要調整為 鑒權認證 模式。

Basic authentication

Basic username password authentication is currently supported for the API. This works for users created through LDAP login or within Airflow Metadata DB using password.

To enable basic authentication, set the following in the configuration:

[api]
auth_backend = airflow.api.auth.backend.basic_auth 

 

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Authentication

默認為阻止任何API

To be able to meet the requirements of many organizations, Airflow supports many authentication methods, and it is even possible to add your own method.

If you want to check which auth backend is currently set, you can use airflow config get-value api auth_backend command as in the example below.

$ airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth

The default is to deny all requests.

For details on configuring the authentication, see API Authorization.

 

Trigger a new DAG run

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/post_dag_run

Trigger a new DAG run

path Parameters
dag_id
required
string

The DAG ID.

Request Body schema: application/json
dag_run_id
string Nullable

Run ID.

The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error.

If not provided, a value will be generated based on execution_date.

If the specified dag_run_id is in use, the creation request fails with an ALREADY_EXISTS error.

This together with DAG_ID are a unique key.

execution_date
string <date-time>

The execution date. This is the time when the DAG run should be started according to the DAG definition. The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error. This together with DAG_ID are a unique key.

state
string (DagState)
Enum: "success" "running" "failed"

DAG State.

conf
object

JSON object describing additional configuration parameters.

The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with an BAD_REQUEST error.

 

WHY not using python client?

https://github.com/apache/airflow-client-python

項目還在開發,有些API是壞的。

Apache Airflow Python Client

NOTE: The Apache Airflow Client is still under active development and some methods or APIs might be broken. Please raise an issue in github if you encounter any such issues.

trigger接口確實是壞的, 並且測試用例大都是空的。

https://github.com/apache/airflow-client-python/issues/21

 

Requirement on DAG

https://stackoverflow.com/questions/56480312/how-to-trigger-a-dag-to-run-immediately

必須將DAG調度模式(scheduling_interva)定義為None

If you want to trigger this dag manually then you need to set scheduling_interval=None and use airflow trigger_dag dag_id (Documentation : airflow trigger dag)

 

如下:

https://www.waitingforcode.com/apache-airflow/externally-triggered-dags-apache-airflow/read

dag = DAG(
    dag_id='hello_world_a',
    default_args={
        "owner": "airflow",
        'start_date': airflow.utils.dates.days_ago(1),
    },
 schedule_interval=None
)


def print_hello(**kwargs):
    task_params = kwargs['dag_run'].conf['task_payload']
    print('Hello world a with {}'.format(task_params))

PythonOperator(
    task_id='hello_world_printer',
    python_callable=print_hello,
    provide_context=True,
    dag=dag)

 

Then deploy this DAG file

https://stackoverflow.com/questions/49033163/airflow-publish-a-dynamically-created-dag

此問題是動態創建dag的討論, 正常的發布方法為, 將dag文件拷貝到 $AIRFLOW_HOME/dags 目錄下, 則airflow文件會自動掃描加載dag。

嘗試過,將文件放入此目錄下, DAG在數秒之內就能生成, 貌似沒有必要研究動態創建的的新的方法。

 

I want to be able to publish and trigger a DAG object from my code which is not in control of scheduler (viz. $AIRFLOW_HOME/dags folder)

My last resort would be to programmatically create a py file containing the DAG definition that I want to publish and save this file to the $AIRFLOW_HOME/dags folder. I'm sure it should be easier than that.

 

https://airflow.apache.org/docs/apache-airflow/stable/dag-serialization.html

2.0做了架構上的優化,scheduler將dag文件解析后的結果(序列化結果), 保存到 數據庫中, 然后sheduler和webserver都使用數據庫中的序列化結果。達到解析成果的復用。

In order to make Airflow Webserver stateless, Airflow >=1.10.7 supports DAG Serialization and DB Persistence. From Airflow 2.0.0, the Scheduler also uses Serialized DAGs for consistency and makes scheduling decisions.

As shown in the image above, when using this feature, the DagFileProcessorProcess in the Scheduler parses the DAG files, serializes them in JSON format and saves them in the Metadata DB as SerializedDagModel model.

_images/dag_serialization.png

 

Code Sample

https://github.com/fanqingsong/machine_learning_workflow_on_airflow/blob/master/rest_api_call/trigger_dag.py

觸發一個dag run,然后輪詢其狀態,直至狀態為sucess。

import requests
import json
from pprint import pprint
from datetime import datetime
import sched, time

def get_execution_time():
    # datetime object containing current date and time
    now = datetime.utcnow()
    
    print("now =", now)

    dt_string = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    print("date and time =", dt_string)    

    return dt_string

dag_id = "kmeans_with_workflow"

def trigger_dag():
    exec_time = get_execution_time()

    data = {
        # "dag_run_id": dag_run_id,
        "execution_date": exec_time,
        # "execution_date": None,
        # "state": None,
        "conf": { }
    }

    header = {"content-type": "application/json"}

    result = requests.post(
    f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns",
    data=json.dumps(data),
    headers=header,
    auth=("admin", "admin"))

    pprint(result.content.decode('utf-8'))

    result = json.loads(result.content.decode('utf-8'))

    pprint(result)

    return result


def get_dag_run(dag_run_id):
    result = requests.get(
    f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
    auth=("admin", "admin"))

    pprint(result.content.decode('utf-8'))


    result = json.loads(result.content.decode('utf-8'))

    pprint(result)

    return result

result = trigger_dag()
dag_run_id = result["dag_run_id"]

s = sched.scheduler(time.time, time.sleep)

def watch_dag_until_complete():
    result = get_dag_run(dag_run_id)
    state = result["state"]

    if state != "success":
        s.enter(1, 1, watch_dag_until_complete)
    else:
        print("dag completed!")

s.enter(1, 1, watch_dag_until_complete)
s.run()

 

Time zones

https://airflow.apache.org/docs/apache-airflow/stable/timezone.html

上面的例子中, execution_time 為 UTC時間(本初子午線時間),經過查閱資料,發現其內部設計如此,這樣可以解除時區的依賴。

Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database. It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert them to the end user’s time zone in the user interface. There it will always be displayed in UTC. Also templates used in Operators are not converted. Time zone information is exposed and it is up to the writer of DAG what do with it.

 

schedule_interval

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#running-dags

除了手動和API觸發,

DAG還支持定時觸發, 使用的語法和  Crontab一致。

DAGs will run in one of two ways:

  • When they are triggered either manually or via the API

  • On a defined schedule, which is defined as part of the DAG

DAGs do not require a schedule, but it's very common to define one. You define it via the schedule_interval argument, like this:

with DAG("my_daily_dag", schedule_interval="@daily"): ... 

The schedule_interval argument takes any value that is a valid Crontab schedule value, so you could also do:

with DAG("my_daily_dag", schedule_interval="0 * * * *"): ... 

Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a DAG Run. DAG Runs can run in parallel for the same DAG, and each has a defined execution_date, which identifies the logical date and time it is running for - not the actual time when it was started.

 

https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html

輸入三種類型:

#1 crontab格式

#2 datetime.timedelta 多長時間間隔執行一次

#3 presets 預設值

或者None

A DAG Run is an object representing an instantiation of the DAG in time.

Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG argument, which can be passed a cron expression as a str, a datetime.timedelta object, or one of the following cron "presets".

Cron Presets

preset

meaning

cron

None

Don't schedule, use for exclusively "externally triggered" DAGs

 

@once

Schedule once and only once

 

@hourly

Run once an hour at the beginning of the hour

0 * * * *

@daily

Run once a day at midnight

0 0 * * *

@weekly

Run once a week at midnight on Sunday morning

0 0 * * 0

@monthly

Run once a month at midnight of the first day of the month

0 0 1 * *

@quarterly

Run once a quarter at midnight on the first day

0 0 1 */3 *

@yearly

Run once a year at midnight of January 1

0 0 1 1 *

Your DAG will be instantiated for each schedule along with a corresponding DAG Run entry in the database backend.

 

timedelta

https://github.com/apache/airflow/issues/14969

import datetime as dt

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag_params = {
    'dag_id': 'schedule_interval_bug_example_dag',
    'default_args':{
        'owner': 'Administrator',
        'depends_on_past': False,
        'retries': 0,
        'email': ['example@example.com']
    },
    'schedule_interval': dt.timedelta(days=1),
    'start_date': dt.datetime(year=2021, month=1, day=1, hour=11, minute=10),
    'catchup': False
}

with DAG(**dag_params) as dag:
    DummyOperator(task_id='start') >> DummyOperator(task_id='end')

 

 

OpenAPI?

此API是遵守 openapi 規范的。

這是什么東東?

https://spec.openapis.org/oas/v3.1.0

What is the OpenAPI Specification?

The OpenAPI Specification (OAS) defines a standard, programming language-agnostic interface description for HTTP APIs, which allows both humans and computers to discover and understand the capabilities of a service without requiring access to source code, additional documentation, or inspection of network traffic. When properly defined via OpenAPI, a consumer can understand and interact with the remote service with a minimal amount of implementation logic. Similar to what interface descriptions have done for lower-level programming, the OpenAPI Specification removes guesswork in calling a service.

 

https://oai.github.io/Documentation/start-here.html

Advantages of Using OpenAPI

Having your API formally described in a machine-readable format allows automated tools to process it, instantly opening the door to:

  • Description Validation and Linting: Check that your description file is syntactically correct and adheres to a specific version of the Specification and the rest of your team’s formatting guidelines.
  • Data Validation: Check that the data flowing through your API (in both directions) is correct, during development and once deployed.
  • Documentation Generation: Create traditional human-readable documentation based on the machine-readable description, which always stays up-to-date.
  • Code Generation: Create both server and client code in any programming language, freeing developers from having to perform data validation or write SDK glue code, for example.
  • Graphical Editors: Allow easy creation of description files using a GUI instead of typing them by hand.
  • Mock Servers: Create fake servers providing example responses which you and your customers can start testing with before you write a single line of code.
  • Security Analysis: Discover possible vulnerabilities at the API design stage instead of much, much later.

 

swagger vs openapi

https://swagger.io/blog/api-strategy/difference-between-swagger-and-openapi/

Let's start with clarifying Swagger vs OpenAPI

The easiest way to understand the difference is:

  • OpenAPI = Specification
  • Swagger = Tools for implementing the specification

The OpenAPI is the official name of the specification. The development of the specification is fostered by the OpenAPI Initiative, which involves more the 30 organizations from different areas of the tech world — including Microsoft, Google, IBM, and CapitalOne. Smartbear Software, which is the company that leads the development of the Swagger tools, is also a member of the OpenAPI Initiative, helping lead the evolution of the specification.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM