環境版本:
windows7 x64
django 1.11.6
django-celery 3.2.2
工程結構說明:源碼下載請訪問https://i.cnblogs.com/Files.aspx
1、新建django項目DjangoCelery(具體請參考https://www.cnblogs.com/apple2016/p/11434107.html),並啟動,啟動方式包括2種:
1)通過eclipse,右擊項目名稱,Run As----Pydev:Django
2)在manage.py文件所在路徑執行命令:python manage.py runserver 0.0.0.0:8000
本次啟動采用方式1,啟動成功后,控制台日志打印信息如下:
2、在...\testdj\testdj\testdj下新建celery配置文件celeryconfig.py:
# -*- coding: utf-8 -*- ''' Created on 2019年8月28日 @author: lenovo ''' import celery import djcelery #當djcelery.setup_loader()運行時,Celery便會去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件,找到標記為task的方法,將它們注冊為celery task。 djcelery.setup_loader() #設置不同的隊列,不要只使用默認的隊列,這樣當任務比較多的時候任務之間會相互影響(例如將普通任務和定時任務混在一起), CELERY_QUEUES={ #定時任務隊列 'beat_tasks':{ 'exchange':'beat_tasks', 'exchange_type':'direct', 'binding_key':'beat_tasks' }, 'work_queue':{ 'exchange':'work_queue', 'exchange_type':'direct', 'binding_key':'work_queue' } #普通任務隊列 } CELERY_DEFAULT_QUEUE='work_queue' #設置默認隊列,若不指定隊列則使用該隊列 CELERY_IMPORTS=( 'course.tasks', ) #參數配置可參考官網:http://docs.celeryproject.org/en/latest/userguide/configuration.html CELERY_ACKS_LATE=True #允許重試 CELERYD_FORCE_EXECV=True #可以讓Celery更加可靠,只有當worker執行完任務后,才會告訴MQ,消息被消費,防治死鎖 CELERYD_CONCURRENCY=4 #設置並發的worker數量 CELERYD_MAX_TASKS_PRE_CHILD=100 #每個worker最多執行100個任務被銷毀,可以防止內存泄露 CELERYD_TASK_TIME_LIMIT=12*30 #單個任務的最大運行時間為6分鍾,超過的話就被殺掉
3、在項目路徑...\testdj\testdj下新建app,命名為course
右擊項目名稱testdj,選擇Django---create application(mange.py startapp),輸入app名稱完成創建;
4、將新建的course這個app和djcelery模塊注冊到settings.py中:
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles',
'djcelery', 'course', ]
注冊djcelery是為了后續通過manage.py使用其相關命令 ;
5、建立Celery與django的關系,包括broker和backend設置,在settings.py中完成相關配置:
#Celery from celeryconfig import * BROKER_BACKEND='redis' BROKER_URL='redis://localhost:6379/1' CELERY_RESULT_BACKEND='redis://localhost:6379/2'
6、在...\testdj\testdj\course下新建tasks.py文件
# -*- coding: utf-8 -*- ''' Created on 2019年8月28日 @author: lenovo ''' import time from celery.task import Task class CourseTask(Task): name='course-task' #給該任務起一個名字,這樣可以用它在其他地方表示這個任務 def run(self,*args,**kwargs): print 'start course task' time.sleep(4) print 'args={},kwargs={}'.format(args, kwargs) print 'end course task'
7、新建views.py
# -*- coding: utf-8 -*- ''' Created on 2019年8月29日 @author: lenovo ''' from course.tasks import CourseTask from django.http import JsonResponse def do(request): #執行異步任務 print 'start do request' CourseTask.delay() print 'end do request' return JsonResponse({'result':'ok'})
8、修改urls.py文件:
from django.conf.urls import url from django.contrib import admin from course import views urlpatterns = [ url(r'^admin/', admin.site.urls), url(r'^do/$',views.do,name='do'), ]
9、啟動worker:在manage.py文件所在路徑下執行命令python manage.py celery worker -l INFO
10、打開瀏覽器訪問http://127.0.0.1:8000/do/完成請求發送,查看runserver啟動的服務日志顯示通過views.py已發送任務成功,查看worker日志顯示worker已接收到任務並進行了消費
並且我們發起請求時,
start do request
end do request
是實時打印的(而非等待4S才打印end do request),說明任務未出現阻塞情況,證明任務是異步的。
11、增加定時任務設置:修改celeryconfig.py文件,增加定時任務,並指定定時任務運行在beat_tasks隊列中:
# -*- coding: utf-8 -*- ''' Created on 2019年8月28日 @author: lenovo ''' import celery import djcelery #當djcelery.setup_loader()運行時,Celery便會去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件,找到標記為task的方法,將它們注冊為celery task。 djcelery.setup_loader() #設置不同的隊列,不要只使用默認的隊列,這樣當任務比較多的時候任務之間會相互影響(例如將普通任務和定時任務混在一起), CELERY_QUEUES={ #定時任務隊列 'beat_tasks':{ 'exchange':'beat_tasks', 'exchange_type':'direct', 'binding_key':'beat_tasks' }, 'work_queue':{ 'exchange':'work_queue', 'exchange_type':'direct', 'binding_key':'work_queue' } #普通任務隊列 } CELERY_DEFAULT_QUEUE='work_queue' #設置默認隊列,若不指定隊列則使用該隊列 CELERY_IMPORTS=( 'course.tasks', ) #參數配置可參考官網:http://docs.celeryproject.org/en/latest/userguide/configuration.html CELERY_ACKS_LATE=True #允許重試 CELERYD_FORCE_EXECV=True #可以讓Celery更加可靠,只有當worker執行完任務后,才會告訴MQ,消息被消費,防治死鎖 CELERYD_CONCURRENCY=4 #設置並發的worker數量 CELERYD_MAX_TASKS_PRE_CHILD=100 #每個worker最多執行100個任務被銷毀,可以防止內存泄露 CELERYD_TASK_TIME_LIMIT=12*30 #單個任務的最大運行時間為6分鍾,超過的話就被殺掉 #設置定時任務 from datetime import timedelta CELERYBEAT_SCHEDULE={ 'task1':{ 'task':'course-task', 'schedule':timedelta(seconds=5), 'options':{ 'queue':'beat_tasks' } } }
12、啟動beat:執行命令python manage.py celery beat -l INFO
13、查看beat日志定時任務發送情況以及worker日志任務消費情況,當前定時任務跑在beat_tasks隊列里正常
beat日志顯示按照定時任務設置要求每隔5s發送一次任務:
worker日志顯示worker成功消費了每個定時任務: