Django消息隊列之django-rq


github:https://github.com/rq/django-rq

 

RQ(Redis Queue),人如其名,用 redis 做的隊列任務

redis ,眾所周知, 它的列表可以做隊列,rq就是把job放進隊列里,然后啟worker挨個做完

另外rq極其簡單,官方文檔短小精悍,容易上手



[安裝]

pip install django-rq 

添加配置:

修改配置setting.py

INSTALLED_APPS = [
    ...
    "django_rq",
]

注意: 這里使用的下划線,
RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
        'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
    },
    'with-sentinel': {
        'SENTINELS': [('localhost', 26736), ('localhost', 26737)],
        'MASTER_NAME': 'redismaster',
        'DB': 0,
        'PASSWORD': 'secret',
        'SOCKET_TIMEOUT': None,
        'CONNECTION_KWARGS': {
            'socket_connect_timeout': 0.3
        },
    },
    'high': {
        'URL': os.getenv('REDISTOGO_URL', 'redis://localhost:6379/0'), # If you're on Heroku
        'DEFAULT_TIMEOUT': 500,
    },
    'low': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    }
}

RQ_EXCEPTION_HANDLERS = ['path.to.my.handler'] # If you need custom exception handlers



#default,high,low表示隊列的優先級,high > default > low
# 但是如果一大批low隊列的job在執行的話,此時high隊列開始入隊job,不會馬上下一個任務就開始執行high隊列,而是會繼續執行low隊列,直至low隊列任務執行完畢

支持使用django-redis和django-redis-cache

CACHES = {
    'redis-cache': {
        'BACKEND': 'redis_cache.cache.RedisCache',
        'LOCATION': 'localhost:6379:1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'MAX_ENTRIES': 5000,
        },
    },
}

RQ_QUEUES = {
    'high': {
        'USE_REDIS_CACHE': 'redis-cache',
    },
    'low': {
        'USE_REDIS_CACHE': 'redis-cache',
    },
}

添加日志配置:

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "rq_console": {
            "format": "%(asctime)s %(message)s",
            "datefmt": "%H:%M:%S",
        },
    },
    "handlers": {
        "rq_console": {
            "level": "DEBUG",
            "class": "rq.utils.ColorizingStreamHandler",
            "formatter": "rq_console",
            "exclude": ["%(asctime)s"],
        },
        # If you use sentry for logging
        'sentry': {
            'level': 'ERROR',
            'class': 'raven.contrib.django.handlers.SentryHandler',
        },
    },
    'loggers': {
        "rq.worker": {
            "handlers": ["rq_console", "sentry"],
            "level": "DEBUG"
        },
    }
}

 

 

 

 

 

添加路由:

修改urls.py

# For Django < 2.0
urlpatterns += [
    url(r'^django-rq/', include('django_rq.urls')),
]

# For Django >= 2.0
urlpatterns += [
    path('django-rq/', include('django_rq.urls'))
# 添加后台查看
path(r'admin/django-rq/', include('django_rq.urls')), ]

使用:

1.使用@job裝飾器

 

@job('default', timeout=3600)
def long_running_func():
    pass
long_running_func.delay() # Enqueue function with a timeout of 3600 seconds.

 

實戰:

在項目目錄下添加tasks.py處理任務文件

from django_rq import job

import logging
logger = logging.getLogger('worker')

@job('default', timeout=360)
def sync_migration_record(msg):
    """同步遷移記錄"""
    logger.info(msg)

然后在任意一個視圖文件中添加任務:

from xxx.tasks import sync_migration_record

def test_task(request):
    sync_migration_record.delay("hello,rq")

 

指定任務:

python manage.py rqworker high default low

#在 high default low三個隊列各自啟動一個worker,注意了,由於django_rq調用linux中fork(),所以只能在linux系統中執行,windos可以嘗試win10的子系統

 

 

 

更多:

查看隊列執行狀況

多種辦法

1.配置好的django admin中查看,種類齊全,最佳查看方式

2.python manage.py rqstats

python manage.py rqstats --interval=1  #每秒刷新(其實刷新並不及時)

python manage.py rqstats --json  # 輸出JSON

python manage.py rqstats --yaml  # 輸出YAML

3. 進入redis庫中可以看到自己的隊列,worker,以及job(作為輔助驗證使用)

4.rq 也有命令可以啟動 和查看狀態,大家可以自己看看




 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM