python之celery異步處理


  在寫個人博客項目的時候,涉及到郵件驗證碼和短信驗證碼的發送的功能,應該異步地去執行,而不是同步形成阻塞。這時就要用到python的擴展庫celery。

  celery是一個強大的分布式任務隊列的異步處理組件,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。

  Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件:

  Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等

任務執行單元:

  Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。

任務結果存儲:

  Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等

  個人理解,有些功能,比如發短信通知,是一個耗時任務,如果都扎堆在一起去執行,同步操作一次執行一個,一次發一條短信,勢必會形成阻塞。例如美團外賣,中午12點整的下單人數100個,每個人下單完成后系統都要發個短信給用戶說“下單成功”,每次發短信時間假如是1s,那就要100s發完這100條短信,那第101個人下單完成后要等100秒才能收到短信,這是什么用戶體驗?很顯然,我們不能一次發一條短信,而應該並發,一次發N條。這些並發技術是比較底層的,寫起來很麻煩,而celery就幫我們封裝好了這些並發技術,我們要做的就是通過celery去調動執行並發。

使用場景:

  異步任務:將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等等

  定時任務:定時執行某件事情,比如每天數據統計

 

之前在cms后台修改郵箱的時候點擊發送驗證碼郵件,會有大概1.2s的延時然后彈出發送成功的對話框。這個延時就是調用flask擴展mail發送郵件的耗時,這時在同步場景下執行的。現在換成celery的異步操作:

 1 from celery import Celery
 2 from flask_mail import Message
 3 from exts import mail
 4 from flask import Flask
 5 import config
 6 app = Flask(__name__)
 7 app.config.from_object(config)
 8 mail.init_app(app) # 初始化app
 9 
10 def make_celery(app):
11     celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],broker=app.config['CELERY_BROKER_URL'])
12     celery.conf.update(app.config)
13     TaskBase = celery.Task
14     class ContextTask(TaskBase):
15         abstract = True
16         def __call__(self, *args, **kwargs):
17             with app.app_context():
18                 return TaskBase.__call__(self, *args, **kwargs)
19     celery.Task = ContextTask
20     return celery
21 
22 celery = make_celery(app)
23 
24 @celery.task
25 def send_mail(subject,recipients,body):
26     message = Message(subject=subject,recipients=recipients,body=body)
27     mail.send(message)

 

之后點擊發送驗證碼就是立即彈出alert了。用戶體驗極佳。

 

 

這里要注意,目前我這個項目運行在window10上,但是celery對windows不太友好,4.x版本后不支持windows,要用eventlet去解決。eventlet是Python的並發網絡庫。

在命令行開啟:celery -A tasks.celery worker --pool=eventlet --loglevel=info

后來發現一定幾率出現莫名的報錯。

將eventlet改為solo就可以了:

celery -A tasks.celery worker --pool=solo --loglevel=info

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM