在實際的應用場景中,如用戶注冊,用戶輸入了注冊信息后,后端保存信息到數據庫中,然后跳轉至登錄界面,這些操作用戶需要等待的時間非常短,但是如果是有耗時任務,比如對輸入的網址進行漏洞掃描,在后端處理就會花費幾分鍾的時間,不可能讓用戶等待頁面刷新幾分鍾,所以需要進行后端異步處理。之前使用的后端異步處理時Python的原生線程/進程實現,簡潔暴力,自己用的話還行,但是如果是給用戶用,就還存在一些不足,現考慮使用Celery替換掉原生線程/進程異步處理。
Celery是個Python語言實現的異步分布式任務隊列服務,除了支持即時任務,還支持定時任務,Celery有五個核心角色。
-
Task 任務
任務(Task)就是你要做的事情,例如一個注冊流程里面有很多任務,給用戶發驗證郵件就是一個任務,這種耗時的任務就可以交給Celery去處理,還有一種任務是定時任務,比如每天定時統計網站的注冊人數,這個也可以交給Celery周期性的處理。
-
Broker 經紀人,隊列,消息傳遞者
Broker 的中文意思是經紀人,指為市場上買賣雙方提供中介服務的人。在Celery中這個角色相當於數據結構中的隊列,介於生產者和消費者之間經紀人。例如一個Web系統中,生產者是主程序,它生產任務,將任務發送給 Broker,消費者是 Worker,是專門用於執行任務的后台服務。Celery本身不提供隊列服務,一般用Redis或者RabbitMQ來實現隊列服務。
-
Worker 執行者,消費者
Worker 就是那個一直在后台執行任務的人,也成為任務的消費者,它會實時地監控隊列中有沒有任務,如果有就立即取出來執行。
-
Beat 定時任務調度器
Beat 是一個定時任務調度器,它會根據配置定時將任務發送給 Broker,等待 Worker 來消費。
-
Backend 執行結果
Backend 用於保存任務的執行結果,每個任務都有返回值,比如發送郵件的服務會告訴我們有沒有發送成功,這個結果就是存在Backend中,當然我們並不總是要關心任務的執行結果。
接下來編寫一個簡單的python程序來學習使用Celery
首先是安裝Celery,因為我的開發平台是Windows,Celery新版是不支持Windows操作系統的,需要下載老版本的,這里參考github上Celery開發者的回答: https://github.com/celery/celery/issues/4178
下載3.1.24版本的Celery
pip3 install celery==3.1.24
此外還要下載Reids,並且啟動Redis的服務,此處百度
創建Celery實例
# task.py from celery import Celery app = Celery('task', broker='redis://localhost:6379/0')
創建任務
#task.py @app.task def send_mail(email): print("send mail to ", email) import time time.sleep(5) return "success"
默認讀者有flask基礎,另外這里使用app.task 包裝 send_email , 使其成為后台運行的任務
函數使用app.task裝飾器修飾之后,就會成為Celery中的一個Task。
啟動Worker
啟動Worker,監聽Broker中是否有任務
celery worker
可以帶參數如
celery -A task worker --loglevel=info
-A: 指定 celery 實例所在哪個模塊中,--loglevel:顯示日志等級
運行后如下
調用任務
在主程序中調用任務,調任務發送給Broker,跟開一個多線程和多進程類似,相當於是把任務丟給了Broker,主程序繼續向下執行。
from task import send_mail def register(): import time start = time.time() print("1. 插入記錄到數據庫") print("2. celery 幫我發郵件") send_mail.delay("xx@gmail.com") print("3. 告訴用戶注冊成功") print("耗時:%s 秒 " % (time.time() - start)) if __name__ == '__main__': register()
因為send_mail被app.task裝飾器修飾了,所以我們想要把任務丟給它,使用函數的 .delay方法即可
目錄結構為:
── Celery測試 ├── task.py └── user.py
運行user.py,查看運行結果為:
可知
time.sleep(5)
被丟到后台去執行了,所以花費時間這么短。如果按照正常的同步邏輯去實現,至少需要5秒鍾的時間,因為存在time.sleep(5)來模擬發送郵件。
在worker服務窗口查看日志信息
跟着大佬們的博客學習了Celery的基本操作,大部分時間去安裝環境了,淦
將Celery添加進碎遮項目會在下一篇博客中說到。
請聽下回分解 咕咕咕
安裝出現的錯誤
File "d:\python3\lib\site-packages\celery\concurrency\prefork.py", line 20, in <module> from celery.concurrency.base import BasePool File "d:\python3\lib\site-packages\celery\concurrency\base.py", line 21, in <module> from celery.utils import timer2 File "d:\python3\lib\site-packages\celery\utils\timer2.py", line 19 from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger ^ SyntaxError: invalid syntax
參考自:https://www.cnblogs.com/zivli/p/11517797.html
這個是python3.7目前不支持kombu,降低python版本至3.6即可,(又得重新裝一波python,沒裝conda嗚嗚嗚,卸載之前先把python的類庫輸出到requirements.txt文件
關於卸載python,可以參考這篇博客:https://blog.csdn.net/ke_yi_/article/details/88183474
如果電腦上是python2和python3共存,請看:https://blog.csdn.net/autista/article/details/73650943
弄好了之后再把之前python3.7的庫文件恢復到python3.6里面來
pip3 install -r requirements.txt
弄好了之后重新打開python的集成開發環境
celery -A task worker --loglevel=info
可算能運行Celery了
接着運行程序的時候又遇到了
AttributeError: 'str' object has no attribute 'items'
出現該問題的原因是redis版本過高,降低redis版本即可
pip3 install redis==2.10.6
然后就沒有遇到其他的坑了,遇到再補:D
參考鏈接