Troubleshooting notes for common Airflow problems:
1. How to batch-unpause a large number of DAGs in Airflow
A handful of DAGs can be started with the airflow unpause dag_id command, or by clicking the toggle in the web UI, but unpausing them one at a time becomes tedious when there are many. Since DAG state lives in the metadata database, you can unpause in bulk by updating it directly. If MySQL backs sql_alchemy_conn, just log in to the airflow database and set the is_paused column of the dag table to 0.
Example: update dag set is_paused = 0 where dag_id like "benchmark%";
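If you would rather not touch the metadata database, looping the regular CLI achieves the same thing; a minimal sketch (the DAG ids are placeholders, and the airflow CLI is assumed to be on PATH):

import subprocess

# unpause each DAG through the normal CLI instead of editing the dag table
for dag_id in ['benchmark_01', 'benchmark_02', 'benchmark_03']:
    subprocess.check_call(['airflow', 'unpause', dag_id])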
2. The Airflow scheduler process hangs in a zombie-like state after running one task
This usually means the scheduler generated task instances but could not publish them to the broker, while the logs show no error message at all.
A likely cause is that the broker client library is not installed:
If Redis is the broker, run pip install apache-airflow[redis]
If RabbitMQ is the broker, run pip install apache-airflow[rabbitmq]
Also verify that the scheduler node can actually reach RabbitMQ; a quick check is sketched below.
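One way to test connectivity from the scheduler node is to open a connection with kombu, which celery already depends on; the broker URL below is a placeholder for the broker_url in your airflow.cfg:

from kombu import Connection

# connect() raises (e.g. ConnectionRefusedError) if the broker is unreachable
with Connection('amqp://guest:guest@rabbitmq-host:5672//') as conn:
    conn.connect()
    print('broker reachable')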
3. The Airflow scheduler node runs slowly when many DAG files are defined
The scheduler starts only two threads by default; this can be raised in airflow.cfg:
[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
# default is 2; raised here to 100
max_threads = 100
4. Changing the Airflow log level
$ vi airflow.cfg
[core]
#logging_level = INFO
logging_level = WARNING
NOTSET < DEBUG < INFO < WARNING < ERROR < CRITICAL
If the level is set to INFO, everything below INFO is suppressed and everything at INFO or above is emitted; in other words, the higher the level, the less detailed the output. Airflow's default logging_level is INFO (the commented-out value above).
Note: raising logging_level to WARNING or above affects not only the log files but also command-line output, which likewise only shows messages at or above the configured level. So if CLI output looks incomplete and there are no error logs, an overly high log level is the likely cause. The filtering itself is plain Python logging, as the sketch below shows.
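A two-line illustration of the level filtering, independent of Airflow (standard library only):

import logging

# with the threshold at WARNING, the INFO record is dropped
logging.basicConfig(level=logging.WARNING)
logging.info('suppressed: below the configured level')
logging.warning('printed: at or above the configured level')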
5. Airflow: jinja2.exceptions.TemplateNotFound
This is a pitfall of Airflow using Jinja2 as its template engine: when the bash command calls a script directly, the command string must end with a space.
You need to add a space after the script name whenever you call a bash script directly in the bash_command attribute of BashOperator. Airflow treats a value ending in .sh or .bash as a Jinja template file to load from the DAG folder, and that lookup fails; the trailing space stops the value from being matched as a template filename.
t2 = BashOperator(
    task_id='sleep',
    # bash_command="/home/batcher/test.sh",  # fails with jinja2.exceptions.TemplateNotFound
    bash_command="/home/batcher/test.sh ",   # works: note the trailing space
    dag=dag)
Reference:
https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
6. Airflow: Task is not able to be run
A task that ran fine for a while suddenly refuses to execute, and the worker log shows:
[2018-05-25 17:22:05,068] {jobs.py:2508} INFO - Task is not able to be run
Checking the task's execution log:
cat /home/py/airflow-home/logs/testBashOperator/print_date/2018-05-25T00:00:00/6.log
...
[2018-05-25 17:22:05,067] {models.py:1190} INFO - Dependencies not met for <TaskInstance: testBashOperator.print_date 2018-05-25 00:00:00 [success]>,
dependency 'Task Instance State' FAILED: Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.
The message says the 'Task Instance State' dependency failed: the task instance is already in the success state, which is not a valid state for execution. There are two ways to handle this:
- run the task with airflow run, ignoring all task dependencies:
$ airflow run -A dag_id task_id execution_date
- clear the task state with airflow clear:
$ airflow clear -u testBashOperator
7. CELERY: PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@xxxx.celery.pidbox' in vhost ''
After upgrading to celery 4.x with RabbitMQ as the broker, running tasks throws:
[2018-06-29 09:32:14,622: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, "PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@PQSZ-L01395.celery.pidbox' in vhost '/': received the value '10000' of type 'signedint' but current is none", (50, 10), 'Queue.declare')
Traceback (most recent call last):
File "c:\programdata\anaconda3\lib\site-packages\celery\worker\worker.py", line 205, in start
self.blueprint.start(self)
.......
File "c:\programdata\anaconda3\lib\site-packages\amqp\channel.py", line 277, in _on_close
reply_code, reply_text, (class_id, method_id), ChannelError,
amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@PQSZ-L01395.celery.pidbox' in vhost '/': received the value '10000' of type 'signedint' but current is none
This error generally means a queue argument differs between the RabbitMQ client and server; making them consistent fixes it.
Here the offending argument is x-expires, which corresponds to the celery setting control_queue_expires, so adding control_queue_expires = None to the configuration is enough. Neither setting exists in celery 3.x; in 4.x the two sides must agree, or the exception above is thrown. The two mappings between RabbitMQ queue arguments and celery 4.x settings I ran into are:
| rabbitmq | celery 4.x |
| --- | --- |
| x-expires | control_queue_expires |
| x-message-ttl | control_queue_ttl |
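A minimal sketch of the corresponding lines in the celery configuration; None is only correct on the assumption that the queues on the RabbitMQ side were declared without these arguments:

# celery 4.x settings; they must agree with the x-expires / x-message-ttl
# arguments of the queues already declared on the RabbitMQ server
control_queue_expires = None
control_queue_ttl = None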
8. CELERY: The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend
After upgrading celery to 4.x, it warns at startup:
/anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.
alternative='Please use RPC backend or a persistent backend.')
Cause: celery 4.0 changed how result_backend is configured for RabbitMQ.
It used to mirror the broker URL:
result_backend = 'amqp://guest:guest@localhost:5672//'
Now the rpc backend is used instead:
result_backend = 'rpc://'
Reference:
http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-event_queue_prefix
9. CELERY: ValueError('not enough values to unpack (expected 3, got 0)',)
Running celery 4.x on Windows throws:
[2018-07-02 10:54:17,516: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
......
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
Celery 4.x does not support Windows for now. For debugging purposes you can swap in a different pool implementation to get it running on Windows:
pip install eventlet
celery -A <module> worker -l info -P eventlet
References:
https://stackoverflow.com/questions/45744992/celery-raises-valueerror-not-enough-values-to-unpack
https://blog.csdn.net/qq_30242609/article/details/79047660
10. Airflow: ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'
Airflow throws the following exception at runtime:
Traceback (most recent call last):
File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in sync
......
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/base.py", line 307, in get_task_meta
meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
[2018-07-04 10:52:14,746] {celery_executor.py:101} ERROR - Error syncing the celery executor, ignoring it:
[2018-07-04 10:52:14,746] {celery_executor.py:102} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'
There are two likely causes:
- CELERY_RESULT_BACKEND is unset or misconfigured;
- the celery version is too old: airflow 1.9.0 requires celery 4.x, so check the celery version and keep the two compatible.
A quick way to see which case you are in is sketched below.
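You can inspect the celery app Airflow builds to see whether a result backend is actually configured; a sketch assuming the airflow 1.x module layout:

from airflow.executors.celery_executor import app

# a configured backend prints e.g. DatabaseBackend or RPCBackend;
# DisabledBackend here reproduces the error above
print(type(app.backend).__name__)
print(app.conf.result_backend)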
11. airflow.exceptions.AirflowException: dag_id could not be found: xxxx. Either the dag did not exist or it failed to parse
The worker log (airflow-worker.err) shows:
airflow.exceptions.AirflowException: dag_id could not be found: bmhttp. Either the dag did not exist or it failed to parse.
[2018-07-31 17:37:34,191: ERROR/ForkPoolWorker-6] Task airflow.executors.celery_executor.execute_command[181c78d0-242c-4265-aabe-11d04887f44a] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 52, in execute_command
subprocess.check_call(command, shell=True)
File "/anaconda/anaconda3/lib/python3.6/subprocess.py", line 291, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run bmhttp get_op1 2018-07-26T06:28:00 --local -sd /home/ignite/airflow/dags/BenchMark01.py' returned non-zero exit status 1.
The Command line in the log shows that when the scheduler generates the task message it also embeds the path of the DAG file to execute (via the -sd/--subdir argument). In other words, the scheduler and worker nodes must keep the DAG file at the same path; otherwise the error above occurs.
Reference:
https://stackoverflow.com/questions/43235130/airflow-dag-id-could-not-be-found
12. Airflow REST API calls return 'Airflow 404 = lots of circles'
This error occurs because the URL is missing the origin parameter, which the endpoint uses for its redirect. For example, when calling Airflow's /run endpoint, a working call looks like the sketch below.
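A sketch using requests against the airflow 1.x flask-admin UI; the host, dag_id, task_id and execution_date values are placeholders for your deployment:

import requests

resp = requests.get(
    'http://localhost:8080/admin/airflow/run',
    params={
        'dag_id': 'example_dag',
        'task_id': 'example_task',
        'execution_date': '2018-01-01T00:00:00',
        'origin': '/admin',  # without origin the redirect breaks and the 404 page appears
    },
)
print(resp.status_code)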