celery 日志設置
3種自定義Celery日志記錄處理程序的策略
python日志處理程序可以自定義日志消息,例如,我們想把日志消息寫入屏幕,文件和日志管理服務等,在這種情況下,我們能將三個日志處理程序添加到應用程序的根記錄器中。
import logging
logger = logging.getLogger()
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s [%(lineno)d]')
# StreamHandler
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)
# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)
# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)
在celery 中添加自定義日志處理程序(通常在 celery 中配置日志記錄)可能涉及很多麻煩。celery 文檔有點少,而且在各種論壇(比如 stackoverflow中也是充滿了各種矛盾的答案),各種各樣的文章都提出了相當復雜的解決辦法和補丁。
在本文,我將展示三種配置 celery 記錄器的替代策略,並說明每種策略的工作方式和原因。提供的代碼也是可以運行的獨立腳本。要求是 python3.6+和 celery4.2.0+結合使用。
首先啟動celery
celery worker --app=app.app --concurrency=1 --loglevel=INFO
異步方式啟動任務
python app.py
celery logging
celery logging 比較復雜且不易設置。底層的 python 日志記錄系統需要支持 celery 支持的所有並發的設置:eventlet,greenlet,threads 等。但現實的情況是現在的 python 日志記錄系統並不支持所有這些不同的配置。
celery 在 celery.app.log 中提供了特殊的get_task_logger 功能。這將返回一個繼承自記錄器celery的特殊記錄器 celery.task,該記錄器自動獲取任務名稱以及唯一 ID 作為日志的一部分。
但是,我們也可以使用標准getlogger方式獲取日志記錄對象,原因是我們很可能在 celery 或者 web 應用程序中調用代碼。如果我們使用 logging.getlogger(name),可以使我們的底層代碼與執行代碼的上下文保持干凈整潔。
第一種策略:Augment Celery 記錄器
celery 提供了after_setup_logger在Celery設置記錄器之后觸發的信號,信號傳遞記錄器對象,我們可以方便地自定義處理程序然后添加到記錄器中
import os
import logging
from celery import Celery
from celery.signals import after_setup_logger
for f in ['./broker/out', './broker/processed']:
if not os.path.exists(f):
os.makedirs(f)
logger = logging.getLogger(__name__)
app = Celery('app')
app.conf.update({
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': './broker/out',
'data_folder_out': './broker/out',
'data_folder_processed': './broker/processed'
}})
@after_setup_logger.connect
def setup_loggers(logger, *args, **kwargs):
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)
# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)
@app.task()
def add(x, y):
result = x + y
logger.info(f'Add: {x} + {y} = {result}')
return result
if __name__ == '__main__':
task = add.s(x=2, y=3).delay()
print(f'Started task: {task}')
第二種策略:覆蓋 celery 根記錄器
可以通過連接setup_logging 信號來阻止celery 配置任何記錄器,這樣,我們就可以完全自定義自己的日志記錄配置
import os
import logging
from celery import Celery
from celery.signals import setup_logging
app = Celery('app')
app.conf.update({
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': './broker/out',
'data_folder_out': './broker/out',
'data_folder_processed': './broker/processed'
}})
for f in ['./broker/out', './broker/processed']:
if not os.path.exists(f):
os.makedirs(f)
logger = logging.getLogger(__name__)
@after_setup_logger.connect
def setup_loggers(*args, **kwargs):
logger = logging.getLogger()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# StreamHandler
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)
# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)
# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)
@app.task()
def add(x, y):
result = x + y
logger.info(f'Add: {x} + {y} = {result}')
return result
if __name__ == '__main__':
task = add.s(x=2, y=3).delay()
print(f'Started task: {task}')
第三種策略:停用 celery 記錄器配置
另一種解決方案就是讓 celery 設計其記錄器但是不使用,並防止其劫持根記錄器。默認情況下,celery 會在根記錄器上先刪除所有先前的配置的處理程序。如果要自定義自己的日志處理程序而不會妨礙celery,則可以通過設置禁用此行為 worker_hijack_root_logger=True。這將使我們能夠收回對於根記錄器的控制權,並退回到標准的 python 記錄器設置。但是需要謹慎使用這種方案,因為我們需要確保python 日志記錄與 celery 設置完全兼容(event,greenlet,threads 等)
import os
import logging
from celery import Celery
from celery.signals import setup_logging
app = Celery('app')
app.conf.update({
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': './broker/out',
'data_folder_out': './broker/out',
'data_folder_processed': './broker/processed'
},
'worker_hijack_root_logger': False})
# setup folder for message broking
for f in ['./broker/out', './broker/processed']:
if not os.path.exists(f):
os.makedirs(f)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
# StreamHandler
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)
# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)
# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)
logger = logging.getLogger(__name__)
@app.task()
def add(x, y):
result = x + y
logger.info(f'Add: {x} + {y} = {result}')
return result
if __name__ == '__main__':
task = add.s(x=2, y=3).delay()
print(f'Started task: {task}')
我們采用第二種方法來定制celery 的日志格式
- 添加 logging_config
import logging.config
LOG_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'simple': {
'datefmt': '%Y-%m-%d %H:%M:%S',
'format': '{"timestamp": "%(asctime)s", "app": "bs-whatweb", '
'"logger": "%(name)s", "level": "%(levelname)s", '
'"pathname": "%(pathname)s", "module": "%(module)s", '
'"funcName": "%(funcName)s", "lineno": "%(lineno)d", '
'"message": "%(message)s"}'
},
'json': {
'class': 'project.api.tasks.logger.JSONFormatter'
}
},
'handlers': {
'celery': {
'level': 'INFO',
'formatter': 'simple',
'class': 'logging.StreamHandler'
},
'celery_json': {
'level': 'INFO',
'formatter': 'json',
'class': 'logging.StreamHandler'
},
'sentry': {
'level': "CRITICAL",
'formatter': 'simple',
'class': 'raven.handlers.logging.SentryHandler',
'args': ('https://facc2ededdfa45ba955dca1eb485915a@sentry.socmap.org/7',)
},
},
'loggers': {
'celery_logger': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
'celery.task': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
'celery.worker': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
'celery': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
'project': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
}
}
logging.config.dictConfig(LOG_CONFIG)
- 設置 JSON 格式化,並添加 task_id 及 task_name 兩個參數
def __init__(self, tags=None, hostname=None, fqdn=False, message_type='JSON',
indent=None):
super().__init__()
*********************新增******************
try:
from celery._state import get_current_task
self.get_current_task = get_current_task
except ImportError:
self.get_current_task = lambda: None
******************************************
"""
:param tags: a list of tags to add to every messages
:hostname: force a specific hostname
:fqdn: a boolean to use the FQDN instead of the machine's hostname
:message_type: the message type for Logstash formatters
:indent: indent level of the JSON output
"""
self.message_type = message_type
self.tags = tags if tags is not None else []
self.extra_tags = []
self.indent = indent
if hostname:
self.host = hostname
elif fqdn:
self.host = socket.getfqdn()
else:
self.host = socket.gethostname()
def format(self, record, serialize=True):
****************************新增***********************
task = self.get_current_task()
if task and task.request:
record.__dict__.update(task_id=task.request.id,
task_name=task.name)
else:
record.__dict__.setdefault('task_name', '')
record.__dict__.setdefault('task_id', '')
if record.__dict__.get("data"):
record.__dict__.pop("data")
*****************************************************
new_message = record.getMessage()
# Create message dict
message = {
'timestamp': self.format_timestamp(record.created),
'app': os.environ.get('APP_NAME'),
'host': self.host,
'environment': os.environ.get('FLASK_ENV'),
'logger': record.name,
'level': record.levelname,
'message': new_message,
'path': record.pathname,
'tags': self.tags[:]
}
# Add extra fields
message.update(self.get_extra_fields(record))
# Add extra tags
if self.extra_tags:
message['tags'].extend(self.extra_tags)
# If exception, add debug info
if record.exc_info or record.exc_text:
message.update(self.get_debug_fields(record))
if serialize:
return self.serialize(message, indent=self.indent)
return message
- 攔截 celery 信號
from celery.signals import setup_logging
@setup_logging.connect
def setup_logger(*args, **kwargs):
from project.api.tasks import logging_config
