Airflow 使用隨筆(內含 TimeZone 和 Backfill 等的詳解)


其實怎么部署  airflow 又哪些特性,然后功能又是如何全面都可以在 Reference 的文章里面找到,都不是重點這里就不贅述了。

這里重點談一下我在部署完成仔細閱讀文檔之后覺得可以總結的一些東西,或者踩到的一些坑。

首選明確 airflow 中最重要的幾個概念:

  • DAG

    DAG 意為有向無循環圖,在 Airflow 中則定義了整個完整的作業。同一個 DAG 中的所有 Task 擁有相同的調度時間。

  • Task

    Task 為 DAG 中具體的作業任務,它必須存在於某一個 DAG 之中。Task 在 DAG 中配置依賴關系,跨 DAG 的依賴是可行的,但是並不推薦。跨 DAG 依賴會導致 DAG 圖的直觀性降低,並給依賴管理帶來麻煩。

  • DAG Run

    當一個 DAG 滿足它的調度時間,或者被外部觸發時,就會產生一個 DAG Run。可以理解為由 DAG 實例化的實例。

  • Task Instance

    當一個 Task 被調度啟動時,就會產生一個 Task Instance。可以理解為由 Task 實例化的實例。

在使用過程中,需要謹記這些概念不然很容易就被繞暈。

 

TimeZone:

時區問題可以說是 airflow 中根本繞不開的一個問題,官方文檔也拿整整一章來闡述時區給 airflow 帶來的影響。

官方默認使用 UTC+0 時間來作為 airflow 的時間,並且在 airflow 提供的 webserver 上,這個時區直到現在 1.10.1 版本上依舊是無法被配置的。所以我們在 server 上看到的時間默認都是 UTC+0 的時間。但是系統啟動的時間,以及寫入數據庫元數據的時間確是可以配置時區的。他的格式遵循 IANA 格式。可以通過配置 airflow 的配置文件 airflow.cfg 改變這個時區。例如我在啟動的時候就配置了中國時間:

# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
#default_timezone = utc
default_timezone = Asia/Chongqing

當配置了這個生效之后,我們給任務配置的啟動時間,以及 runtime 之類的時間會遵循這個時間(雖然在 ui 界面上看還是 UTC+0 的非常分裂。。。)

例如我們來看一個執行任務:

 可以看到 Last Run 字段指明了該字段是 2018-12-09 16:00 被執行了。按照我們剛才的配置 UTC+8 應該是 10號的 0點被執行了。我們來看下數據庫里是否真的是這樣呢?

 

可以通過下圖發現完全按照我們預期:

mysql> select * from task_instance where task_id = 'sensors'\G;
*************************** 1. row ***************************
        task_id: sensors
         dag_id: sensors_dag
 execution_date: 2018-12-10 00:00:00.000000
     start_date: 2018-12-11 00:00:04.547631
       end_date: 2018-12-11 00:04:50.686937
       duration: 286.139
          state: success
     try_number: 1
       hostname: zed-2
       unixname: apache-airflow
         job_id: 26975
           pool: NULL
          queue: default
priority_weight: 1
       operator: BashOperator
    queued_dttm: 2018-12-11 00:00:03.302928
            pid: 19962
      max_tries: 3
executor_config: }q .

 

所以說,證明了只要我們設置了剛才的參數之后,只有 ui 界面依然繼續展示 UTC+0 的時間,其實我們真正的執行操作的時區已經被修改過來了。

另外還需要注意的一個地方在於任務的申明那里,在申明一個 dag 的時候同樣需要指明自己的時區,否則一臉懵逼為啥到了時間為什么沒有正常調度起來。

import pendulum
import datetime

local_tz = pendulum.timezone("Asia/Chongqing")


# start 接收一個 %Y-%m-%d %H:%M:%S 的字符串時間
def init_default_args(owner, start_time, retry=3, email=None):
    if not email:
        email = []
    if not isinstance(start_time, datetime.datetime):
        raise TypeError('start_date must be datetime.')

    d = start_time.replace(tzinfo=local_tz)

    return {
        'owner': owner,
        'depends_on_past': False,
        'start_date': d,
        'email': email,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': retry,
        'retry_delay': datetime.timedelta(minutes=5),
    }

這里我在申明默認 default_args 參數的時候就顯示的指明了時區並且賦值給 start_date 讓他開始的時間復合我們的預期。

 

Backfill:

感覺這個也非常讓人懵逼值得拿出來多多少少談一下。

光從字面意思來理解作為一個中國人我真的是一下沒有 get 到他的點,但是后來去查了一下文檔結合他的行為看了一下可以把它簡單的理解成當你錯過了某一次執行時間之后,往回去補充執行的行為。我們可以使用手動方法來執行這個行為

airflow backfill sensors -s 2015-06-01 -e 2015-06-07

他會回補這個時間段開始的 和 -e 后面時間段結束期間所有的任務執行。回補的意思就是把沒有執行的操作都執行一遍。

這個特性想法很好,但是自自動觸發的時候不注意就會產生非常不可預期的問題。

比如剛才在上面我們談到的在給 dag 配置的時候指定的 default_args 上面有一個參數 start_date。如果我們不給 dag 指定不回補,那么 airflow 會默認回補從系統當前時間到我們指定的 start_date 期間的任務。如果這個參數設置得不恰當會打來恐怖的回補,所以一般我都會禁用回補。

sensors_dag = DAG(
    'sensors_dag',
    default_args=default_args,
    schedule_interval=u'0 0 * * *',
    catchup=False)

指定 catchup=False 。讓他從最新的任務時間點開始執行。這個在官方文檔 Scheduling & Triggers 一章有詳細提到。

 

Operator:

現在這一章我只有一個地方覺得有點坑。可能是我還沒有深度使用吧

由於 airflow 支持模版 jinja 的模版功能,所以說在使用 BashOperator 的時候要注意自己的寫法,官方文檔對此有描述

Troubleshooting
Jinja template not found
Add a space after the script name when directly calling a Bash script with the bash_command argument. This is because Airflow tries to apply a Jinja template to it, which will fail.

t2 = BashOperator(
    task_id='bash_example',

    # This fails with `Jinja template not found` error
    # bash_command="/home/batcher/test.sh",

    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
    dag=dag)

我覺得這個。。。還蠻坑的- -

這一章其實應該有蠻多的內容可以延伸,包括社區也給 airflow 貢獻了非常多的 Operator。等我深度使用之后再來 backfill 吧哈哈哈哈。

 

--------------------------------------------分割線--------------------------------------------

最近遇到一個這樣的問題,我們有一次 scheduler 死了,然后一下啟起來發現全部任務被一塊執行了。甚至每個 dag 多天沒有跑完的任務直接起來,把我們的服務器直接按死了。

后面我們需要讓該類情況得到控制,至少讓一個 dag 同一時刻只有一天被調度起來,而不是單一 dag 多天同時被拉起來,這樣直接把服務器打掛的概率非常高。

需要將全局文件 airflow.cfg 的參數 max_active_runs_per_dag 從默認的 16 改為 1

 

 

 

Reference:

https://airflow.apache.org  airflow 官方文檔

https://zhuanlan.zhihu.com/p/44768244  如何部署一個健壯的 apache-airflow 調度系統

http://www.shuhegroup.com/newsmedias/%E6%B5%85%E8%B0%88%E8%B0%83%E5%BA%A6%E5%B7%A5%E5%85%B7airflow/  淺談調度工具——Airflow

https://stackoverflow.com/questions/49231340/how-to-limit-airflow-to-run-only-1-dag-run-at-a-time  How to limit Airflow to run only 1 DAG run at a time?

https://airflow.apache.org/faq.html  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM