Celery異步任務隊列
目錄結構樹:
配置文件 celery_tasks/config.py(Celery 專用配置,由 main.py 中的 config_from_object('celery_tasks.config') 加載):
# Broker (message middleman) URL: Redis on localhost, database 1.
# The assignment must be on its own line; on the comment line it is never executed.
broker_url = 'redis://127.0.0.1:6379/1'
主程序 celery_tasks/main.py:
import os
import sys

from celery import Celery
from flask import Flask
from flask_mail import Mail

# Put the parent of the current working directory on sys.path so that the
# project-level ``config`` module can be imported below.
# NOTE: this depends on the directory the process is started from.
CELERY_DIR = os.path.dirname(os.getcwd())
sys.path.insert(0, CELERY_DIR)

import config  # noqa: E402 -- must come after the sys.path tweak above

mail = Mail()
app = Flask(__name__)
# Pick the Flask config object registered under the name found in the
# ``env`` environment variable.
app.config.from_object(config.config.get(os.environ.get('env')))
mail.init_app(app)


def make_celery(app):
    """Create a Celery instance whose tasks run inside the Flask app context.

    Args:
        app: The Flask application; its ``import_name`` names the Celery app
            and its application context wraps every task call.

    Returns:
        The configured ``Celery`` object.
    """
    # Create the Celery object and load its settings from celery_tasks/config.py.
    celery = Celery(app.import_name)
    celery.config_from_object('celery_tasks.config')
    # Alternative left disabled by the author: celery.conf.update(app.config)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Task base class that pushes the Flask application context."""

        abstract = True

        def __call__(self, *args, **kwargs):
            # Extensions such as Flask-Mail need an active app context.
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

    # Auto-discover task modules in the listed packages when the worker starts.
    celery.autodiscover_tasks(['celery_tasks.email'])
    return celery


celery = make_celery(app)
任務函數 celery_tasks/email/tasks.py:
from flask_mail import Message

import config
from celery_tasks.main import celery, mail


# The decorator turns send_email into a Celery task registered as 'send_email'.
@celery.task(name='send_email')
def send_email(to, subject, html_message):
    """Send one HTML e-mail through the shared Flask-Mail instance.

    Args:
        to: Recipient address; it is wrapped in a one-element recipient list.
        subject: Subject line of the message.
        html_message: HTML body of the message.
    """
    message = Message(
        subject,
        recipients=[to],
        html=html_message,
        sender=config.Config.MAIL_USERNAME,
    )
    mail.send(message)


if __name__ == '__main__':
    send_email.delay('xx@xx.com', 'xx', 'xx')
啟動命令:
# Start a worker for the `celery` object in main.py with info-level logging.
# Global options such as -A go before the sub-command; Celery 5+ requires this
# order, and older versions accept it too.
celery -A main.celery worker -l info
發出任務函數:
# Queue the task: .delay() sends the call (to, subject, html_message) to the
# broker for a worker to execute, instead of running send_email inline.
send_email.delay('xx@xx.com', 'xx', 'xx')