Airflow: fixing the "Duplicate entry 'xxxx' for key 'dag_id'" error when triggering a DAG with multi-level nested subDAGs


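When you trigger a DAG that contains multiple levels of nested subDAGs, the task that performs the trigger fails, even though the target DAG has already started running. The DAG log shows this error: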

[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'pcdn_export_agg_peak.split_to_agg_9.pcdn_agg-2019-11-21 09:47:00' for key 'dag_id'")
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx [parameters: ('pcdn_export_agg_peak.split_to_agg_9.pcdn_agg', <Pendulum [2019-11-21T09:47:00+00:00]>, datetime.datetime(2019, 11, 21, 9, 47, 56, 409081, tzinfo=<Timezone [UTC]>), None, 'running', 'tri_peak_agg-daily_device_app_tx-for:2019-11-20-on:20191120013000.000000', 1, b'\x80\x04\x95&\x01\x00\x00\x00\x00\x00\x00}\x94(\x8c\x03env\x94\x8c\x03dev\x94\x8c\x08start_ts\x94J\x80=\xd5]\x8c\x06end_ts\x94J\xa4K\xd5]\x8c\tstat_ ... (275 characters truncated) ... \x8c\x06device\x94as\x8c\tlog_level\x94\x8c\x04INFO\x94\x8c\rseries_chunks\x94Kd\x8c\tsp_chunks\x94J@B\x0f\x00\x8c\nsp_schunks\x94J\xa0\x86\x01\x00u.')]
[2019-11-21 17:47:56,825] {base_task_runner.py:115} INFO - Job 2: Subtask peak_agg.daily_device_app_tx (Background on this error at: http://sqlalche.me/e/gkpj)
[2019-11-21 17:47:57,393] {logging_mixin.py:95} INFO - [2019-11-21 17:47:57,392] {local_task_job.py:105} INFO - Task exited with return code 1

Analysis shows that the bug is in the _trigger_dag function in airflow/api/common/experimental/trigger_dag.py, in the final part of the function that actually creates the DAG runs:


    triggers = list()
    dags_to_trigger = list()
    dags_to_trigger.append(dag)
    while dags_to_trigger:
        dag = dags_to_trigger.pop()
        trigger = dag.create_dagrun(
            run_id=run_id,
            execution_date=execution_date,
            state=State.RUNNING,
            conf=run_conf,
            external_trigger=True,
        )
        triggers.append(trigger)
        if dag.subdags:
            dags_to_trigger.extend(dag.subdags)  # BUG: subdags is already recursive, so nested subDAGs are queued again here
    return triggers

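The cause: dag.subdags already contains every subDAG under the DAG, including subDAGs nested inside other subDAGs. In a DAG with several levels of nesting, every subDAG at the second level or deeper is therefore appended to dags_to_trigger more than once (once through the top-level DAG's subdags, and again through the subdags of each subDAG above it), so the loop tries to write identical records into the dag_run table. Because dag_run is created with two UNIQUE KEYs (shown below), the duplicate insert fails with an SQL integrity error; the log above reports the dag_id key, i.e. (dag_id, execution_date).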

    CREATE TABLE `dag_run` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `dag_id` varchar(250) DEFAULT NULL,
      `execution_date` timestamp(6) NULL DEFAULT NULL,
      `state` varchar(50) DEFAULT NULL,
      `run_id` varchar(250) DEFAULT NULL,
      `external_trigger` tinyint(1) DEFAULT NULL,
      `conf` blob,
      `end_date` timestamp(6) NULL DEFAULT NULL,
      `start_date` timestamp(6) NULL DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `dag_id` (`dag_id`,`execution_date`),
      UNIQUE KEY `dag_id_2` (`dag_id`,`run_id`),
      KEY `dag_id_state` (`dag_id`,`state`)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
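To see the failure without Airflow or MySQL, the sketch below replays the original loop against an in-memory SQLite table carrying the same two UNIQUE keys. FakeDag, create_dag_run_table and replay_original_loop are hypothetical names: FakeDag only imitates the recursive behavior of DAG.subdags described above, and SQLite stands in for MySQL because only the UNIQUE keys matter here.

```python
import sqlite3


class FakeDag:
    """Hypothetical stand-in for airflow.models.DAG. It models only dag_id
    and a recursive subdags property: direct subDAGs plus everything nested
    beneath them."""

    def __init__(self, dag_id, children=()):
        self.dag_id = dag_id
        self._children = list(children)  # one entry per SubDagOperator

    @property
    def subdags(self):
        result = []
        for child in self._children:
            result.append(child)
            result += child.subdags  # nested subDAGs are included as well
        return result


def create_dag_run_table():
    """In-memory SQLite table with the same two UNIQUE keys as dag_run."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_run ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " dag_id VARCHAR(250),"
        " execution_date TIMESTAMP,"
        " run_id VARCHAR(250),"
        " UNIQUE (dag_id, execution_date),"
        " UNIQUE (dag_id, run_id))"
    )
    return conn


def replay_original_loop(dag, conn, run_id, execution_date):
    """Replays the original _trigger_dag loop, modeling each create_dagrun
    call as one INSERT. Returns (inserted dag_ids, dag_id that failed)."""
    inserted = []
    dags_to_trigger = [dag]
    while dags_to_trigger:
        current = dags_to_trigger.pop()
        try:
            conn.execute(
                "INSERT INTO dag_run (dag_id, execution_date, run_id)"
                " VALUES (?, ?, ?)",
                (current.dag_id, execution_date, run_id),
            )
        except sqlite3.IntegrityError:
            return inserted, current.dag_id  # rejected by a UNIQUE key
        inserted.append(current.dag_id)
        if current.subdags:
            dags_to_trigger.extend(current.subdags)
    return inserted, None


# Same shape as the test DAG later in this article: two levels of subDAGs.
level2 = FakeDag("my_trigger_target_dag.sub_run.sub_dag2")
level1 = FakeDag("my_trigger_target_dag.sub_run", [level2])
root = FakeDag("my_trigger_target_dag", [level1])

inserted, failed = replay_original_loop(
    root, create_dag_run_table(), "manual__test", "2019-11-21 09:47:00"
)
print("inserted:", inserted)
print("duplicate insert rejected for:", failed)
```

The second-level subDAG is inserted once, then queued again by its parent subDAG, and the second INSERT is rejected, which is the same pattern as the MySQL error in the log.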

Solution:

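Patch the source to record which DAGs have already been triggered: after popping a DAG from dags_to_trigger, first check whether it has been triggered, and only trigger it if it has not.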

    triggers = list()
    dags_to_trigger = list()
    dags_to_trigger.append(dag)
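    triggered_dag_ids = set()  # dag_ids already triggered in this call
    while dags_to_trigger:
        dag = dags_to_trigger.pop()
        # dag.subdags is recursive, so a nested subDAG can be queued more
        # than once; skip it if it has already been triggered
        if dag.dag_id in triggered_dag_ids:
            continue
        triggered_dag_ids.add(dag.dag_id)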
        trigger = dag.create_dagrun(
            run_id=run_id,
            execution_date=execution_date,
            state=State.RUNNING,
            conf=run_conf,
            external_trigger=True,
        )
        triggers.append(trigger)
        if dag.subdags:
            dags_to_trigger.extend(dag.subdags)
    return triggers
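The patched loop can be checked the same way. The sketch below uses a hypothetical FakeDag nested three levels deep and runs the deduplicated loop from the patch. For comparison it also runs a flatter alternative that is not taken from the original patch: since subdags is already recursive, [dag] + dag.subdags should list every DAG exactly once, so a single pass needs neither a stack nor bookkeeping.

```python
class FakeDag:
    """Hypothetical stand-in for airflow.models.DAG: dag_id plus a
    recursive subdags property."""

    def __init__(self, dag_id, children=()):
        self.dag_id = dag_id
        self._children = list(children)

    @property
    def subdags(self):
        result = []
        for child in self._children:
            result.append(child)
            result += child.subdags
        return result


def patched_trigger_order(dag):
    """The patched loop: skip any DAG whose dag_id was already triggered.
    Returns the dag_ids in the order create_dagrun would be called."""
    order = []
    triggered_dag_ids = set()
    dags_to_trigger = [dag]
    while dags_to_trigger:
        current = dags_to_trigger.pop()
        if current.dag_id in triggered_dag_ids:
            continue
        triggered_dag_ids.add(current.dag_id)
        order.append(current.dag_id)  # stands in for current.create_dagrun(...)
        if current.subdags:
            dags_to_trigger.extend(current.subdags)
    return order


def single_pass_trigger_order(dag):
    """Alternative (not the original patch): one pass over [dag] + dag.subdags."""
    return [d.dag_id for d in [dag] + dag.subdags]


# Hypothetical DAG with three levels of nested subDAGs.
level3 = FakeDag("target.a.b.c")
level2 = FakeDag("target.a.b", [level3])
level1 = FakeDag("target.a", [level2])
root = FakeDag("target", [level1])

print(patched_trigger_order(root))
print(single_pass_trigger_order(root))
```

Both versions visit each dag_id exactly once and differ only in order, which should not matter here because every DagRun is created with the same run_id and execution_date.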

Trigger test for a DAG with multi-level nested subDAGs

The test below is adapted from the official examples example_trigger_controller_dag and example_trigger_target_dag; to make testing easier, both DAGs are combined in a single file.
The example uses two DAGs:

  • my_trigger_target_dag, adapted from example_trigger_target_dag; this DAG nests subDAGs two levels deep.

  • my_trigger_controller_dag, adapted from example_trigger_controller_dag; a for loop in this DAG controls how many times in a row my_trigger_target_dag is triggered.

    When triggering another DAG several times in a row, keep the following in mind:

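    • Give each trigger a distinct run_id. If you do not set one, the system assigns one automatically, but setting run_id yourself makes it much easier to match each trigger task to the target DAG run it started.
    • Each trigger of the same DAG needs a different execution_date; otherwise you get the Duplicate entry 'xxxx' for key 'dag_id' error, for the same reason analyzed above.
    • execution_date must be in UTC; otherwise the target DAG runs at a different time from the one you intended.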
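The execution_date and run_id rules above can be sketched in plain Python. plan_triggers is a hypothetical helper and its run_id format is an assumption, not something Airflow requires: it normalizes a timezone-aware start time to UTC, spaces the execution dates 10 seconds apart as the controller DAG below does, and embeds the loop index in each run_id so that neither UNIQUE key of dag_run can collide.

```python
from datetime import datetime, timedelta, timezone


def plan_triggers(count, start, step=timedelta(seconds=10)):
    """Hypothetical helper: one (run_id, execution_date) pair per trigger.

    Each execution_date is offset by `step`, so no two triggers of the same
    target DAG share an execution_date, and each run_id embeds its index,
    so run_ids are unique as well.
    """
    if start.tzinfo is None:
        # A naive datetime is ambiguous; require an explicit time zone.
        raise ValueError("start must be timezone-aware")
    start_utc = start.astimezone(timezone.utc)  # normalize to UTC
    plan = []
    for idx in range(count):
        execution_date = start_utc + step * idx
        run_id = "my_trigger_%d__%s" % (idx, execution_date.isoformat())
        plan.append((run_id, execution_date))
    return plan


# 17:47 at UTC+8 is 09:47 UTC.
local = datetime(2019, 11, 21, 17, 47, tzinfo=timezone(timedelta(hours=8)))
for run_id, execution_date in plan_triggers(3, local):
    print(run_id, execution_date)
```

Rejecting naive datetimes is a design choice: it forces the caller to state the time zone instead of silently treating local time as UTC.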
import pprint
from datetime import datetime, timedelta
from airflow.utils import timezone

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator

pp = pprint.PrettyPrinter(indent=4)

# This file combines two DAGs to test the TriggerDagRunOperator against a
# target DAG with two levels of nested subDAGs:
# 1. The Target DAG (my_trigger_target_dag) - the DAG being triggered; it
#    contains a SubDagOperator whose subDAG contains another SubDagOperator
# 2. The Controller DAG (my_trigger_controller_dag) - the DAG that
#    conditionally executes the trigger
#
# The TriggerDagRunOperator takes:
#   a. A python callable that decides whether or not to trigger the Target DAG
#   b. An optional params dict passed to the python callable to help in
#      evaluating whether or not to trigger the Target DAG
#   c. The id (name) of the Target DAG
#   d. The python callable can add contextual info to the DagRun created by
#      way of adding a Pickleable payload (e.g. dictionary of primitives). This
#      state is then made available to the Target DAG
#

args = {
    'start_date': airflow.utils.dates.days_ago(2),
    'owner': 'airflow',
}

TARGET_DAG = "my_trigger_target_dag"
TRIGGER_CONTROLLER_DAG = "my_trigger_controller_dag"

target_dag = DAG(
    dag_id=TARGET_DAG,
    default_args=args,
    schedule_interval=None,
)


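def run_this_func(ds, **kwargs):
    # conf is None when the DAG is run without a payload, so fall back to {}
    dag_run_conf = kwargs['dag_run'].conf or {}
    print("Remotely received value of {} for key=message".format(dag_run_conf.get('message')))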


def sub_run_this_func(ds, **kwargs):
    dag_run_conf = kwargs['dag_run'].conf or {}
    print("Sub dag remotely received value of {} for key=message".format(dag_run_conf.get('message')))


def sub2_run_this_func(ds, **kwargs):
    dag_run_conf = kwargs['dag_run'].conf or {}
    print("Sub2 dag remotely received value of {} for key=message".format(dag_run_conf.get('message')))


def get_sub_dag(main_dag, sub_dag_prefix, schedule_interval, default_args):
    parent_dag_name = main_dag.dag_id
    sub_dag = DAG(
        dag_id="%s.%s" % (parent_dag_name, sub_dag_prefix),
        schedule_interval=schedule_interval,
        default_args=default_args,
    )
    task1 = PythonOperator(
        task_id="sub_task1",
        provide_context=True,
        python_callable=sub_run_this_func,
        dag=sub_dag,
    )

    def create_subdag_for_action2(parent_dag, dag_name):
        sub2_dag = DAG(
            dag_id="%s.%s" % (parent_dag.dag_id, dag_name),
            default_args=default_args.copy(),
            schedule_interval=schedule_interval,
        )
        sub2_task1 = PythonOperator(
            task_id="sub2_task1",
            provide_context=True,
            python_callable=sub2_run_this_func,
            dag=sub2_dag
        )
        return sub2_dag

    task2 = SubDagOperator(
        task_id="sub_dag2",
        subdag=create_subdag_for_action2(sub_dag, "sub_dag2"),
        dag=sub_dag,
    )
    task1 >> task2
    return sub_dag


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=target_dag,
)

sub_task = SubDagOperator(
    task_id="sub_run",
    subdag=get_sub_dag(target_dag, "sub_run", None, args),
    dag=target_dag,
)

# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=target_dag,
)

run_this >> sub_task >> bash_task

"""
This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py)

This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
  a. A python callable that decides whether or not to trigger the Target DAG
  b. An optional params dict passed to the python callable to help in
     evaluating whether or not to trigger the Target DAG
  c. The id (name) of the Target DAG
  d. The python callable can add contextual info to the DagRun created by
     way of adding a Pickleable payload (e.g. dictionary of primitives). This
     state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj


# Define the DAG
trigger_dag = DAG(
    dag_id=TRIGGER_CONTROLLER_DAG,
    default_args={
        "owner": "airflow",
        "start_date": airflow.utils.dates.days_ago(2),
    },
    schedule_interval=None,
)

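# Define the trigger tasks of this controller DAG; raise range(1) to trigger
# the target DAG several times in a row.
# NOTE: timezone.utcnow() returns an aware UTC datetime and is evaluated each
# time this file is parsed; every trigger below is offset by 10 seconds so
# that no two triggers share an execution_date.
execute_date = timezone.utcnow()
for idx in range(1):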
    trigger = TriggerDagRunOperator(
        task_id='test_trigger_dagrun_%d' % idx,
        trigger_dag_id=TARGET_DAG,
        python_callable=conditionally_trigger,
        params={
            'condition_param': True,
            'message': 'Hello World, exec idx is %d. -- datetime.utcnow: %s; timezone.utcnow:%s' % (
                idx, datetime.utcnow(), timezone.utcnow()
            )
        },
        dag=trigger_dag,
        execution_date=execute_date,
    )
    execute_date = execute_date + timedelta(seconds=10)

  • Once the code is in place, both DAGs show up in the Airflow UI.

[Figure: the test DAGs, ready in the UI]

  • Turn on the toggles on the left, open the controller DAG, and click Trigger DAG; the results of the trigger test then appear.

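    The first 3 runs were trigger tests without multi-level subDAGs, and they passed.

    The middle 3 runs were trigger tests with multi-level nested subDAGs enabled, and they failed.

    The last run was triggered again after fixing the bug in the trigger code, and it passed.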

[Figure: DAG trigger test results]

