1. 核心功能
1.1 DAGs
有向無環圖
反映所涉及的task的依賴關系
注:搜索dag的時候,airflow只會關注同事包含"DAG"和"airflow"字樣的py文件
1.2 scope
airflow將加載任何可以從DAG file中import的DAG對象,但是它們必須出現在globals()中,例如下面的文件,只有tag_1會被加載,tag_2只會出現在本地scope中
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
1.3 Default Arguments
如果一個字典default_args被傳給一個DAGs,它將會將其運用到所有的它的operator中。這使得復用default_args變得非常的方便
1.4 Context Manager
dags可以被當做一個管理器,去自動的分配新的operators給dag
1.5 Operators
dags描述的是怎么去跑一個工作流,operators決定實際做什么。
一個operator描述了在一個工作流中的單個task。operators經常但不總是原子的,這意味着他們可以獨立存在而不需要去和別的operator分享資源。DAG將確保operators以正確的順序運行,在這些依賴之外,operator通常是獨立運行的,甚至他們肯能運行在不同的機器上。
這是個非常微妙的關鍵點:事實上,如果兩個operator需要去共享一些信息,就像文件名或者一些小的數據,你應該去考慮將他們合並到一個operator中,如果上述情況確實是無法避免的,airflow有operator的交叉通信(xcom)在文檔中有描述。
並且airflow提供了非常多的通用operator:
BashOperator
PythonOperator
EmailOperator
SimpleHttpOperator 等等
1.6 DAG Assignment
operator不用立馬被分配給一個dag,但是,一旦operator被分配給了一個dag,它將無法被轉移或者是取消分配。dag的分配在operator被創建之后可以被非常明確的完成,通過延期分配或者從其他operator推斷的方式
例如下面的方式:
dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)
# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag
# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)
1.7 Bitshift Composition
在以前,operator的關系描述是通過set_upstream()和set_downstream()方法,在1.8 之后可以通過<<和>>代替依賴方法。
1.8 Tasks
當一個operator被實例化之后,它就被稱為是一個task。實例化在調用抽象operator時定義了具體的值,同時,參數化之后的task會稱為dag的一個節點。
1.9 Task Instances
一個task實例代表一個task的特定運行,其特征在於dag、任務、和時間點的組合。
它擁有運行狀態:running、success、failed、skipped、up for retry等
1.10 Workflows
通過組合dags和operators,你會創建TaskInstances,你可以創建復雜的工作流。
2. Additional Functionality
2.1 Hooks
Hooks是連接一些平台和數據庫的接口,類似於 Hive, S3, MySQL, Postgres, HDFS, Pig。hooks盡可能的實現了通用接口,並且充當operator。還需要使用airflow.models.Connection 模型來檢索主機名和身份認證信息,hooks將身份認證信息和代碼放在管道之外,集中在元數據庫中。
2.2 pools
一些系統會因為太多的進程不堪重負,airflow的pool可以被用作限制任意的task的運行。task可以在創建時通過參數指定存在的pool名稱。
例如:
aggregate_db_message_job = BashOperator(
task_id='aggregate_db_message_job',
execution_timeout=timedelta(hours=3),
pool='ep_data_pipeline_db_msg_agg',
bash_command=aggregate_db_message_job_cmd,
dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
pool中可以使用priority_weight參數去定義他的在隊列中的權重,並且決定哪個task先行執行。
當容量被撐滿時,task將會放入計划執行,一旦有容量,可運行的task和他們的狀態將會被在前端展示,當插槽空閑,隊列中的task將會基於權重進行排序執行。
2.3 Connections
外部系統的connection信息被存儲在airflow的元數據庫中,airflow的管道可以很簡單地引用被集中管理的conn_id,無需另外進行操作。
當許多的connections被定義在同一個conn_id下,在這種情況下,當hooks使用get_connection方法時,airflow將隨機選擇一個connection,當重試時允許一些基本的負載均衡和容錯。
一些hooks有默認的conn_id,當operators使用這個hook時不需要一個明確的conn_id。例如:PostgresHook的默認conn_id是postgres_default
2.4 Queues
當我們使用CeleryExecutor時,被塞入celery隊列的task是可以被規定的,隊列是BaseOperator的屬性,所以任何task可以被分配到任何的隊列中,而默認的隊列環境是在配置文件的celery下的default_queue中配置的。
workers可以監聽一個或多個隊列中的task,當一個worker啟動的時候(使用airflow worker命令),一個以逗號分隔的對列名可以被指定(airflow worker -q spark),這個worker將只會選擇那些被連接到指定對列的task。
2.5 XComs
XComs使得tasks可以交換信息,允許更加細微的控制形式和分享狀態。XComs原則上定義成key、value和timestamp,但是也可以跟蹤一些屬性,例如創建XCom的task/DAG。
XComs可以被pushed或者pulled,當一個task發送一個xcom,這個xcom是普遍可獲得的。task可以被發送通過使用方法xcom_push(),此外,當一個task返回一個值時(不管是operators的execute()方法還是PythonOperators的python_callable方法),一個包含着返回值的xcom會自動發送。
tasks調用xcom_pull()去接受xcoms,可選擇的根據key、task_ids、dag_id進行過濾。默認的,xcom_pull()在獲得值時會根據keys自動篩選執行方法。
如果xcom_pull被傳了一個task_id,則對應task最近一次的xcom值會被返回,如果一組task_ids傳過去,會返回一組對應的xcom值
也可以直接在模板中獲取xcom,例如:SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
值得注意的是,xcom與variable非常相似,但它是專門用於任務之間的通信而不是全局設置
2.6 Variables
variables是一種傳統的方式去存儲和取回任意的內容或者是key-value形式的airflow的設置,它可以在前端界面、代碼或者cli中進行增刪改查的操作,當你定義管道代碼,就可非常方便的使用,例如:
from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)
你可以使用variables在一個jinjia模板中:
echo {{ var.value.<variable_name> }}
2.7 Branching
有時候你需要你的工作流進行分支,或者是根據任意上游發生的條件走某條路。這其中一種實現方法就是使用BranchPythonOperator
BranchPythonOperator和PythonOperator十分相似,除了python會期望一個python_callable去返回一個task_id,返回的task_id跳過所有其他路徑,python的返回函數的task_id必須直接引用BranchPythonOperator任務下游的任務。
注意,在BranchPythonOperator中使用depend_on_past = True下游的任務在邏輯上是不合理的,因為跳過狀態總是導致依賴於過去成功的塊任務。如果非要這樣的話可以中間建立一個虛擬任務進行過度。
2.8 SubDAGs
2.9 SLAs
記錄失敗的過錯的sla任務列表
2.10 Trigger Rules
雖然正常的工作流行為是在所有的直接上游任務成功之后觸發的,但是airflow允許更為復雜的依賴項。
所有的operators有一個trigger_rule,用來定義生成的任務被觸發的規則,trigger_rule的默認參數是all_success,以下為別的參數解釋:
all_success: (default) 所有的父級任務成功
all_failed: 所有的父級任務失敗,或者上游狀態為失敗
all_done: 所有的父級任務執行完成
one_failed: 至少一個失敗,並且不會等待所有任務執行完成
one_success: 至少一個成功,並且不會等待所有任務執行完成
dummy: 依賴只是為了展示,隨意觸發
注意,這些可以與depends_on_past結合使用,當設置為true時,如果任務的先前計划未成功,則不會觸發
2.11 Latest Run Only
標准工作流行為涉及為特定日期/時間范圍內運行的一系列任務,但是,某些工作流執行的任務與運行時間無關,但是需要按計划運行,就像標准的cron作業一樣,在這些情況下,暫停期間錯過的回填運行作業會浪費cpu周期。
2.12 Zombies & Undeads
僵屍任務的特點是沒有心跳(由工作定期發出)和數據庫中的運行狀態,當工作節點無法訪問數據庫的時候,airflow進程在外部被終止或者節點重啟的時候,他們可能會發生。僵屍查殺由調度程序的進程定期執行。
undead進程的特點是存在進程和匹配的心跳,但是airflow不知道此任務在數據庫中運行。這種不匹配通常在數據庫狀態發生改變的時候發生,最有可能是通過刪除UI中的任務實例視圖中的行,指示任務驗證其作為心跳例程的一部分的狀態,並在確定他們處於這種不死的狀態時終止自身。
2.13 Cluster Policy
你本地airflow設置文件可以定義一個策略功能,該功能可以根據其他任務或DAG屬性改變其任務屬性。
