異步任務(Celery)詳解


一、背景

在開發中,我們可能經常會遇到一些需要執行時間很長的任務,如果放在前端,會讓用戶一直卡在那兒等待或者一直轉圈圈,體驗非常不好。為了改善這種體驗,我趕緊上網搜索,果然,前人早已有解決辦法了。那就是異步。在Django中,我們可以使用celery異步框架,我們可以把耗時的任務扔到后台,而前端給用戶立即返回,待用戶需要查看結果時,點擊查看即可,並且可以隨時看到任務執行的狀態。

二、原理

Celery是基於Python開發的一個分布式任務隊列框架,支持使用任務隊列的方式在分布的機器/進程/線程上執行任務調度。它是Python寫的庫,但是它實現的通訊協議也可以使用ruby,php,javascript等調用。異步任務除了消息隊列的后台執行的方式,還是一種則是定時計划任務。

Celery 是一個強大的分布式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。它的架構組成如下圖 

組件:

1、任務(tasks)--用戶定義的函數,用於實現用戶的功能,比如執行一個耗時很長的任務

2、中間介(Broker)--用於存放tasks的地方,但是這個中間介需要解決一個問題,就是可能需要存放非常非常多的tasks,而且要保證Worker能夠從這里拿取

3、執行者(Worker)--用於執行tasks,也就是真正調用我們在tasks中定義的函數

4、存儲(Backend)--把執行tasks返回的結果進行存儲,以供用戶查看或調用

 

三、實現

1、各模塊功能

Celery中,以上組件具體功能如下:

任務模塊 Task

包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發並發往任務隊列,而定時任務由 Celery Beat 進程周期性地將任務發往任務隊列。

消息中間件 Broker

Broker,即為任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。Celery 本身不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。

任務執行單元 Worker

Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。

任務結果存儲 Backend

Backend 用於存儲任務的執行結果,以供查詢。同消息中間件一樣,存儲也可使用 RabbitMQ, Redis 和 MongoDB 等。

2、實現步驟

使用 Celery 實現異步任務主要包含三個步驟:

  • 創建一個 Celery 實例 
  • 啟動 Celery Worker 
  • 應用程序調用異步任務

3、操作流程

既然我們已經知道原理和實現步驟,那么就簡單了,開搞吧。以下步驟基本上是按照celery官網最佳實踐來操作的。

相關鏈接:http://docs.jinkan.org/docs/celery/django/first-steps-with-django.html

 

a、環境安裝(RabbitMQ/Redis、Celery、django-celery、flower)

b、創建工程(工程:tcelery、應用:app01)

請注意:這個工程目錄是適合於大的工程,小的工程可以直接把tasks放在celery.py文件中。我們大多數tasks都是位於app中,而且app一般不止一個,基本上都會有多個。

 

 c、新建文件

celery下面需要修改的文件:celery.py、__init__.py、settings文件

app01下面需要修改的文件:tasks.py文件

 

d、修改過程

1、修改settings文件,新增如下配置:

import djcelery    #導入包
djcelery.setup_loader() #加載tasks
BROKER_URL = 'redis://127.0.0.1:6379/0'  #指定broker
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' #指定結果存儲位置為本地數據庫
#CELERY_RESULT_BACKEND = 'redis://' #指定結果存儲位置為redis
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  #指定計划任務為本地數據庫配置的
#CELERY_RESULT_BACKEND = 'redis://'  #指定結果存放位置

  

2、__init__.py文件

#絕對導入,以免celery和標准庫中的celery模塊沖突
from __future__ import absolute_import

#以下導入時為了確保在Django啟動時加載app,shared_task在app中會使用到
from .celery import app as celery_app

  

3、celery文件

from __future__ import absolute_import,unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "tcelery.settings")  #設置celery可以在命令行中使用
app = Celery('tcelery', backend='amqp://guest@localhost//', broker='redis://localhost:6379/0')  #創建app實例,並指定backend和broker均為rabbitMQ
#app = Celery('tcelery', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.conf.CELERY_IGNORE_RESULT = False   #結果不忽略
#app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' #結果保存在redis中

app.config_from_object('django.conf:settings')  #從文件中加載實例
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)  #自動加載tasks,注意:他會去app下面查找tasks.py文件,所以我們必須將task放在tasks.py文件中

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

  

4、tasks.py

from tcelery import celery_app

@celery_app.task
def test(x,y):
    return x+y

  

5、settings文件

注意:前面settings文件已經修改過,這里再次提到,是需要把app和django-celery注冊進入app

INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    "app01", 
    "djcelery",
)

  

以上配置修改完成后,我們按照如下方式啟動django、woker、flower。

 

 

 

 

 

4、驗證

a、命令行調用

 

 b、woker執行

 

c、backend保存結果

 

d、flower結果查看

同樣,我們也可以將結果保存在redis里面。

 

四、實操

1、效果

上面已經知道了原理和使用,那么下面就來進行實操吧,實操會讓你感受celery的真正使用場景。

場景:模擬后台執行一個耗時的任務(一個加法的任務),然后通過前端查詢執行結果。

效果:

模擬一個加法的任務,用戶點擊“運行”后,我們把這個任務放到后台運行,通過sleep(10)來模擬耗時任務,然后通過點擊“查看任務”查看執行的情況。

 

再次查看執行情況:

 

2、配置

 基本配置上面已經具備了,下面只說修改的幾個地方:

tasks.py

@celery_app.task
def test(x,y):
    """
    通過sleep來模擬需要執行很長時間的任務。
    :param x:
    :param y:
    :return:
    """
    sleep(10)
    return x+y

  

 views.py文件

#coding:utf-8
from django.shortcuts import render,HttpResponse,render_to_response
from models import Add
from .tasks import test,get_task_status
import datetime
import redis
import json
import time
# Create your views here.

def index(request):
    return  render_to_response('index.html')

def add_1(request):
    try:
        first = int(request.GET.get('first'))
    except:
        first = 0
    try:
        second =int(request.GET.get('second'))
    except:
        second = 0
    result = test.apply_async(args=(first,second))
    dd = Add(task_id=result.id,first=first,second=second,log_date=datetime.datetime.now())
    dd.save()
    return render_to_response('index.html')

# 任務結果
def results(request):
    #查詢所有的任務信息
    start_time = time.time()
    new_result = {}
    rt_list = []
    rows = Add.objects.all()
    for r in rows:
        status,result = get_status_id(r.task_id)
        new_result["task_id"] = r.task_id
        new_result["first"] = r.first
        new_result["second"] = r.second
        new_result["log_date"] = r.log_date
        new_result["status"] = status
        new_result["result"] = result
        rt_list.append(new_result)
        new_result = {}
    end_time = time.time()
    rt = end_time - start_time
    print rt
    return render_to_response('result.html',{'rows':rt_list})


def get_status_id(task_id):
    """
    :param task_id:
    :return:
    坑:host填寫主機名時,會耗時非常多,可以通過time獲取,大概一次要1s
    task測試:這里
    """
    pool = redis.ConnectionPool(host='127.0.0.1',port=6379,db=0)
    r = redis.Redis(connection_pool=pool)
    task_id = 'celery-task-meta-'+task_id
    #start_time = time.time()
    try:
        status = json.loads(r.get(task_id)).get("status")
        result = json.loads(r.get(task_id)).get("result")
    except:
        status = 'Executing...'
        result = 0
    #end_time = time.time()
    #print 'time:%s' %(end_time-start_time)
    print status,result
    return status,result

  

 

五、總結

從原理和實現過程來看,celery的設計非常優秀,尤其是各模塊的耦合,比如broker我們既可以使用redis、也可以使用rabbitMQ。

backend也一樣支持很多種方式。

六、坑

1、redis執行時間慢

在本次試驗的過程中遇到一個坑,通過python連接redis的時候,剛開始使用的是主機名:

pool = redis.ConnectionPool(host='localhost',port=6379,db=0)

  發現redis執行時間非常常,查詢一條記錄需要1s左右,查了好久沒找到原因。

后來把主機名改為ip后,發現非常快:

 

更多詳細內容請參閱celery官網。

http://docs.jinkan.org/docs/celery/index.html

 

2、celery:Unrecoverable error: AttributeError("'unicode' object has no attribute 'iteritems')

由於項目使用的django版本比較老,python2.7+django1.7下使用celery異步處理耗時請求。

錯誤提示:Unrecoverable error: AttributeError("'unicode' object has no attribute 'iteritems')

 

在stackoverflow中發現了解決辦法,地址:stack overflowcelery-github
問題的症結是redis的版本號為3.0以上,導致celery將其作為消息中間件的時候出現問題,給出的解決方案是安裝3.0以下的redis版本。這里我們安裝redis==2.10.6

pip install redis==2.10.6

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM