With the project environment set up in the previous part, we now move on to the hands-on work. This section walks through several of the important functional modules in the project.
Running asynchronous tasks and managing them with Celery
Celery is a distributed task queue focused on real-time processing and task scheduling. Because the data analysis in this project takes a long time, tasks are executed asynchronously. The project uses Redis as the broker and Django's database as the result backend. Part of the settings.py configuration follows (see the project code for the complete configuration):
import djcelery
djcelery.setup_loader()

BROKER_URL = 'redis://127.0.0.1:6379/5'
BROKER_POOL_LIMIT = 0
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # periodic tasks
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# CELERY_RESULT_BACKEND = 'redis://10.39.211.198:6379/6'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 10  # recycle a worker after 10 tasks to guard against memory leaks
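With this configuration in place, any function decorated as a task is sent to the Redis broker when called with .delay() and runs in a worker process instead of the web request. A minimal sketch; the task name analyze_data and its body are illustrative, not from the project:

from celery import shared_task

@shared_task
def analyze_data(dataset_id):
    # long-running analysis runs in the worker process,
    # not in the web request that triggered it
    return {'dataset_id': dataset_id, 'status': 'done'}

# in a Django view: enqueue the task and return immediately
# result = analyze_data.delay(42)   # result.id is the Celery task_id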
The Celery events and signals the project hooks into, covering task start, success, failure, completion (after_return), and revocation, are shown below:
import logging
from datetime import datetime

from celery import Task
from celery.signals import task_prerun, task_revoked

# Job (model) and RedisHelper come from the project's own code
logger = logging.getLogger(__name__)


@task_prerun.connect
def pre_task_run(task_id=None, task=None, sender=None, *args, **kwargs):
    logger.info('task [{task_id}] started, taskname: {task.name}'.format(task_id=task_id, task=task))


@task_revoked.connect
def task_revoked_handler(request=None, terminated=None, sender=None, expired=None, signum=None, **kwargs):
    now = datetime.now()
    task_id = request.id
    logger.warning('task [{0}] was revoked.'.format(task_id))
    job = Job.objects.filter(task_id=task_id).first()
    if job:
        job.runtime = (now - job.create_date).seconds
        job.save()


class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            # notify the browser over the job's Redis pub/sub channel
            redis_helper = RedisHelper(job.id)
            redis_helper.public('task [{0}] success.'.format(task_id))
        logger.info('task [{0}] succeeded'.format(task_id))
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            redis_helper = RedisHelper(job.id)
            redis_helper.public('failed')
        logger.error('task [{0}] failed, reason: {1}, einfo: {2}'.format(task_id, exc, einfo))
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # record the job's total runtime whatever the final status
        now = datetime.now()
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            job.runtime = (now - job.create_date).seconds
            job.save()
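The on_success/on_failure/after_return hooks above only fire for tasks that use MyTask as their base class. A sketch of how a task might be bound to it; run_analysis is a hypothetical name, not the project's actual task:

from celery import shared_task

# bind the project's MyTask base so its hooks run for this task
@shared_task(base=MyTask, bind=True)
def run_analysis(self, job_id):
    # self.request.id is the Celery task_id stored on the Job row,
    # which the signal handlers above use to look the Job back up
    return job_id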
Fetching a task's result:
from celery.result import AsyncResult

res = AsyncResult(task_id).get()
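Note that .get() blocks until the task finishes, so it is only suitable once the task is known to be done. For polling from a view, the non-blocking attributes of AsyncResult can be checked instead; a short sketch:

from celery.result import AsyncResult

res = AsyncResult(task_id)
if res.ready():                          # True once the task has finished
    print(res.state, res.result)         # e.g. 'SUCCESS' and the return value
else:
    print('still running:', res.state)   # e.g. 'PENDING' or 'STARTED'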
Terminating a task:
from celery.task.control import revoke

revoke(task_id, terminate=True)
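terminate=True kills the worker child process running the task (SIGTERM by default), which is what fires the task_revoked signal handled earlier. A sketch of a stop endpoint built on the Job model from above; the view itself is illustrative, not the project's actual code:

from celery.task.control import revoke
from django.http import JsonResponse

def stop_job(request, job_id):
    # look up the Celery task_id recorded when the job was created
    job = Job.objects.filter(id=job_id).first()
    if job is None:
        return JsonResponse({'error': 'job not found'}, status=404)
    revoke(job.task_id, terminate=True)   # triggers the task_revoked handler
    return JsonResponse({'stopped': job.task_id})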
Starting the Celery workers:
Start the worker with event sending enabled:

python manage.py celery -A myproject worker -l info -E --autoscale=6,3

Start the snapshot camera, which persists those events to the database:

python manage.py celerycam -F 10 -l info
During development we found that importing the sklearn package inside an asynchronous task raised the error
AttributeError: 'Worker' object has no attribute '_config'
so the following code needs to be added to the project's task.py:
from celery.signals import worker_process_init

@worker_process_init.connect
def fix_multiprocessing(**_):
    from multiprocessing import current_process
    try:
        # the worker's process object may lack the _config attribute
        # that multiprocessing (used by sklearn via joblib) expects
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}
In addition, the sklearn-related imports must be moved from the top of the file into the function bodies; see the project code for details, and the sketch below.
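A sketch of the resulting pattern: sklearn is imported lazily inside the task body, after worker_process_init has patched current_process()._config, rather than at module import time. The task cluster_samples and its parameters are illustrative:

from celery import shared_task

@shared_task
def cluster_samples(data, n_clusters=3):
    # imported inside the worker process rather than at module import
    # time, avoiding the 'Worker' object has no attribute '_config' error
    from sklearn.cluster import KMeans
    model = KMeans(n_clusters=n_clusters).fit(data)
    return model.labels_.tolist()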
Result screenshots: [images omitted]