github:https://github.com/rq/django-rq
RQ(Redis Queue),人如其名,用 redis 做的隊列任務
redis ,眾所周知, 它的列表可以做隊列,rq就是把job放進隊列里,然后啟worker挨個做完
另外rq極其簡單,官方文檔短小精悍,容易上手
[安裝]
pip install django-rq
添加配置:
修改配置setting.py INSTALLED_APPS = [ ... "django_rq", ] 注意: 這里使用的下划線, RQ_QUEUES = { 'default': { 'HOST': 'localhost', 'PORT': 6379, 'DB': 0, 'PASSWORD': 'some-password', 'DEFAULT_TIMEOUT': 360, }, 'with-sentinel': { 'SENTINELS': [('localhost', 26736), ('localhost', 26737)], 'MASTER_NAME': 'redismaster', 'DB': 0, 'PASSWORD': 'secret', 'SOCKET_TIMEOUT': None, 'CONNECTION_KWARGS': { 'socket_connect_timeout': 0.3 }, }, 'high': { 'URL': os.getenv('REDISTOGO_URL', 'redis://localhost:6379/0'), # If you're on Heroku 'DEFAULT_TIMEOUT': 500, }, 'low': { 'HOST': 'localhost', 'PORT': 6379, 'DB': 0, } } RQ_EXCEPTION_HANDLERS = ['path.to.my.handler'] # If you need custom exception handlers
#default,high,low表示隊列的優先級,high > default > low
# 但是如果一大批low隊列的job在執行的話,此時high隊列開始入隊job,不會馬上下一個任務就開始執行high隊列,而是會繼續執行low隊列,直至low隊列任務執行完畢
支持使用django-redis和django-redis-cache
CACHES = { 'redis-cache': { 'BACKEND': 'redis_cache.cache.RedisCache', 'LOCATION': 'localhost:6379:1', 'OPTIONS': { 'CLIENT_CLASS': 'django_redis.client.DefaultClient', 'MAX_ENTRIES': 5000, }, }, } RQ_QUEUES = { 'high': { 'USE_REDIS_CACHE': 'redis-cache', }, 'low': { 'USE_REDIS_CACHE': 'redis-cache', }, }
添加日志配置:
LOGGING = { "version": 1, "disable_existing_loggers": False, "formatters": { "rq_console": { "format": "%(asctime)s %(message)s", "datefmt": "%H:%M:%S", }, }, "handlers": { "rq_console": { "level": "DEBUG", "class": "rq.utils.ColorizingStreamHandler", "formatter": "rq_console", "exclude": ["%(asctime)s"], }, # If you use sentry for logging 'sentry': { 'level': 'ERROR', 'class': 'raven.contrib.django.handlers.SentryHandler', }, }, 'loggers': { "rq.worker": { "handlers": ["rq_console", "sentry"], "level": "DEBUG" }, } }
添加路由:
修改urls.py # For Django < 2.0 urlpatterns += [ url(r'^django-rq/', include('django_rq.urls')), ] # For Django >= 2.0 urlpatterns += [ path('django-rq/', include('django_rq.urls'))
# 添加后台查看
path(r'admin/django-rq/', include('django_rq.urls')), ]
使用:
1.使用@job裝飾器
@job('default', timeout=3600) def long_running_func(): pass long_running_func.delay() # Enqueue function with a timeout of 3600 seconds.
實戰:
在項目目錄下添加tasks.py處理任務文件
from django_rq import job import logging logger = logging.getLogger('worker') @job('default', timeout=360) def sync_migration_record(msg): """同步遷移記錄""" logger.info(msg)
然后在任意一個視圖文件中添加任務:
from xxx.tasks import sync_migration_record def test_task(request): sync_migration_record.delay("hello,rq")
指定任務:
python manage.py rqworker high default low #在 high default low三個隊列各自啟動一個worker,注意了,由於django_rq調用linux中fork(),所以只能在linux系統中執行,windos可以嘗試win10的子系統
更多:
查看隊列執行狀況
多種辦法
1.配置好的django admin中查看,種類齊全,最佳查看方式
2.python manage.py rqstats
python manage.py rqstats --interval=1 #每秒刷新(其實刷新並不及時)
python manage.py rqstats --json # 輸出JSON
python manage.py rqstats --yaml # 輸出YAML
3. 進入redis庫中可以看到自己的隊列,worker,以及job(作為輔助驗證使用)
4.rq 也有命令可以啟動 和查看狀態,大家可以自己看看