python3+celery+redis實現異步任務


一、原理

Celery是基於Python開發的一個分布式任務隊列框架,支持使用任務隊列的方式在分布的機器/進程/線程上執行任務調度。它是Python寫的庫,但是它實現的通訊協議也可以使用ruby,php,javascript等調用。異步任務除了消息隊列的后台執行的方式,還是一種則是定時計划任務。

Celery 是一個強大的分布式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。它的架構組成如下圖 

 

 

 

組件:

1、任務(tasks)--用戶定義的函數,用於實現用戶的功能,比如執行一個耗時很長的任務

2、中間介(Broker)--用於存放tasks的地方,但是這個中間介需要解決一個問題,就是可能需要存放非常非常多的tasks,而且要保證Worker能夠從這里拿取

3、執行者(Worker)--用於執行tasks,也就是真正調用我們在tasks中定義的函數

4、存儲(Backend)--把執行tasks返回的結果進行存儲,以供用戶查看或調用

二、實現過程

1.環境安裝(RabbitMQ/Redis、Celery、django-celery、flower)

我的python版本:3.7,celery版本:3.1.26.post2

2.創建工程

 

 

 紅圈為本工程所需:

web_order下面需要修改的文件:celery.py、__init__.py、settings文件

web_test下面需要修改的文件:tasks.py文件、longTask.py文件

 

3.修改文件過程

1)修改settings.py。在settings的最后加上如下代碼:

1 # CELERY SETTING
2 BROKER_URL = 'redis://localhost:6379'    #指定消息中間件
3 CELERY_RESULT_BACKEND = 'redis://localhost:6379'   ##指定結果存儲位置為本地數據庫
4 CELERY_ACCEPT_CONTENT = ['application/json']  #
5 CELERY_TASK_SERIALIZER = 'json'
6 CELERY_RESULT_SERIALIZER = 'json'
7 CELERY_TIMEZONE = 'Asia/Shanghai'
8 CELERY_IMPORTS = ("web_test.tasks")   #注冊任務,

###在
INSTALLED_APPS中注冊 djcelery
 
        

2)__init__ 文件。

#絕對導入,以免celery和標准庫中的celery模塊沖突
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.

#以下導入時為了確保在Django啟動時加載app,shared_task在app中會使用到
from .celery import app as celery_app

__all__ = ['celery_app']

3)celery文件

from __future__ import absolute_import,unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "web_order.settings")  # 設置celery可以在命令行中使用
app = Celery('web_order')  # 創建app實例
# app = Celery('tcelery', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.conf.CELERY_IGNORE_RESULT = False  # 結果不忽略
# app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' #結果保存在redis中

app.config_from_object('django.conf:settings')  # 從文件中加載實例
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # 自動加載tasks,注意:他會去app下面查找tasks.py文件,所以我們必須將task放在tasks.py文件中

4)longTasks.py

from django.http import JsonResponse
import json
from .tasks import test   #導入異步任務的方法


def sendlongtask(request):
    #由此處調用test方法,執行異步命令
    run_res = test(sh, userIp, username)
    return JsonResponse("執行成功", safe=False)

5)Tasks.py

from celery import shared_task

@shared_task
def test(x, y):
    return (x+y)

4.啟動Django和celery

  在項目根目錄執行:

  python manage.py runserver 0.0.0.0:8000

  python manage.py celery worker -c 4 --loglevel=info

 

5.另外,Celery提供了一個工具flower,將各個任務的執行情況、各個worker的健康狀態進行監控並以可視化的方式展現,如下圖所示:

 

 

 

 

    Django下實現的方式如下: 

      1.) 安裝flower:

    pip install flower

      2.) 啟動flower(默認會啟動一個webserver,端口為5555):

python manage.py celery flower

     3.) 進入http://localhost:5555即可查看。

6.python3踩過的坑:python3.7與celery不兼容

 

 1.出現這個錯誤時,需要將報錯文件中所有的async改為asynchronous或者其他變量名即可。

運行后,你會發現celery可以正常使用了

2.將python版本降到3.6及以下,celery也可正常使用。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM