airflow的定時任務


 

 

代碼展示 : 

import os
from datetime import datetime

import pytz
from airflow import DAG
from airflow.models import Variable
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.postgres_hook import PostgresHook

# 設置第一次觸發任務時間 及 設置任務執行的時區
local_tz = pendulum.timezone("Asia/Shanghai")
start_date = datetime.datetime(2016, 1, 1, tzinfo=local_tz)

# dag默認參數
args = {
    "owner": "Rgc",  # 任務擁有人
    "depends_on_past": False,  # 是否依賴過去執行此任務的結果,如果為True,則過去任務必須成功,才能執行此次任務
    "start_date": start_date,  # 任務開始執行時間
}

# 定義一個DAG
# 參數catchup指 是否填充執行 start_date到現在 未執行的缺少任務;如:start_date定義為2019-10-10,現在是2019-10-29,任務是每天定時執行一次,
# 如果此參數設置為True,則 會生成 10號到29號之間的19點執行此任務;如果設置為False,則不會補充執行任務;
# schedule_interval:定時執行方式,推薦使用如下字符串方式, 方便寫出定時規則的網址:https://crontab.guru/
# 也可以使用  :  schedule_interval= datetime.timedelta(minutes=5)  5分鍾執行一次
dag = DAG(
    dag_id="HttpSendDag",
    catchup=False,
    default_args=args,
    schedule_interval="0 19 * * *"
)


# 觸發任務的時候的某些參數 , ds=today
def func(ds, next_ds, run_id, **kwargs):
    pass


# 調動Python相關函數
t1 = PythonOperator(
    task_id='deviation_new',
    python_callable=func,
    provide_context=True,
    dag=dag
)

注意:當您不想安排DAG時,請使用schedule_interval=None而不是schedule_interval='None'

# 每5分鍾執行一次 
schedule_interval="*/5 * * * *"

# 每小時執行一次 
schedule_interval="0 1 * * *"

# 每天8-20點之間,每小時執行一次
schedule_interval="0 8-20 * * *"

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM