從零開始搭建django前后端分離項目 系列三(實戰之異步任務執行)


前面已經將項目環境搭建好了,下面進入實戰環節。這里挑選項目中涉及到的幾個重要的功能模塊進行講解。

celery執行異步任務和任務管理

Celery 是一個專注於實時處理和任務調度的分布式任務隊列。由於本項目進行數據分析的耗時比較長,所以采用異步方式執行任務。本項目中Broker使用redis,Result Backend使用django的數據庫,部分配置如下settings.py(具體配置見項目代碼):

import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/5'
BROKER_POOL_LIMIT = 0
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定時任務
CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend'
# CELERY_RESULT_BACKEND = 'redis://10.39.211.198:6379/6'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 10 #  每個worker最多執行10個任務就會被銷毀,可防止內存泄露

項目中涉及到的celery任務執行成功、執行失敗、執行完成、執行被終止、執行失敗的事件和信號如下:

@task_prerun.connect
def pre_task_run(task_id, task, sender, *args, **kwargs):
    logger.info('task [{task_id}] 開始執行, taskname: {task.name}'.format(task_id=task_id, task=task))

@task_revoked.connect
def task_revoked(request,terminated,sender,expired,signal,signum):
    now=datetime.now()
    task_id=request.id
    logger.warn('task [{0}] 被停止。'.format(task_id))
    job = Job.objects.filter(task_id=task_id).first()
    if job:
        job.runtime = (now - job.create_date).seconds
        job.save()

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        job=Job.objects.filter(task_id=task_id).first()
        if job:
            channel = job.id
            print('channel:', channel)
            redis_helper = RedisHelper(channel)
            redis_helper.public('task [{0}] success。'.format(task_id))
        logger.info('task [{0}] 執行成功, success'.format(task_id))
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            channel = job.id
            print('channel:', channel)
            redis_helper = RedisHelper(channel)
            redis_helper.public('failed')
        logger.error('task [{0}] 執行失敗, reason: {1} ,einfo: {2}'.format(task_id,exc,einfo))
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        now = datetime.now()
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            job.runtime = (now - job.create_date).seconds
            job.save()

獲取任務執行結果:

from celery.result import AsyncResult
res=AsyncResult(taskid).get()

終止任務:

from celery.task.control import broadcast, revoke, rate_limit,inspect
revoke(task_id, terminate=True)

celery任務啟動:

啟用事件發送:
python manage.py celery -A myproject worker -l info -E --autoscale=6,3
啟動快照相機:
python manage.py celerycam -F 10 -l info

在開發過程中發現,當異步任務中導入sklearn包時報錯 

AttributeError: 'Worker' object has no attribute '_config'

所以在項目task.py中需要添加如下代碼:

from celery.signals import worker_process_init
@worker_process_init.connect
def fix_multiprocessing(**_):
  from multiprocessing import current_process
  try:
    current_process()._config
  except AttributeError:
    current_process()._config = {'semprefix': '/mp'}

並且需要把sklearn相關包從文件開始導入移到函數內部導入,具體見項目代碼。

效果圖:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM