flask中使用celery1--簡單實例


之前的博客

之前寫過一篇celery的博客:任務分發系統Celery

簡單的demo代碼

項目結構如下:

代碼如下:

# -*- coding:utf-8 -*-
import uuid
import time

from celery import Celery
from flask import Flask,request,jsonify


# 注意這里Flask的實例名稱不能叫app,否則會報錯!
flask_app = Flask(__name__)
flask_app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/1"
flask_app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/2"

# celery實例
celery_ins = Celery(flask_app.name,broker=flask_app.config["CELERY_BROKER_URL"])
celery_ins.conf.update(flask_app.config)

# 模擬處理發送郵件的函數
def do_send_email(to,content):
    start = time.time()
    time.sleep(3)
    print(f"測試發郵件的處理過程...耗時 {time.time() - start} s")

# 異步任務
@celery_ins.task
def send_email(to,content):
    do_send_email(to,content)
    return "send email successfully!"

# Flask視圖函數 —— 在里面調用異步任務
@flask_app.route("/pwd/forgot",methods=["POST"])
def reset_pwd():
    args = request.args
    email = args["email"]
    token = str(uuid.uuid4())
    content = u'請點擊鏈接重置密碼:http://example.com/password/reset/?token=%s' % token
    # 調用異步任務
    send_email.delay(email,content)
    return jsonify(code=0,msg="郵件發送成功!")

if __name__ == '__main__':
    flask_app.run(debug=True,host="127.0.0.1",port=9000)

運行celery與flask項目

運行celery

進入項目的目錄:

執行celery命令:

啟動flask項目

 

在postman中模擬

結果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM