如何理解airflow的schedule_interval


airflow的schdule_interval剛接觸的時候還是有點燒腦的,為什么我希望它開始的時候,它就是不開始。


先來看一下官方的解釋:airflow scheduler

Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

Let’s Repeat That The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.

來仔細品品這段話。

如果你設置schedule_interval=@daily,在2016-01-01標記的(也就是滿足定時觸發條件的時間)任務,在2016-01-01T23:59以后會立即觸發。換句話說,任務實例在自身覆蓋的周期結束時立刻觸發。
再重復強調一下,調度器運行一個任務的時間是:從開始時間算起,一個調度周期后開始執行,也就是在一個調度周期結束的時候。

結合兩句話,也就是滿足某個調度運行的時間在這個時間周期結束的時候才開始運行。1號的任務,在2號0點開始運行,0點的任務,在1點0分的時候運行。


寫個例子來驗證一下

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

start_time=datetime(2020,6,6,11,40,0)

default_args = {
    'owner': 'defy',
    'depends_on_past': False,
    'email': ['a@b.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'start_date': start_time,
}

dag = DAG(
    'schedule2',
    default_args=default_args,
    schedule_interval="* * * * *"
)

task = BashOperator(
    task_id='schedule2-task',
    bash_command='echo {}'.format('{{ execution_date }}'),
    dag=dag)

task

為了快速驗證,以分鍾為間隔。
x
從圖上可以看到,當前時間是11:50,運行的是11:49的任務。
這樣好像比較清楚了。


這時候,突然臨座小哥問到:如果我想每天早上6點運行一個任務處理昨天一整天的數據,這個DAG在今天下午5點創建,明天早上6點會運行嗎?我就是想讓這個任務在明天早上6點開始運行,怎么設置?我:
emm
先來分析一下。
每天6點運行,cron表達式:"0 6 * * *",那任務周期就是24小時。今天下午5點創建DAG,到明天6點還不到24小時的一個周期,應該還不會開始運行,要到后天早上6點才會。


測試代碼如下:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

start_time=datetime(2020,6,1,17,0,0)
day='{{  (execution_date - macros.timedelta(hours=24)).strftime("%Y%m%d") }}'

default_args = {
    'owner': 'defy',
    'depends_on_past': False,
    'email': ['a@b.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'start_date': start_time,
}

dag = DAG(
    'schedule2',
    default_args=default_args,
    schedule_interval="0 6 * * *"
)

task = BashOperator(
    task_id='schedule2-task',
    bash_command='echo {}'.format(day),
    dag=dag)

task

其實這里並沒有說明今天下午5點創建DAG,明天6點會不會運行的問題,因為這個時間要等太久了。按照理論來說,要到后天早上6點才會開始運行。
那明天早上6點必須運行怎么辦呢?可以使用手動觸發任務,傳入時間參數來完成,這樣整個任務就可以非常靈活的執行了。具體方法可以參見另一篇文章:airflow的安裝和使用 - 完全版


其實要弄清楚任務在什么時候觸發,最清晰明了的辦法是在界面上查看所有任務的觸發時間列表:
[Tree View] -> [點擊任務小方塊] -> [Task Instances]
如圖:
x
最關鍵的信息就是[Execution Date]這一列,看這個就可以更加清楚知道schedule_interval的運行機制


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM