Python Flask后端異步處理(二)


  在實際的應用場景中,如用戶注冊,用戶輸入了注冊信息后,后端保存信息到數據庫中,然后跳轉至登錄界面,這些操作用戶需要等待的時間非常短,但是如果是有耗時任務,比如對輸入的網址進行漏洞掃描,在后端處理就會花費幾分鍾的時間,不可能讓用戶等待頁面刷新幾分鍾,所以需要進行后端異步處理。之前使用的后端異步處理時Python的原生線程/進程實現,簡潔暴力,自己用的話還行,但是如果是給用戶用,就還存在一些不足,現考慮使用Celery替換掉原生線程/進程異步處理。

Celery

  Celery是個Python語言實現的異步分布式任務隊列服務,除了支持即時任務,還支持定時任務,Celery有五個核心角色。

  • Task 任務

    任務(Task)就是你要做的事情,例如一個注冊流程里面有很多任務,給用戶發驗證郵件就是一個任務,這種耗時的任務就可以交給Celery去處理,還有一種任務是定時任務,比如每天定時統計網站的注冊人數,這個也可以交給Celery周期性的處理。

  • Broker 經紀人,隊列,消息傳遞者

    Broker 的中文意思是經紀人,指為市場上買賣雙方提供中介服務的人。在Celery中這個角色相當於數據結構中的隊列,介於生產者和消費者之間經紀人。例如一個Web系統中,生產者是主程序,它生產任務,將任務發送給 Broker,消費者是 Worker,是專門用於執行任務的后台服務。Celery本身不提供隊列服務,一般用Redis或者RabbitMQ來實現隊列服務。

  • Worker 執行者,消費者

    Worker 就是那個一直在后台執行任務的人,也成為任務的消費者,它會實時地監控隊列中有沒有任務,如果有就立即取出來執行。

  • Beat 定時任務調度器

    Beat 是一個定時任務調度器,它會根據配置定時將任務發送給 Broker,等待 Worker 來消費。

  • Backend 執行結果

    Backend 用於保存任務的執行結果,每個任務都有返回值,比如發送郵件的服務會告訴我們有沒有發送成功,這個結果就是存在Backend中,當然我們並不總是要關心任務的執行結果。

celery.png

  接下來編寫一個簡單的python程序來學習使用Celery

  首先是安裝Celery,因為我的開發平台是Windows,Celery新版是不支持Windows操作系統的,需要下載老版本的,這里參考github上Celery開發者的回答:  https://github.com/celery/celery/issues/4178

  下載3.1.24版本的Celery

pip3 install celery==3.1.24

  此外還要下載Reids,並且啟動Redis的服務,此處百度

創建Celery實例

# task.py
from celery import Celery
​
app = Celery('task', broker='redis://localhost:6379/0')

創建任務

#task.py
@app.task
def send_mail(email):
    print("send mail to ", email)
    import time
    time.sleep(5)
    return "success"

  默認讀者有flask基礎,另外這里使用app.task 包裝 send_email , 使其成為后台運行的任務

  函數使用app.task裝飾器修飾之后,就會成為Celery中的一個Task。

啟動Worker

  啟動Worker,監聽Broker中是否有任務

celery worker

  可以帶參數如

celery -A task worker --loglevel=info

  -A: 指定 celery 實例所在哪個模塊中,--loglevel:顯示日志等級

  運行后如下

bug5.png

調用任務

  在主程序中調用任務,調任務發送給Broker,跟開一個多線程和多進程類似,相當於是把任務丟給了Broker,主程序繼續向下執行。

from task import send_mail
​
def register():
    import time
    start = time.time()
    print("1. 插入記錄到數據庫")
    print("2. celery 幫我發郵件")
    send_mail.delay("xx@gmail.com")
    print("3. 告訴用戶注冊成功")
    print("耗時:%s 秒 " % (time.time() - start))
​
if __name__ == '__main__':
    register()

  因為send_mail被app.task裝飾器修飾了,所以我們想要把任務丟給它,使用函數的 .delay方法即可

  目錄結構為:

── Celery測試
   ├── task.py
   └── user.py

  運行user.py,查看運行結果為:

bug6.png

  可知

time.sleep(5)

  被丟到后台去執行了,所以花費時間這么短。如果按照正常的同步邏輯去實現,至少需要5秒鍾的時間,因為存在time.sleep(5)來模擬發送郵件。

  在worker服務窗口查看日志信息

bug7.png

  跟着大佬們的博客學習了Celery的基本操作,大部分時間去安裝環境了,淦

  將Celery添加進碎遮項目會在下一篇博客中說到。

  請聽下回分解 咕咕咕

安裝出現的錯誤

File "d:\python3\lib\site-packages\celery\concurrency\prefork.py", line 20, in <module> from celery.concurrency.base import BasePool File "d:\python3\lib\site-packages\celery\concurrency\base.py", line 21, in <module> from celery.utils import timer2 File "d:\python3\lib\site-packages\celery\utils\timer2.py", line 19 from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger ^ SyntaxError: invalid syntax

bug1.png

  參考自:https://www.cnblogs.com/zivli/p/11517797.html

  這個是python3.7目前不支持kombu,降低python版本至3.6即可,(又得重新裝一波python,沒裝conda嗚嗚嗚,卸載之前先把python的類庫輸出到requirements.txt文件

  關於卸載python,可以參考這篇博客:https://blog.csdn.net/ke_yi_/article/details/88183474

  如果電腦上是python2和python3共存,請看:https://blog.csdn.net/autista/article/details/73650943

  弄好了之后再把之前python3.7的庫文件恢復到python3.6里面來

pip3 install -r requirements.txt

  弄好了之后重新打開python的集成開發環境

celery -A task worker --loglevel=info

bug2.png

  可算能運行Celery了

  接着運行程序的時候又遇到了

AttributeError: 'str' object has no attribute 'items'

bug3.png

  出現該問題的原因是redis版本過高,降低redis版本即可

pip3 install redis==2.10.6

  然后就沒有遇到其他的坑了,遇到再補:D

參考鏈接

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM