django-celery異步任務設置過期時間
場景
在django做項目的時候,因為一些特殊的場景,所以需要用到異步操作,比如發短信,發郵件。設置了django-celery,通過redis作為中間件存儲。有一次redis意外死亡了,過了很久才有人提出來,說登錄短信接收不到,看了日志發現了問題,重啟了redis以后,手機收到了一堆的短信轟炸
分析
因為django-celery的生產消費者模型,待消費的任務隊列,沒有過期時間,所以復活的redis,將未執行的任務全部執行了。
於是我在本地檢測了一下,關掉celery以后,運行redis,執行異步操作,會在redis中你設置的數據庫中生成一個名為celery的列表(keys *),看了一下里面的內容(lrange celery 0 -1),內容結構如下:
b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "shadow": null, "eta": null, "expires": "2021-05-19T17:12:04.087801+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"}}',
b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "shadow": null, "eta": null, "expires": "2021-05-19T17:11:55.480755+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "34dc18c5-b960-4b29-80d6-126e76e3d161"}}'
單獨查看一條數據(lindex celery 0)
{
"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
"content-encoding": "utf-8",
"content-type": "application/json",
"headers": {
"lang": "py",
"task": "permission_changed",
"id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
"shadow": null,
"eta": null,
"expires": null,
"group": null,
"group_index": null,
"retries": 0,
"timelimit": [null, null],
"root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
"parent_id": null,
"argsrepr": "(5, 1)",
"kwargsrepr": "{}",
"origin": "gen66816@wjh-MacBook-Pro.local"
},
"properties": {
"correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
"reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8",
"delivery_mode": 2,
"delivery_info": {
"exchange": "",
"routing_key": "celery"
},
"priority": 0,
"body_encoding": "base64",
"delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"
}
}
我發現里面有一個屬性expires ,這不就是設置過期時間的嗎。
於是我開始看celery的源碼,哪里能設置這個expires屬性,切入點在.delay()方法這里,
def delay(self, *args, **kwargs):
return self.apply_async(args, kwargs)
再看apply_async方法,因為沒有看到expires關鍵字參數,所以我猜他在options里面,於是找了一下options的取值
看到這么一段
preopts = self._get_exec_options()
options = dict(preopts, **options) if options else preopts
再看_get_exec_options方法
def _get_exec_options(self):
if self._exec_options is None:
self._exec_options = extract_exec_options(self)
return self._exec_options
果然,在extract_exec_options中找到了redis中存儲的屬性的關鍵字由來
extract_exec_options = mattrgetter(
'queue', 'routing_key', 'exchange', 'priority', 'expires',
'serializer', 'delivery_mode', 'compression', 'time_limit',
'soft_time_limit', 'immediate', 'mandatory', # imm+man is deprecated
)
因為preopts = self._get_exec_options()的self,本身類是class Task:
所以,只要在你定義的task任務中的task裝飾器上傳入expires屬性,就可以了
至於應該傳什么樣的值,在源碼中找到這么一段(挺不好找的,amqp.py/AMQP構造方法下的task_protocols屬性)
if isinstance(expires, numbers.Real):
self._verify_seconds(expires, 'expires')
now = now or self.app.now()
timezone = timezone or self.app.timezone
expires = maybe_make_aware(
now + timedelta(seconds=expires), tz=timezone,
)
很明顯,seconds是秒,所以最后expires屬性在redis中存儲的是一個准確的時間格式,這里有一個細節

時間的存儲涉及到了時區,celery的當前時間取的是self.app.now(),於是乎
def now(self):
"""Return the current time and date as a datetime."""
now_in_utc = to_utc(datetime.utcnow())
return now_in_utc.astimezone(self.timezone)
WTF?竟然默認用的是UTC,所以我選擇先加上expires屬性給個5分鍾看看會不會生效以及會不會差8個小時
果然!生效了,果然,差了8個小時

怎么辦呢?我選擇重寫celery的now方法
只設置now,不管timezone真的可以嗎?答案是可以的,看一下maybe_make_aware方法
def maybe_make_aware(dt, tz=None):
"""Convert dt to aware datetime, do nothing if dt is already aware."""
if is_naive(dt):
dt = to_utc(dt)
return localize(
dt, timezone.utc if tz is None else timezone.tz_or_local(tz),
)
return dt
dt, timezone.utc if tz is None else timezone.tz_or_local(tz),這里如果時間有時區,就用自己的
於是乎
實現
django-celery的項目結構,網上一抓一大把,我就不寫了,先看main文件
import os
from celery import Celery
from django.utils import timezone
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SHBMCRM.settings')
class MyCelery(Celery):
def now(self):
return timezone.localtime() # 使用django自帶的時間,會根據settings中設置的TIME_ZONE獲取當前時間,很方便
celery_app = MyCelery('SHBMCRM')
celery_app.config_from_object('celery_tasks.config')
celery_app.autodiscover_tasks(['celery_tasks.sms', 'celery_tasks.message'])
# 設置過期時間 60s * 5 = 5分鍾
@celery_app.task(name='new_member', expires=60*5)
def new_member(mobile, user):
# do somethings
效果
{
"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
"content-encoding": "utf-8",
"content-type": "application/json",
"headers": {
"eta": null,
"expires": "2021-05-20T10:26:41.945686+08:00", // 有過期時間了
"group": null,
"kwargsrepr": "{}",
"origin": "gen66816@wjh-MacBook-Pro.local"
},
}
第一條就是過期了
[2021-05-20 10:21:42,017: INFO/MainProcess] Received task: permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094] expires:[2021-05-20 10:26:41.945686+08:00]
第二條執行成功
[2021-05-20 10:21:42,036: INFO/ForkPoolWorker-1] Task permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094] succeeded in 0.01773979200000042s: None
