Airflow v2.0 Distributed Deployment: An Elasticsearch Logging Solution
Environment:
- docker
- airflow v2.0
- elasticsearch 7+
- filebeat 7+
Dependencies:
pip install 'apache-airflow-providers-elasticsearch'
Logging architecture
graph LR AF[(Airflow)] -.writes.-> LOG[[JSON log files]] -.reads.-> FB[/Filebeat: parse and normalize logs/] -.stores.-> ES[(ElasticSearch)]
In the diagram above, Logstash can be inserted between Filebeat and Elasticsearch for additional processing, depending on your own design.
Airflow configuration
Enable remote logging and configure the Elasticsearch connection details so that the webserver can reach Elasticsearch. Remote logs are retrieved by searching on log_id, so the log output must contain a log_id field whose format matches the log_id_template setting.
[logging]
# enable remote logging
remote_logging = True

# Elasticsearch connection info for the webserver
[elasticsearch]
host = your_host:your_port
# log_id template; the id used to search for logs
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
# whether to also write logs to stdout, depending on your logging design;
# here Filebeat reads the log files and ships them to Elasticsearch, so false
write_stdout = False
# whether to write logs as JSON
json_format = True

# Elasticsearch TLS settings; configure as needed
[elasticsearch_configs]
use_ssl = False
verify_certs = False
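To make the lookup concrete, here is a minimal sketch of the log_id the webserver searches for, rendered from the log_id_template above (the DAG, task, and date values are made up):

# minimal sketch: render a log_id from the log_id_template above
# (the dag_id, task_id, execution_date, and try_number values are hypothetical)
log_id_template = "{dag_id}-{task_id}-{execution_date}-{try_number}"
log_id = log_id_template.format(
    dag_id="example_dag",
    task_id="example_task",
    execution_date="2021-01-27T00:00:00+00:00",
    try_number=1,
)
print(log_id)  # example_dag-example_task-2021-01-27T00:00:00+00:00-1

Every JSON log line must carry a log_id field with exactly this value, or the webserver search will come back empty.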
Filebeat configuration
Filebeat reads the logs produced by task execution on the worker nodes, normalizes their structure, and ships them to Elasticsearch for storage.
Notes:
- the host field must be a hashable type, or absent entirely (see the error analysis at the end of this article)
The configuration below includes only the settings that need to be changed.
# read the log files
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
  enabled: true
  paths:
    - /app/logs/**/*.log
  exclude_files:
    - '.py.log$'

setup.template:
  # name and pattern are required for the Elasticsearch index setup
  name: "airflow_log_template"
  pattern: "airflow-log*"
# disable index lifecycle management, otherwise the custom index below is ignored
setup.ilm.enabled: false

output.elasticsearch:
  # Array of hosts to connect to.
  hosts: '${FILEBEAT_OUTPUT_ELASTICSEARCH_HOSTS:"127.0.0.1:9200"}'
  # Protocol - either `http` (default) or `https`.
  protocol: '${FILEBEAT_OUTPUT_ELASTICSEARCH_PROTOCOL:"http"}'
  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: '${FILEBEAT_OUTPUT_ELASTICSEARCH_USERNAME:""}'
  password: '${FILEBEAT_OUTPUT_ELASTICSEARCH_PASSWORD:""}'
  index: "airflow-log-%{+yyyy.MM}"

# log processing settings
processors:
  # host metadata output stays disabled (the host field must be hashable or absent)
  # - add_host_metadata:
  #     when.not.contains.tags: forwarded
  # parse the JSON log lines
  - decode_json_fields:
      fields: ["message"]
      process_array: false
      max_depth: 1
      target: ""
      overwrite_keys: true
      add_error_key: true
  # drop the host field
  - drop_fields:
      fields: ["host"]
      ignore_missing: true
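For reference, with json_format = True each line of the task log file is a JSON object, and the decode_json_fields processor merges its keys into the event root. A minimal sketch of that merge in Python (the exact field set varies by Airflow version, so the fields shown here are illustrative):

import json

# one raw line from /app/logs/... as Filebeat would read it (illustrative fields)
raw = ('{"asctime": "2021-01-27 15:12:57", "levelname": "INFO", '
       '"message": "task started", '
       '"log_id": "example_dag-example_task-2021-01-27T00:00:00+00:00-1"}')

# decode_json_fields with target: "" merges the parsed keys into the event root;
# overwrite_keys: true lets the inner "message" replace the raw line
event = {"message": raw}
event.update(json.loads(event["message"]))
print(event["log_id"], "|", event["message"])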
Once the configuration changes are in place, run
filebeat -e
to start Filebeat in the foreground and verify the configuration.
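As a quick sanity check (an optional sketch assuming the elasticsearch Python client and the index pattern above), query Elasticsearch for a known log_id and confirm that documents are arriving:

from elasticsearch import Elasticsearch  # pip install elasticsearch

es = Elasticsearch(["http://127.0.0.1:9200"])
resp = es.search(
    index="airflow-log-*",
    body={"query": {"match_phrase": {
        "log_id": "example_dag-example_task-2021-01-27T00:00:00+00:00-1"
    }}},
)
print(resp["hits"]["total"])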
Reference
Error when the Airflow webserver fetches logs:
[2021-01-27 15:12:57,006] {app.py:1891} ERROR - Exception on /get_logs_with_metadata [GET]
Traceback (most recent call last):
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
return func(*args, **kwargs)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
return f(*args, **kwargs)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/views.py", line 1054, in get_logs_with_metadata
logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/log/log_reader.py", line 58, in read_log_chunks
logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 217, in read
log, metadata = self._read(task_instance, try_number_element, metadata)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 163, in _read
logs_by_host = self._group_logs_by_host(logs)
File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 132, in _group_logs_by_host
grouped_logs[key].append(log)
TypeError: unhashable type: 'AttrDict'
The code corresponding to the error above:
@staticmethod
def _group_logs_by_host(logs):
    grouped_logs = defaultdict(list)
    for log in logs:
        key = getattr(log, 'host', 'default_host')  # the host value becomes the grouping key
        grouped_logs[key].append(log)
    # return items sorted by timestamp.
    result = sorted(grouped_logs.items(), key=lambda kv: getattr(kv[1][0], 'message', '_'))
    return result
As this code shows, the host field of each log record is used as a dictionary key, so its value must be a hashable type that can serve as a dict key; it must not be a mutable type such as a list or dict. Dropping the host field from the logs, or making it a hashable value, resolves the problem.
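A minimal repro of the failure mode, substituting a plain dict for the AttrDict that the Elasticsearch client returns:

from collections import defaultdict

grouped_logs = defaultdict(list)
grouped_logs["worker-1"].append("log line")            # str is hashable: fine
grouped_logs[{"name": "worker-1"}].append("log line")  # TypeError: unhashable type: 'dict'

This is exactly why the Filebeat configuration above leaves add_host_metadata disabled and drops the host field.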