In the previous article, our first Airflow DAG was up and running our first task. This article fleshes that task out.
A recap of what the task contained
- We named the DAG Hello-World; this name is its dag_id
- We added a description as supplementary information
- We defined the schedule_interval, which is a cron expression
- We introduced a bash task
- There is one important parameter, default_args, which holds the defaults for the DAG definition (the sketch after this list puts these pieces together)
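Putting those pieces together, the DAG from the previous article looked roughly like this (a minimal sketch from memory; the description and schedule values are illustrative):
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'ryanmiao',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 1, 9),
}

dag = DAG(dag_id='Hello-World',
          description='my first DAG',      # the supplementary description
          default_args=default_args,       # defaults applied to every task
          schedule_interval='0 9 * * *')   # a cron expression

t1 = BashOperator(task_id='hello',
                  bash_command="echo 'Hello World'",
                  dag=dag)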
How to run different kinds of tasks
In Airflow, different kinds of work are carried out by importing different operators. A number of them are built in:
https://github.com/apache/airflow/tree/master/airflow/operators
Third parties have contributed more:
https://github.com/apache/airflow/tree/master/airflow/contrib/operators
You can also write plugins of your own to create custom task types.
When you want to use these operators, just import them:
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from operators.rdbms_to_redis_operator import RDBMS2RedisOperator
from operators.rdbms_to_hive_operator import RDBMS2HiveOperator
from operators.hive_to_rdbms_operator import Hive2RDBMSOperator
Then fill in the required parameters:
t1 = BashOperator(task_id="hello",
bash_command="echo 'Hello World, today is {{ ds }}'",
dag=dag)
See https://github.com/apache/airflow/tree/master/airflow/example_dags and the source code for examples of how to use these operators.
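To illustrate combining operator types, here is a sketch (the branch logic and task names are made up) using the BranchPythonOperator and DummyOperator imported above:
def choose_branch(**context):
    # return the task_id of the branch to follow, based on the schedule date
    return 'weekday_task' if context['execution_date'].weekday() < 5 else 'weekend_task'

branch = BranchPythonOperator(task_id='branch',
                              python_callable=choose_branch,
                              provide_context=True,  # Airflow 1.x: pass the context as kwargs
                              dag=dag)
weekday_task = DummyOperator(task_id='weekday_task', dag=dag)
weekend_task = DummyOperator(task_id='weekend_task', dag=dag)
branch >> [weekday_task, weekend_task]  # only the chosen branch runs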
How to get the task's execution date
This topic deserves an article of its own; here is just a quick pass. The task's dates are available through Jinja template variables, and the following handful covers most needs:
templated_command = """
echo "current bizdate is: {{ ds }} "
echo "current bizdate in number: {{ ds_nodash }} "
echo "7days after: {{ macros.ds_add(ds, 7)}} "
echo "5 days ago: {{ macros.ds_add(ds, -5) }} "
echo "bizdate iso8601 {{ ts }} "
echo "bizdate format: {{ execution_date.strftime("%d-%m-%Y") }} "
echo "bizdate 5 days ago format: {{ (execution_date - macros.timedelta(days=5)).strftime("%Y-%m-%d") }} "
"""
t1 = BashOperator(
    task_id='print_date1',
    bash_command=templated_command,
    # on_success_callback=compass_utils.success_callback(dingding_conn_id='dingding_bigdata', receivers="ryanmiao"),
    dag=dag)
The execution log shows the rendered command:
echo "current bizdate is: 2019-09-28 "
echo "current bizdate in number: 20190928 "
echo "7days after: 2019-10-05 "
echo "5 days ago: 2019-09-23 "
echo "bizdate iso8601 2019-09-28T01:00:00+08:00 "
echo "bizdate format: 28-09-2019 "
echo "bizdate 5 days ago format: 2019-09-23 "
Alerting
Tasks run on their own, so we need a notification about how they went: tell me when they succeed, or tell me when they fail.
default_args = {
    'owner': 'ryanmiao',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 1, 9),
    'email': ['ryan.miao@nf-3.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # 'on_failure_callback': compass_utils.ding_failure_callback('dingding_bigdata'),
    # 'on_success_callback': compass_utils.ding_success_callback('dingding_bigdata')
}
The built-in email_on_failure notification requires email to be configured in the config file. In practice, though, we usually have a notification service of our own, integrated with our own authentication and so on. For that, Airflow provides notification callbacks:
- on_failure_callback: a Python function executed when the task fails
- on_success_callback: a Python function executed when the task succeeds
For example, suppose I want to add a DingTalk notification.
from airflow.contrib.operators.dingding_operator import DingdingOperator
def failure_callback(context):
    """
    The function that will be executed on failure.
    :param context: The context of the executed task.
    :type context: dict
    """
    message = 'AIRFLOW TASK FAILURE TIPS:\n' \
              'DAG: {}\n' \
              'TASKS: {}\n' \
              'Reason: {}\n' \
        .format(context['task_instance'].dag_id,
                context['task_instance'].task_id,
                context['exception'])
    return DingdingOperator(
        task_id='dingding_failure_callback',  # this is the failure path
        dingding_conn_id='dingding_default',
        message_type='text',
        message=message,
        at_all=True,
    ).execute(context)

default_args['on_failure_callback'] = failure_callback
Configure the DingTalk group token under Admin -> Connections in the web UI, then just reference the conn_id here.
Likewise, we can use HTTP requests to call our own notification service to send emails, place phone calls, and so on; all of it can be customized. A later article will show how to implement this kind of custom notification with a custom plugin.
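A minimal sketch of that idea, assuming a hypothetical in-house endpoint (the URL and payload fields below are placeholders, not a real API):
import requests

def http_notify_callback(context):
    # POST the failure details to our (hypothetical) notification service
    payload = {
        'dag': context['task_instance'].dag_id,
        'task': context['task_instance'].task_id,
        'reason': str(context.get('exception')),
    }
    requests.post('https://notify.example.com/alert', json=payload, timeout=10)

# wired up the same way as the DingTalk example:
default_args['on_failure_callback'] = http_notify_callback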
Task dependencies in a DAG
Defining task dependencies in a DAG is simple:
a >> b        # b depends on a
a << b        # a depends on b
a >> b >> c   # dependencies can be chained
[a, b] >> c   # a task can depend on multiple tasks
Each dependency statement sits on its own line, and together they assemble the complete dependency graph.
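For instance, a sketch with made-up task names, using DummyOperator tasks as placeholders:
from airflow.operators.dummy_operator import DummyOperator

extract = DummyOperator(task_id='extract', dag=dag)
clean = DummyOperator(task_id='clean', dag=dag)
load = DummyOperator(task_id='load', dag=dag)
report = DummyOperator(task_id='report', dag=dag)

extract >> clean >> load   # run extract, then clean, then load
[clean, load] >> report    # report waits for both clean and load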
Some DAG parameters
First, let's look at the docstring from the source code:
"""
A dag (directed acyclic graph) is a collection of tasks with directional
dependencies. A dag also has a schedule, a start date and an end date
(optional). For each schedule, (say daily or hourly), the DAG needs to run
each individual tasks as their dependencies are met. Certain tasks have
the property of depending on their own past, meaning that they can't run
until their previous schedule (and upstream tasks) are completed.
DAGs essentially act as namespaces for tasks. A task_id can only be
added once to a DAG.
:param dag_id: The id of the DAG
:type dag_id: str
:param description: The description for the DAG to e.g. be shown on the webserver
:type description: str
:param schedule_interval: Defines how often that DAG runs, this
timedelta object gets added to your latest task instance's
execution_date to figure out the next schedule
:type schedule_interval: datetime.timedelta or
dateutil.relativedelta.relativedelta or str that acts as a cron
expression
:param start_date: The timestamp from which the scheduler will
attempt to backfill
:type start_date: datetime.datetime
:param end_date: A date beyond which your DAG won't run, leave to None
for open ended scheduling
:type end_date: datetime.datetime
:param template_searchpath: This list of folders (non relative)
defines where jinja will look for your templates. Order matters.
Note that jinja/airflow includes the path of your DAG file by
default
:type template_searchpath: str or list[str]
:param template_undefined: Template undefined type.
:type template_undefined: jinja2.Undefined
:param user_defined_macros: a dictionary of macros that will be exposed
in your jinja templates. For example, passing ``dict(foo='bar')``
to this argument allows you to ``{{ foo }}`` in all jinja
templates related to this DAG. Note that you can pass any
type of object here.
:type user_defined_macros: dict
:param user_defined_filters: a dictionary of filters that will be exposed
in your jinja templates. For example, passing
``dict(hello=lambda name: 'Hello %s' % name)`` to this argument allows
you to ``{{ 'world' | hello }}`` in all jinja templates related to
this DAG.
:type user_defined_filters: dict
:param default_args: A dictionary of default parameters to be used
as constructor keyword parameters when initialising operators.
Note that operators have the same hook, and precede those defined
here, meaning that if your dict contains `'depends_on_past': True`
here and `'depends_on_past': False` in the operator's call
`default_args`, the actual value will be `False`.
:type default_args: dict
:param params: a dictionary of DAG level parameters that are made
accessible in templates, namespaced under `params`. These
params can be overridden at the task level.
:type params: dict
:param concurrency: the number of task instances allowed to run
concurrently
:type concurrency: int
:param max_active_runs: maximum number of active DAG runs, beyond this
number of DAG runs in a running state, the scheduler won't create
new active DAG runs
:type max_active_runs: int
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created. The timeout
is only enforced for scheduled DagRuns, and only once the
# of active DagRuns == max_active_runs.
:type dagrun_timeout: datetime.timedelta
:param sla_miss_callback: specify a function to call when reporting SLA
timeouts.
:type sla_miss_callback: types.FunctionType
:param default_view: Specify DAG default view (tree, graph, duration,
gantt, landing_times)
:type default_view: str
:param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT)
:type orientation: str
:param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
:type catchup: bool
:param on_failure_callback: A function to be called when a DagRun of this dag fails.
A context dictionary is passed as a single parameter to this function.
:type on_failure_callback: callable
:param on_success_callback: Much like the ``on_failure_callback`` except
that it is executed when the dag succeeds.
:type on_success_callback: callable
:param access_control: Specify optional DAG-level permissions, e.g.,
"{'role1': {'can_dag_read'}, 'role2': {'can_dag_read', 'can_dag_edit'}}"
:type access_control: dict
:param is_paused_upon_creation: Specifies if the dag is paused when created for the first time.
If the dag exists already, this flag will be ignored. If this optional parameter
is not specified, the global config setting will be used.
:type is_paused_upon_creation: bool or None
"""
Emmm, I won't unpack these one by one here; I prefer to learn each parameter as I actually use it. Keep the docstring at hand and consult it when you do.
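To make a few of them concrete, here is a sketch (the dag_id and values are illustrative) exercising several parameters from the docstring above:
from datetime import datetime, timedelta
from airflow import DAG

dag = DAG(
    dag_id='param-demo',
    description='demonstrates a few DAG-level parameters',
    schedule_interval='0 9 * * *',          # a cron expression
    start_date=datetime(2019, 5, 1, 9),
    catchup=False,                          # only run the latest schedule, no backfill
    max_active_runs=1,                      # at most one active DagRun at a time
    dagrun_timeout=timedelta(hours=2),      # time out scheduled runs after 2 hours
    user_defined_macros=dict(env='prod'),   # usable as {{ env }} in templates
)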
Summary
The makeup of a DAG is simple, and a declaration in Python syntax is easier to organize and understand than properties or YAML configuration.
Define the DAG parameters, define the task operators, define the task dependencies, and you're done.