代碼展示 :
import os from datetime import datetime import pytz from airflow import DAG from airflow.models import Variable from airflow.operators.http_operator import SimpleHttpOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.bash_operator import BashOperator from airflow.hooks.postgres_hook import PostgresHook # 設置第一次觸發任務時間 及 設置任務執行的時區 local_tz = pendulum.timezone("Asia/Shanghai") start_date = datetime.datetime(2016, 1, 1, tzinfo=local_tz) # dag默認參數 args = { "owner": "Rgc", # 任務擁有人 "depends_on_past": False, # 是否依賴過去執行此任務的結果,如果為True,則過去任務必須成功,才能執行此次任務 "start_date": start_date, # 任務開始執行時間 } # 定義一個DAG # 參數catchup指 是否填充執行 start_date到現在 未執行的缺少任務;如:start_date定義為2019-10-10,現在是2019-10-29,任務是每天定時執行一次, # 如果此參數設置為True,則 會生成 10號到29號之間的19點執行此任務;如果設置為False,則不會補充執行任務; # schedule_interval:定時執行方式,推薦使用如下字符串方式, 方便寫出定時規則的網址:https://crontab.guru/ # 也可以使用 : schedule_interval= datetime.timedelta(minutes=5) 5分鍾執行一次 dag = DAG( dag_id="HttpSendDag", catchup=False, default_args=args, schedule_interval="0 19 * * *" ) # 觸發任務的時候的某些參數 , ds=today def func(ds, next_ds, run_id, **kwargs): pass # 調動Python相關函數 t1 = PythonOperator( task_id='deviation_new', python_callable=func, provide_context=True, dag=dag )
注意:當您不想安排DAG時,請使用schedule_interval=None
而不是schedule_interval='None'
# 每5分鍾執行一次 schedule_interval="*/5 * * * *" # 每小時執行一次 schedule_interval="0 1 * * *" # 每天8-20點之間,每小時執行一次 schedule_interval="0 8-20 * * *"