前言
Celery 是一個分布式隊列的管理工具, 可以用 Celery 提供的接口快速實現並管理一個分布式的任務隊列.
使用於生產環境的消息代理有 RabbitMQ 和 Redis,還可以使用數據庫,本篇介紹redis使用
Redis 環境搭建
Redis 是一個開源的使用 ANSI C 語言編寫、遵守 BSD 協議、支持網絡、可基於內存、分布式、可選持久性的鍵值對(Key-Value)存儲數據庫,並提供多種語言的 API
Redis 與其他 key - value 緩存產品有以下三個特點:
- Redis支持數據的持久化,可以將內存中的數據保存在磁盤中,重啟的時候可以再次加載進行使用。
- Redis不僅僅支持簡單的key-value類型的數據,同時還提供list,set,zset,hash等數據結構的存儲。
- Redis支持數據的備份,即master-slave模式的數據備份。
使用 docker 安裝Redis
docker pull redis:latest
運行容器
docker run -itd --name redis-test -p 6379:6379 redis
映射容器服務的 6379 端口到宿主機的 6379 端口。外部可以直接通過宿主機ip:6379 訪問到 Redis 的服務。
上面是沒有設置密碼的,設置密碼用下面這句
docker run -itd --name myredis -p 6379:6379 redis --requirepass "123456" --restart=always --appendonly yes
django依賴包
django使用的版本是v2.1.2
安裝celery版本
pip install celery==3.1.26.post2
安裝django-celery包
pip install django-celery==3.3.1
安裝Redis
pip install redis==2.10.6
Django 中使用 Celery
要在 Django 項目中使用 Celery,您必須首先定義 Celery 庫的一個實例(稱為“應用程序”)
如果你有一個現代的 Django 項目布局,比如:
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
那么推薦的方法是創建一個新的proj/proj/celery.py模塊來定義 Celery 實例:
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
其中debug_task是測試的任務,可以注掉
# @app.task(bind=True)
# def debug_task(self):
# print('Request: {0!r}'.format(self.request))
上面一段只需改這句,'proj'是自己django項目的app名稱
app = Celery('proj')
然后你需要在你的proj/proj/init.py 模塊中導入這個應用程序。這確保在 Django 啟動時加載應用程序,以便@shared_task裝飾器(稍后提到)將使用它:
proj/proj/init.py:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
上面這段固定的,不用改
tasks任務
在app下新建tasks.py,必須要是tasks.py文件名稱,django會自動查找到app下的該文件
@shared_task
def add(x, y):
print("task----------1111111111111111111111")
return x + y
@shared_task
def mul(x, y):
return x * y
tasks.py可以寫任務函數add、mul,讓它生效的最直接的方法就是添加app.task 或shared_task 這個裝飾器
添加setting配置
setting.py添加配置
# celery 配置連接redis
BROKER_URL = 'redis://ip:6379'
CELERY_RESULT_BACKEND = 'redis://ip:6379'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
創建視圖
views.py創建視圖
from .tasks import add, mul
def task_demo(request):
res = add.delay(10, 20)
print(res.task_id) # 返回task_id
return JsonResponse({"code": 0, "res": res.task_id})
啟動worker
前面pip已經安裝過celery應用了,celery是一個獨立的應用,可以啟動worker
celery -A MyDjango worker -l info
其中MyDjango是你自己的django項目名稱
運行日志
-------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: yoyo:0x1ea1a96e9b0
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost:6379/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. yoyo.tasks.add
. yoyo.tasks.mul
[2021-10-18 22:45:03,155: INFO/MainProcess] Connected to redis://localhost:6379//
[2021-10-18 22:45:03,347: INFO/MainProcess] mingle: searching for neighbors
[2021-10-18 22:45:04,897: INFO/MainProcess] mingle: all alone
[2021-10-18 22:45:05,406: WARNING/MainProcess] e:\python36\lib\site-packages\celery\fixups\django.py:265:
UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2021-10-18 22:45:05,407: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.
運行的時候,當我們看到"Connected to redis"說明已經連接成功了!
連接過程中如果出現報錯:redis celery:AttributeError: str object has no attribute items
[2021-10-18 17:15:21,801: ERROR/MainProcess] Unrecoverable error: AttributeError("'str' object has no attribute 'items'",)
Traceback (most recent call last):
File "e:\python36\lib\site-packages\celery\worker\__init__.py", line 206, in start
self.blueprint.start(self)
File "e:\python36\lib\site-packages\celery\bootsteps.py", line 123, in start
step.start(parent)
File "e:\python36\lib\site-packages\celery\bootsteps.py", line 374, in start
return self.obj.start()
File "e:\python36\lib\site-packages\celery\worker\consumer.py", line 280, in start
blueprint.start(self)
File "e:\python36\lib\site-packages\celery\bootsteps.py", line 123, in start
step.start(parent)
File "e:\python36\lib\site-packages\celery\worker\consumer.py", line 884, in start
c.loop(*c.loop_args())
File "e:\python36\lib\site-packages\celery\worker\loops.py", line 103, in synloop
connection.drain_events(timeout=2.0)
File "e:\python36\lib\site-packages\kombu\connection.py", line 288, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "e:\python36\lib\site-packages\kombu\transport\virtual\__init__.py", line 847, in drain_events
self._callbacks[queue](message)
File "e:\python36\lib\site-packages\kombu\transport\virtual\__init__.py", line 534, in _callback
self.qos.append(message, message.delivery_tag)
File "e:\python36\lib\site-packages\kombu\transport\redis.py", line 146, in append
pipe.zadd(self.unacked_index_key, delivery_tag, time()) \
File "e:\python36\lib\site-packages\redis\client.py", line 2320, in zadd
for pair in iteritems(mapping):
File "e:\python36\lib\site-packages\redis\_compat.py", line 109, in iteritems
return iter(x.items())
AttributeError: 'str' object has no attribute 'items'
redis版本問題,報錯版本redis=3.2.1,降低版本redis=2.10.6后,解決
shell交互環境
在django shell交互環境調試運行任務
D:\202107django\MyDjango>python manage.py shell
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from yoyo.tasks import add,mul
>>> from celery.result import AsyncResult
>>>
>>> res = add.delay(11, 12)
>>> res
<AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1>
>>> res.status
'SUCCESS'
>>>
>>> res.backend
<celery.backends.redis.RedisBackend object at 0x0000015E011C3128>
>>>
>>> res.task_id
'c5ff83a4-4840-4b36-8869-5ce6081904f1'
>>>
>>>
>>> get_task = AsyncResult(id=res.task_id)
>>> get_task
<AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1>
>>> get_task.get()
23
>>>
res.status是查看任務狀態
res.task_id 是獲取任務的id
根據任務的id查詢任務的執行結果AsyncResult(id=res.task_id).get()獲取
