5.復雜任務編排
本章內容包括:
- 研究如何區分Airflow DAG中的任務依賴順序。
- 解釋如何使用觸發器規則在Airflow DAG 的特定點實現連接。
- 顯示如何在Airflow DAG中執行條件任務,在某些條件下可以跳過該任務。
- 給出有關觸發規則在Airflow中如何起作用以及如何影響您的任務執行的基本概念。
- 演示如何使用XCom在任務之間共享狀態。
在之前的章節中,我們已經看到了如何構建一個基本的DAG並定義任務之間的簡單依賴關系。在本章中,我們將進一步探索在Airflow中如何定義任務依賴關系,以及如何使用這些功能來實現更復雜的模式,包括條件任務,分支和聯接。在本章的最后,我們還將深入探討XComs,它允許在DAG運行中的不同任務之間傳遞數據,並討論使用這種方法的優缺點。
5.1 基本依賴
在進入更復雜的任務依賴模式(如分支任務和條件任務)之前,讓我們先花點時間研究一下我們在前幾章中遇到的不同的任務依賴模式。這包括任務的線性鏈(一個接一個地執行的任務)和扇出/扇入模式(這涉及一個任務鏈接到多個下游任務,反之亦然)。為確保我們都在同一頁面上,我們將在接下來的幾節中簡要介紹這些模式的含義。
5.1.1 線性依賴
到目前為止,我們主要集中於由單個線性任務鏈組成的DAG的示例。例如,我們從第2章(圖5.1)獲取火箭發射圖片的 DAG 由三個任務組成: 一個用於下載發射元數據,一個用於下載映像,以及一個用於在整個過程完成后通知我們的任務:
Listing 5.1
download_launches = BashOperator(...)
get_pictures = PythonOperator(...)
notify = BashOperator(...)
圖5.1。 我們從第2章中獲取火箭圖片的DAG包含三個任務,這些任務用於下載元數據,獲取圖片並發送通知。 最初如圖2.3所示。
在這種DAG中,重要的是鏈中的每個前一個任務在繼續執行下一個任務之前都已完成,因為前一個任務的結果需要作為下一個任務的輸入。如我們所見,Airflow允許我們通過使用右移碼運算符在兩個任務之間創建依賴關系來指示兩個任務之間的這種類型的關系:
Listing 5.2
# Set task dependencies one-by-one:
download_launches >> get_pictures
get_pictures >> notify
# Or in one go:
download_launches >> get_pictures >> notify
任務依存關系有效地告訴Airflow,只有在其(上游)依存關系成功完成執行后,它才可以開始執行給定任務。在上面的示例中,這意味着只有在download_launches成功運行后,get_pictures才能開始執行。 同樣,只有在get_pictures任務完成且沒有錯誤時,notify才能啟動。
顯式指定任務依賴關系的一個優點是,它清楚地定義了任務中的(隱式)順序。順序。 這使Airflow能夠僅在滿足其依賴關系時安排任務,這比(例如)使用cron逐個安排單個任務並希望在第二個任務開始時之前的任務已經完成要強得多(圖5.2) )。而且,任何錯誤都將由Airflow傳播到下游任務,從而有效地推遲了執行時間。 這意味着,在download_launches任務失敗的情況下,除非解決了download_launches的問題,否則Airflow不會在當天嘗試執行get_pictures任務。
5.1.2 扇入/扇出依賴關系
除了線性的任務鏈,Airflow的任務依賴關系還可用於在任務之間創建更復雜的依賴關系結構。
例如,讓我們回顧第1章中的雨傘用例,在該用例中,我們希望訓練一個機器學習模型,根據天氣預報預測未來幾周我們的雨傘的需求。
您可能還記得第一章,雨傘DAG的主要目的是每天從兩個不同的來源獲取天氣和銷售數據,並將這兩組數據組合成一個數據集,以訓練我們的模型。因此,DAG(圖5.2)以兩組任務開始,用於獲取和清理輸入數據,一組用於天氣數據(fetch_weather和clean_weather),另一組用於銷售數據(fetch_sales和clean_sales)。這些任務之后是一個任務(join_datasets),該任務獲取清理后的銷售和天氣數據,並將這些數據集連接到一個組合數據集,以訓練模型。 最后,此數據集用於訓練模型(train_model),然后由最終任務(deploy_model)部署模型。
圖5.2 第一章的雨傘用例DAG示意圖
考慮到 DAG 的依賴性,在 fetch weather 和 clean weather 任務之間有一個線性依賴關系,因為我們需要在清理數據之前從遠程數據源獲取數據。但是,由於天氣數據的獲取/清除與銷售數據無關,因此天氣和銷售數據任務之間沒有依賴關系。總之,這意味着我們可以將獲取和清理任務的依賴項定義為:
Listing 5.3
# Multiple linear dependencies.
fetch_weather >> clean_weather
fetch_sales >> clean_sales
在兩個獲取任務的上游,我們還可以添加一個虛擬的啟動任務來表示 DAG 的開始。在這種情況下,這個任務並不是絕對必要的,但是它有助於說明在我們的 DAG 開始時所發生的隱式“扇出”,在這個 DAG 的開始啟動了 fetch _ weather 和 fetch _ sales 任務。這種扇出依賴關系(將一個任務鏈接到多個下游任務可以定義為:
Listing 5.4
# Fan out (one-to-multiple).
start >> [fetch_weather, fetch_sales]
# Note that this is equivalent to:
# start >> fetch_weather
# start >> fetch_sales
與獲取/清除任務的並行性相反,構建組合數據集需要來自天氣和銷售部門的輸入。這樣,join_datasets任務對clean_weather和clean_sales任務都具有依賴性,並且只能在兩個上游任務都成功完成后才能運行。 這種類型的結構,其中一個任務依賴於多個上游任務通常被稱為扇入結構,因為它由多個上游任務扇入到單個下游任務中。在Airflow中,扇入的依存關系可以定義如下:
Listing 5.5
# Fan in (multiple-to-one), defined in one go.
[clean_weather, clean_sales] >> join_datasets
# This notation is equivalent to defining the
# two dependencies in two separate statements:
clean_weather >> join_datasets
clean_sales >> join_datasets
在進入join_datasets任務之后,DAG的其余部分是用於訓練模型和部署模型的線性任務鏈:
Listing 5.6
# Remaining steps are a single linear chain.
join_datasets >> train_model >> deploy_model
總的來說,這應該類似於圖5.3所示的DAG。
圖5.3。由氣流的圖形視圖所呈現的雨傘DAG。這個DAG執行許多任務,包括獲取和清理銷售數據,將這些數據組合到一個數據集,並使用數據集來訓練機器學習模型。請注意,銷售/天氣數據的處理發生在DAG的單獨分支中,因為這些任務並不直接依賴於彼此。
那么,如果我們現在開始執行此DAG,您認為會發生什么? 哪些任務將首先開始運行? 您認為哪些任務將(而不是)並行運行?
如您所料,如果我們運行DAG,Airflow將首先運行啟動任務來啟動(圖5.4)。 啟動任務完成后,它將啟動fetch_sales和fetch_weather任務,這些任務將並行運行(假設您的Airflow配置為具有多個工作程序)。完成這兩個提取任務后,將啟動相應的清理任務(clean_sales或clean_weather)。 只有兩個清理任務都完成后,Airflow才能最終開始執行join_datasets任務。 最后,DAG的其余部分將線性執行,join_datasets完成后,train_model將立即運行,train_model完成后,deploy_model將運行。
圖5.4。 雨傘DAG中任務的執行順序,數字表示任務的運行順序。Airflow通過執行啟動任務開始,然后可以並行運行銷售/天氣獲取和清潔任務(如后綴a / b所示)。注意,這意味着天氣/銷售路徑是獨立運行的,這意味着3b可以在2a之前開始執行。在完成兩個清理任務之后,DAG 的其余部分繼續線性地執行連接、訓練和部署任務。
5.2 分支
假設您剛剛在DAG中完成了銷售數據的輸入,這時您的同事帶來了一些消息,管理層決定將更換ERP系統,這意味着我們的銷售數據將在一到兩周內來自不同的來源(當然也是不同的格式)。這個變化不能導致模型訓練的任何中斷。此外,他們希望我們的流程能夠與新舊系統兼容,這樣我們就可以在未來的分析中繼續使用歷史銷售數據。
您將如何解決這個問題?
5.2.1 任務分支
一種方法是重寫我們的銷售提取任務,以檢查當前執行日期,並使用它來決定兩個單獨的代碼路徑之間的提取和處理銷售數據。 例如,我們可以將銷售清理任務重寫為以下內容:
Listing 5.7
def _clean_sales(**context):
if context['execution_date'] < ERP_CHANGE_DATE:
_clean_sales_old(**context)
else
_clean_sales_new(**context)
...
clean_sales_data = PythonOperator(
task_id="clean_sales",
python_callable=_clean_sales,
provide_context=True
)
在此示例中,clean_sales_old是對舊銷售格式進行清理的函數,而 clean_sales_new對新格式進行清理。 只要這兩個功能的結果是兼容的(在列、數據類型等方面) ,我們的 DAG 的其余部分就可以保持不變,不需要擔心兩個 ERP 系統之間的差異。
類似地,我們可以通過添加從兩個系統中提取的代碼路徑,使我們的初始提取步驟與兩個ERP系統都兼容:
Listing 5.8
def _fetch_sales(**context):
if context['execution_date'] < ERP_CHANGE_DATE:
_fetch_sales_old(**context)
else:
_fetch_sales_new(**context)
...
這些變化結合起來,將允許我們的DAG以一種相對透明的方式處理來自兩個系統的數據,因為我們的初始獲取/清理任務可確保銷售數據以相同(已處理)的格式輸出,而與相應的數據源無關。
這種方法的優勢在於,它使我們能夠在DAG中加入一些靈活性,而不必修改DAG本身的結構。但是,這種方法僅在代碼中的分支包含相似任務的情況下才有效。例如,在這里,我們在代碼中實際上有兩個分支,它們都執行獲取和清除操作,並且差異最小。但是,如果從新數據源加載數據需要一個非常不同的任務鏈怎么辦?(圖5.5)在這種情況下,最好將數據提取分為兩組獨立的任務。
圖5.5。 兩個ERP系統之間不同任務集的可能示例。如果不同的情況之間有很多共性,那么您可以使用一組任務和一些內部分支來解決問題。但是,如果兩個流程之間有很多差異(如這里顯示的兩個ERP系統),您可能最好采取不同的方法。
這種方法的另一個缺點是,在特定的DAG運行期間,很難看到Airflow正在使用哪個代碼分支。 例如,在圖5.6中,您可以猜測此特定DAG運行使用的是哪個ERP系統嗎?僅使用此視圖就很難回答這個看似簡單的問題,因為實際的分支隱藏在我們的任務中。解決此問題的一種方法是在我們的任務中包括更好的日志記錄,但是正如我們將看到的,還有其他方法可以使DAG本身中的分支更加明確。
圖5.6。在fetch_sales和clean_sales任務中的兩個ERP系統之間運行DAG的示例。由於此分支發生在這兩個任務中,因此無法從此視圖查看在此DAG運行中使用了哪個ERP系統。 這意味着我們將需要檢查我們的代碼(或者可能是我們的日志),以識別使用了哪個ERP系統。
最后,我們只能依靠一般的Airflow運算符(例如PythonOperator)來將這種類型的靈活性編碼到我們的任務中。這使我們無法利用更專業的Airflow operators 提供的功能,這些功能使我們能夠以最少的編碼工作來執行更復雜的工作。例如,如果我們的數據源之一恰好是SQL數據庫,那么如果我們僅使用MysqlOperator來執行SQL查詢,它將為我們節省很多工作,因為這允許我們將查詢的實際執行(連同身份驗證等)委托給提供的operators 。
幸運的是,檢查任務中的條件並不是執行分支的唯一方法。在下一節中,我們將展示如何將分支編織到 DAG 結構中,這比基於任務的方法提供了更多的靈活性。
5.2.2 DAG中的分支
在單個DAG中支持兩種不同的ERP系統的另一種方法是開發兩組不同的任務(每個系統一組),並允許DAG選擇是否執行任務以從舊的或新的ERP系統中獲取數據( 圖5.7)。
圖5.7。 使用DAG中的分支支持兩個ERP系統。使用DAG中的分支,我們可以通過為兩個系統實施不同的任務集來支持兩個ERP系統。 然后,我們可以通過在上游添加一個額外的任務(此處為“ Pick ERP system”)來允許Airflow在這兩個分支之間進行選擇,該任務會告訴Airflow要執行哪些下游任務集。
構建這兩組任務相對簡單:我們可以使用適當的operators 分別為每個ERP系統創建任務,並將各自的任務連接在一起:
Listing 5.9
fetch_sales_old = PythonOperator(...)
clean_sales_old = PythonOperator(...)
fetch_sales_new = PythonOperator(...)
clean_sales_new = PythonOperator(...)
fetch_sales_old >> clean_sales_old
fetch_sales_new >> clean_sales_new
現在,我們仍然需要將這些任務連接到DAG的其余部分,並確保Airflow知道應在何時執行這些任務中的哪一個。
幸運的是,Airflow 提供了內置的支持,可以使用 BranchPythonOperator 在多組下游任務之間進行選擇。該運算符(顧名思義)與PythonOperator相似,因為它將一個可調用的Python作為其主要參數之一:
Listing 5.10
def _pick_erp_system(**context):
...
pick_erp_system = BranchPythonOperator(
task_id='pick_erp_system',
provide_context=True,
python_callable=_pick_erp_system,
)
但是,與 PythonOperator 不同的是,傳遞給 BranchPythonOperator 的調用將返回下游任務的 ID 作為計算結果。 返回的ID確定分支任務完成后將執行哪些下游任務。 請注意,您還可以返回任務ID列表,在這種情況下,Airflow將執行所有引用的任務。
在這種情況下,我們可以根據DAG的執行日期使用callable返回適當的task_id,從而在兩個ERP系統之間實現選擇:
Listing 5.11
def _pick_erp_system(**context):
if context["execution_date"] < ERP_SWITCH_DATE:
return "fetch_sales_old"
else:
return "fetch_sales_new"
pick_erp_system = BranchPythonOperator(
task_id='pick_erp_system',
provide_context=True,
python_callable=_pick_erp_system,
)
pick_erp_system >> [fetch_sales_old, fetch_sales_new]
通過這種方式,Airflow 將在切換日期之前執行我們的一組“舊”ERP任務,同時在切換日期之后執行新任務。現在,剩下要做的就是將這些任務與我們DAG的其余部分聯系起來!
為了將我們的分支任務連接到DAG的開始,我們可以在之前的開始任務和pick_erp_system任務之間添加一個依賴關系:
Listing 5.12
start_task >> pick_erp_system
同樣,您可能希望連接兩個清理任務就像在cleaning 任務和join_datasets任務之間添加依賴項一樣簡單(類似於我們之前將clean_sales連接到join_datasets的情況):
Listing 5.13
[clean_sales_old, clean_sales_new] >> join_datasets
但是,如果執行此操作,運行DAG將導致join_datasets任務及其所有下游任務被Airflow跳過。 (如果您願意,可以嘗試一下!)。
這是因為,在默認情況下,Airflow要求給定任務上游的所有任務都必須先成功完成,然后才能執行任務本身。 通過將我們的兩個清理任務都連接到join_datasets任務,我們創建了一個永遠都不會發生的情況,因為只執行了其中一個清理任務! 結果,join_datasets任務將永遠無法執行,並且被Airflow跳過(圖5.8)。
圖5.8。 將分支與錯誤的觸發規則結合使用將導致跳過下游任務。 在此示例中,由於sales_new分支而跳過了fetch_sales_old任務。 這導致fetch_sales_new任務下游的所有任務也被跳過,這顯然不是我們想要的。
這種行為 執行任務的時間由Airflow中的所謂“觸發規則”控制。 可以使用trigger_rule
參數為單個任務定義觸發規則,該參數可以傳遞給任何operator。 默認情況下,觸發規則設置為“ all_success”,這意味着相應任務的所有父項都需要成功才能運行任務。當使用 BranchOperator 時,這種情況從來沒有發生過,因為它會跳過任何沒有被分支選擇的任務,這說明了為什么Airflow也會跳過join_datasets任務及其所有下游任務。
為了解決這種情況,我們可以更改join_datasets的觸發規則,以便在跳過其上游任務之一時仍可以觸發。 實現此目標的一種方法是將觸發規則更改為none_failed
,該規則指定任務的所有父項都執行完畢且沒有一個失敗后,該任務應立即運行:
Listing 5.14
join_datasets = PythonOperator(
...,
trigger_rule="none_failed",
)
這樣,join_datasets將在其所有父項完成執行而沒有任何故障后立即開始執行,從而允許join_datasets在分支之后繼續執行(圖5.9)。
圖5.9。 對join_datasets任務使用觸發規則“ none_failed”在Umbrella DAG中進行分支。 通過將join_datasets的觸發規則設置為“ none_failed”,任務(及其下游依賴項)仍在分支之后執行。
這種方法的一個缺點是我們現在在join_datasets任務中有3條邊。 這並不能真正反映我們流程的本質,我們本質上是想要獲取銷售/天氣數據(首先在兩個ERP系統之間進行選擇),然后將這兩個數據源提供給join_datasets。因此,許多人選擇在繼續DAG之前通過添加連接不同分支的虛擬任務來使分支條件更加明確(圖5.10)
圖5.10。在分支之后添加一個額外的連接任務。為了使分支結構更加清晰,您可以在分支之后添加一個額外的“ join”任務,該任務將分支的譜系聯系在一起,然后繼續進行DAG的其余部分。 這項額外的任務還具有另一個優點,即您無需為DAG中的其他任務更改任何觸發規則,因為您可以在聯接任務上設置所需的觸發規則。 (請注意,這意味着您不再需要為join_datasets任務設置觸發規則。)
要將這樣的虛擬任務添加到DAG中,我們可以使用Airflow提供的內置DummyOperator:
Listing 5.15
from airflow.operators.dummy_operator import DummyOperator
join_branch = DummyOperator(
task_id="join_erp_branch",
trigger_rule="none_failed"
)
[clean_sales_old, clean_sales_new] >> join_branch
join_branch >> join_datasets
此更改還意味着我們不再需要更改join_datasets任務的觸發規則,從而使我們的分支比原始分支更獨立。
5.3 條件任務
除了分支機構之外,Airflow還為您提供了其他機制,用於根據特定條件跳過DAG中的特定任務。 這使您可以使某些任務僅在某些數據集可用時才運行,或者僅在DAG在最近的執行日期執行時才運行。
例如,在我們的雨傘DAG中(圖5.3),我們有一個任務,該任務將部署我們訓練的每個模型。 但是,請考慮如果同事對清洗代碼進行了一些更改並希望使用回填將這些更改應用於整個數據集時會發生什么。在這種情況下,回填DAG還會導致部署我們模型的許多舊實例,我們當然不希望將其部署到生產中。
5.3.1 任務中的條件
我們可以通過將DAG更改為僅針對最新的DAG運行部署模型來避免此問題,因為這可以確保我們僅部署模型的一個版本:在最新數據集上進行訓練的版本。一種方法是使用PythonOperator實施部署,並在部署函數中顯式檢查DAG的執行日期:
Listing 5.15
def _deploy(**context):
if context["execution_date"] == ...:
deploy_model()
deploy = PythonOperator(
task_id="deploy_model",
python_callable=_deploy,
provide_context=True
)
盡管此實現應具有預期的效果,但它具有與相應的分支實現相同的缺點:它將部署邏輯與條件混淆,我們不能再使用除PythonOperator之外的任何其他內置運算符,並且無法在其中跟蹤任務結果。 氣流UI變得不太明確(圖5.11)。
圖5.11。 在deploy_model任務中有條件的情況下為Umbrella DAG運行的示例,該條件可確保僅針對最新運行執行部署。 由於條件是使用deploy_model任務在內部檢查的,因此我們無法從該視圖中看出模型是否已實際部署。
5.3.2 使任務有條件
實施條件部署的另一種方法是使部署任務本身具有條件,這意味着僅根據預定義的條件(在這種情況下DAG運行是否是最新的DAG運行)才執行實際的部署任務。 在Airflow中,您可以通過向DAG添加一個附加任務來使任務成為條件,該任務會測試該條件,並確保在條件失敗時跳過所有下游任務。
遵循這個想法,我們可以通過添加一個檢查當前執行是否是最新的DAG執行的任務並在該任務的下游添加部署任務來使部署成為條件部署:
Listing 5.16
def _latest_only(**context):
...
latest_only = PythonOperator(
task_id="latest_only",
python_callable=_latest_only,
provide_context=True,
dag=dag,
)
latest_only >> deploy_model
總的來說,這現在意味着我們的DAG看起來應該如圖5.12所示,並且train_model現在已連接到我們的新任務,而deploy_model任務位於該新任務的下游。
圖5.12。 帶條件部署的雨傘DAG的替代實現,其中條件作為任務包含在DAG中。 與我們之前的實現相比,將條件作為DAG的一部分包含在內可使該條件更加明確。
接下來,我們需要填寫_latest_only函數,以確保如果execute_date不屬於最新運行,則跳過下游任務。為此,我們需要
(a)檢查執行日期
(b)需要時從函數中引發AirflowSkipException
這是 Airflow 的方式,允許我們指示應該跳過條件和所有下游任務,從而跳過部署。
總而言之,這為我們的條件提供了以下實現:
Listing 5.17
from airflow.exceptions import AirflowSkipException
def _latest_only(**context):
# Find the boundaries for our execution window.
left_window = context['dag'].following_schedule(context['execution_date'])
right_window = context['dag'].following_schedule(left_window)
# Check if our current time is within the window.
now = pendulum.utcnow()
if not left_window < now <= right_window:
raise AirflowSkipException("Not the most recent run!")
我們可以通過執行幾天的DAG來檢查它是否有效! 這應該顯示類似於圖5.13的內容,在該圖中我們看到在所有DAG運行中,除最新運行外,我們的部署任務已被跳過。
圖5.13。 我們的Umbrella DAG運行了3次的“ latest_only”條件的結果。 我們的傘形DAG的樹狀圖顯示,我們的部署任務僅在最近的執行窗口中運行,因為在先前的執行中跳過了部署任務。 這表明我們的情況確實按預期運行。
那么這是如何工作的呢?
本質上,發生的情況是,當我們的條件任務(latest_only)引發AirflowSkipException時,該任務已完成,並且由Airflow分配了“已跳過”狀態。 接下來,Airflow查看任何下游任務的觸發規則,以確定是否應觸發這些任務。 在這種情況下,我們只有一個下游任務(部署任務),該任務使用默認的觸發規則“ all_success”,表明該任務僅在其所有上游任務均成功時才執行。在這種情況下,這是不正確的,因為其父項(條件任務)具有“已跳過”狀態而不是“成功”狀態,因此跳過了部署。
相反,如果條件任務沒有引發AirflowSkipException,則條件任務成功完成,並被賦予“成功”狀態。 這樣,由於部署任務的所有父項都已成功完成,因此觸發了部署任務,我們進行了部署。
5.3.3 使用內置運算符
由於只有最近一次DAG運行的運行任務是一個常見用例,因此Airflow還提供了內置的LatestOnlyOperator類。該Operator有效地執行與基於PythonOperator的自定義構建實現相同的工作。 使用LatestOnlyOperator,我們還可以像這樣實現條件部署,省去了編寫自己復雜的邏輯的麻煩:
Listing 5.18
from airflow.operators.latest_only_operator import LatestOnlyOperator
latest_only = LatestOnlyOperator(
task_id='latest_only',
dag=dag,
)
train_model >> latest_only >> deploy_model
當然,對於更復雜的情況,基於PythonOperator的路由為實現自定義條件提供了更大的靈活性。
5.4 有關觸發規則的更多信息
在前面的部分中,我們看到了Airflow如何允許我們構建動態行為DAG,這允許我們將分支或條件語句直接編碼到DAG中。 這種行為在很大程度上受Airflow所謂的觸發規則支配,該規則確定了Airflow何時執行任務。由於我們在上一節中相對較快地跳過了觸發規則,因此我們將在這里更詳細地探討它們,以使您了解觸發規則代表什么以及如何使用它們。
要了解觸發規則,我們首先必須檢查Airflow如何在DAG運行中執行任務。 本質上,當Airflow執行DAG時,它將連續檢查您的每個任務以查看是否可以執行它。 一旦某個任務被視為“准備執行”,該任務就會被調度程序拾取並安排執行。 因此,一旦Airflow有可用的執行插槽,便會立即執行任務。
那么,Airflow如何確定何時可以執行任務? 這就是觸發規則出現的地方。
5.4.1 什么是觸發規則?
觸發規則本質上是Airflow應用於任務的條件,取決於它們的依賴性(= DAG中的先前任務),以確定它們是否准備好執行。 Airflow的默認觸發規則是“ all_success”,該規則指出,必須先成功完成所有任務的依賴關系,然后才能執行任務本身。
要了解這是什么意思,讓我們回到最初的Umbrella DAG實現(圖5.4),除了默認的“ all_success”規則外,它還沒有使用任何觸發規則。 如果我們要開始執行此DAG,Airflow將開始循環執行其任務以確定可以執行哪些任務,即哪些任務沒有依賴關系,尚未成功完成。
A
B
圖5.14。使用默認觸發器規則“all_success”跟蹤基本的Umbrella DAG的執行(圖5.4)。(A)氣流開始執行DAG時,首先運行唯一一個之前沒有成功完成的任務:start任務。(B)成功完成啟動任務后,其他任務將准備好執行並由Airflow接管。
5.4.2 失敗的影響
當然,這僅描繪了“快樂”流程的情況,在此情況下,我們的所有任務均成功完成。 例如,如果我們的任務之一在執行過程中遇到錯誤,該怎么辦?
我們可以通過模擬其中一項任務中的故障來輕松地對此進行測試。 例如,通過模擬fetch_sales任務中的失敗,我們可以看到Airflow將為fetch_sales分配“失敗”狀態而不是為成功執行使用的“成功”狀態來記錄失敗(圖5.15)。 這意味着下游的process_sales任務無法執行,因為它要求fetch_sales成功。 結果,clean_sales任務被分配了狀態“ upstream_failed”,這表明它由於上游故障而無法繼續進行。
圖5.15。 上游故障會阻止使用默認觸發規則“ all_success”執行下游任務,該規則要求所有上游任務都必須成功。 請注意,Airflow會繼續執行與失敗的任務無關的任務(fetch_weather和process_weather)
上游任務的結果也會影響下游任務的這種行為通常稱為“傳播”,因為在這種情況下,上游故障會“傳播”到下游任務。 除了失敗之外,默認觸發規則還可以將已跳過任務的影響傳播到下游,從而導致已跳過任務下游的所有任務也被跳過。
這種傳播是“ all_success”觸發規則定義的直接結果,該規則要求所有依賴項都必須成功完成。 這樣,如果它在依賴項中遇到跳過或失敗,則除了以類似方式失敗以外,別無選擇,從而傳播了跳過或失敗。
5.4.3 其他觸發規則
除了默認觸發規則外,Airflow還支持許多其他觸發規則。 這些規則允許響應成功,失敗或跳過的任務時出現不同類型的行為。
例如,讓我們回顧一下第5.2節中兩個ERP系統之間的分支模式。 在這種情況下,我們必須調整加入分支的任務的觸發規則(由join_datasets或join_erp_branch任務完成),以避免下游任務由於分支而被跳過。 原因是,使用默認觸發規則,通過僅選擇兩個分支之一在DAG中引入的跳過將被傳播到下游,從而導致該分支之后的所有任務也被跳過。 相反,“ none_failed”觸發規則僅檢查是否所有上游任務均已完成而沒有失敗。 這意味着它可以容忍成功和跳過的任務,同時仍然等待所有上游任務完成再繼續執行,從而使觸發規則適合於加入兩個分支。 請注意,就傳播而言,這意味着規則不會傳播跳過。 但是,它仍然會傳播故障,這意味着獲取/處理任務中的任何故障仍將停止下游任務的執行。
同樣,其他觸發規則可用於處理其他類型的情況。 例如,觸發規則“ all_done”可用於定義任務完成依賴關系后立即執行的任務,而不管其結果如何。例如,這可以用於執行清理代碼(例如,關閉計算機或清理資源) ,無論發生什么情況,您都希望運行這些代碼。 另一類觸發規則包括渴望規則,例如“ one_failed”或“ one_success”,它們不等待所有上游任務在觸發之前完成,而是僅需要一個上游任務來滿足其條件才可以觸發。 這樣,這些規則可用於表示任務的早期失敗或一旦一組任務中的一個任務成功完成就迅速做出響應。
盡管這里我們不會更深入地介紹觸發規則,但是我們希望這能使您對觸發規則在Airflow中的作用以及如何將其用於將更復雜的行為引入DAG的想法有所了解。 有關觸發規則和一些潛在用例的完整概述,請參考表5.1。
表5.1。 Airflow支持的不同觸發規則的概述。
5.5 在任務之間共享數據
除了定義任務之間的依賴關系外,Airflow還允許您使用XComs [14]在任務之間共享小數據。 XComs背后的思想是,它們本質上允許您在任務之間交換消息,從而在任務之間實現某種程度的共享狀態。
5.5.1 使用XComs共享數據
為了了解其工作原理,讓我們回顧一下我們的總體用例(圖5.3)。 想象一下,在訓練模型時(在train_model任務中),使用隨機生成的標識符將訓練后的模型注冊到模型注冊表中。 為了部署經過訓練的模型,我們需要以某種方式將此標識符傳遞給deploy_model任務,以便它知道應該部署哪個版本的模型。
解決這個問題的一種方法是使用xcom在train_model和deploy_model任務之間共享模型標識符。在本例中,train_model任務負責“推送”XCom值,這實際上發布了該值,並使其可用於其他任務。 我們可以使用xcom_push方法在任務中顯式發布XCom值,該方法在Airflow上下文中的任務實例上可用:
Listing 5.19
def _train_model(**context):
model_id = str(uuid.uuid4())
context["task_instance"].xcom_push(key="model_id", value=model_id)
train_model = PythonOperator(
task_id="train_model", python_callable=_train_model, provide_context=True,
)
對xcom_push的此調用有效地告訴Airflow將我們的model_id值注冊為相應任務(train_model)以及相應DAG和執行日期的XCom值。運行此任務后,您可以在Web界面中的Admin> XComs部分中查看此已發布的XCom值(圖5.16),其中顯示了所有已發布的XCom值的概述。
圖5.16。 已注冊的XCom值概述(在Web界面中的Admin> XComs下)。
您可以使用xcom_pull方法(與xcom_push相反)來在其他任務中檢索XCom值:
Listing 5.20
def _deploy_model(**context):
model_id = context["task_instance"].xcom_pull(
task_ids="train_model", key="model_id"
)
print(f"Deploying model {model_id}")
deploy_model = PythonOperator(
task_id="deploy_model", python_callable=_deploy_model, provide_context=True,
)
這告訴Airflow從“ train_model”任務中使用鍵“ model_id”獲取XCom值,該值與我們之前在train_model任務中推送的model_id相匹配。 請注意,xcom_pull還允許您在獲取XCom值時定義dag_id和執行日期。 默認情況下,這些參數設置為當前DAG和執行日期,因此xcom_pull僅獲取當前DAG運行發布的值[15]。
我們可以通過運行DAG來驗證這項工作是否有效,這應該為我們提供類似於deploy_model任務的以下結果:
Listing 5.21
[2020-07-29 20:23:03,581] {python_operator.py:105} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=chapter5_08_xcoms
AIRFLOW_CTX_TASK_ID=deploy_model
AIRFLOW_CTX_EXECUTION_DATE=2020-07-28T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-07-28T00:00:00+00:00
[2020-07-29 20:23:03,584] {logging_mixin.py:95} INFO - Deploying model f323fa68-8b47-4e21-a687-7a3d9b6e105c
[2020-07-29 20:23:03,584] {python_operator.py:114} INFO - Done. Returned value was: None
除了從任務中調用xcom_pull之外,還可以在模板中引用XCom變量:
Listing 5.22
def _deploy_model(templates_dict, **context):
model_id = templates_dict["model_id"]
print(f"Deploying model {model_id}")
deploy_model = PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model,
templates_dict={
"model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
},
provide_context=True,
)
最后,一些operators 還提供了對自動推送XCom值的支持。 例如,BashOperator有一個xcom_push選項,當設置為True時,它告訴操作員將bash命令寫入stdout的最后一行作為XCom值推送。 同樣,PythonOperator會將從Python調用返回的任何值發布為XCom值。 這意味着您還可以按如下所示編寫我們的以上示例:
Listing 5.23
def _train_model(**context):
model_id = str(uuid.uuid4())
return model_id
def _deploy_model(**context):
model_id = context["task_instance"].xcom_pull(task_ids="train_model")
print(f"Deploying model {model_id}")
在后台,這可以通過在默認鍵“ return_value”下注冊XCom來實現,正如我們在Admin部分中所見(圖5.17)。
圖5.17來自PythonOperator的隱式XCom已在“ return_value”鍵下注冊。
5.5.2 何時(不)使用xcom
盡管XCom似乎對於在任務之間共享狀態很有用,但它們的使用也有一些缺點。
例如,使用XComs的一個重要缺點是它們在任務之間添加了隱藏的依賴關系,因為提取任務對推送所需值的任務具有隱式依賴關系。 與顯式任務相關性相反,此任務相關性在DAG中不可見,並且在安排任務時不會考慮。 因此,您有責任確保具有XCom依賴關系的任務以正確的順序執行,Airflow不會為您執行此操作。當在不同的DAG或執行日期之間共享XCom值時,這些隱藏的依賴關系變得更加復雜,因此,這也不是我們建議的做法。
此外,當XCom破壞了operator的原子性時,它們可能會有點反模式。 例如,我們在實踐中已經看到人們使用的一種用法是使用運算符在一個任務中獲取API令牌,然后使用XCom將令牌傳遞給下一個任務。 在這種情況下,此方法的缺點是令牌在幾個小時后過期,這意味着第二個任務的任何重新運行都將由於令牌過期而失敗。更好的方法可能是將令牌的提取與第二個任務結合起來,這樣一來,API令牌和相關工作的刷新就一次性發生了(從而使任務保持原子性)。
最后,XCom 的一個技術限制是,XCom 存儲的任何值都需要是picklable。這意味着某些 Python 類型(如 lambdas 或許多與多處理相關的類)不能存儲在 XCom 中(盡管您可能不希望這樣做)。 此外,XCom值的大小受到將XCom存儲在Airflow元存儲中的數據庫字段類型的最大大小限制:
-
SQLite-存儲為BLOB類型,限制為2GB
-
PostgreSQL-存儲為BYTEA類型,限制為1 GB
-
MySQL-存儲為BLOB類型,限制為64 KB
話雖如此,如果適當地使用XComs可以成為強大的工具。 只要確保仔細考慮它們的用法並清楚地記錄它們在任務之間引入的依賴關系,就可以避免日后出現意外情況。
5.6 摘要
在這一章里你學到了:
-
如何在Airflow DAG中定義基本線性依賴性和扇入/扇出結構。
-
如何將分支合並到DAG中,從而允許您根據特定條件選擇多個執行路徑。
-
可以將分支合並到DAG的結構中,而不是將其合並到任務中,從而在DAG的執行方式的可解釋性方面提供了很多好處。
-
如何在DAG中定義條件任務,可以根據某些定義的條件執行這些任務。 與分支類似,這些條件可以直接在DAG中編碼。
-
Airflow使用觸發規則來啟用這些行為,這些行為准確定義了Airflow何時可以執行給定任務。
-
除了默認觸發規則“ all_success”之外,Airflow還支持其他各種觸發規則,您可以使用這些觸發規則來觸發您的任務以應對不同類型的情況。
-
如何使用XCom在兩個任務之間共享狀態。