[Flask]celery異步任務隊列的使用


Celery異步任務隊列

目錄結構樹:

 

配置文件config.py:

# 設置中間人地址
broker_url = 'redis://127.0.0.1:6379/1'

  

主main.py:

import sys
import os

from celery import Celery

from flask import Flask
from flask_mail import Mail

CELERY_DIR = os.path.dirname(os.getcwd())
sys.path.insert(0, CELERY_DIR)
import config

mail = Mail()

app = Flask(__name__)
app.config.from_object(config.config.get(os.environ.get('env'))
mail.init_app(app)


def make_celery(app):
    # 創建celery對象並設置
    celery = Celery(app.import_name)
    # 加載配置
    celery.config_from_object('celery_tasks.config')
    # celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    # 啟動celery worker時自動發現任務
    celery.autodiscover_tasks(['celery_tasks.email',])
    return celery

celery = make_celery(app)

  

任務函數tasks.py:

from flask_mail import Message

import config
from celery_tasks.main import celery, mail


# 使用裝飾器將send_email函數裝飾成任務函數
@celery.task(name='send_email')
def send_email(to, subject, html_message):

    msg = Message(
        subject,
        sender=config.Config.MAIL_USERNAME,
        html=html_message,
        recipients=[to]
    )
    mail.send(msg)


if __name__ == '__main__':
    send_email.delay('xx@xx.com', 'xx', 'xx')

  

啟動命令:

celery worker -A main.celery -l info

  

發出任務函數:

send_email.delay('xx@xx.com', 'xx', 'xx')

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM