Django——django-celery異步任務設置過期時間


django-celery異步任務設置過期時間

場景

在django做項目的時候,因為一些特殊的場景,所以需要用到異步操作,比如發短信,發郵件。設置了django-celery,通過redis作為中間件存儲。有一次redis意外死亡了,過了很久才有人提出來,說登錄短信接收不到,看了日志發現了問題,重啟了redis以后,手機收到了一堆的短信轟炸

分析

因為django-celery的生產消費者模型,待消費的任務隊列,沒有過期時間,所以復活的redis,將未執行的任務全部執行了。

於是我在本地檢測了一下,關掉celery以后,運行redis,執行異步操作,會在redis中你設置的數據庫中生成一個名為celery的列表(keys *),看了一下里面的內容(lrange celery 0 -1),內容結構如下:

b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "shadow": null, "eta": null, "expires": "2021-05-19T17:12:04.087801+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"}}',
 b'{"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "permission_changed", "id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "shadow": null, "eta": null, "expires": "2021-05-19T17:11:55.480755+00:00", "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "parent_id": null, "argsrepr": "(5, 1)", "kwargsrepr": "{}", "origin": "gen66816@wjh-MacBook-Pro.local"}, "properties": {"correlation_id": "075c774b-de6e-40d8-9cdc-05b97f88f0a3", "reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "34dc18c5-b960-4b29-80d6-126e76e3d161"}}'

單獨查看一條數據(lindex celery 0)

{
	"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
	"content-encoding": "utf-8",
	"content-type": "application/json",
	"headers": {
		"lang": "py",
		"task": "permission_changed",
		"id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
		"shadow": null,
		"eta": null,
		"expires": null,
		"group": null,
		"group_index": null,
		"retries": 0,
		"timelimit": [null, null],
		"root_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
		"parent_id": null,
		"argsrepr": "(5, 1)",
		"kwargsrepr": "{}",
		"origin": "gen66816@wjh-MacBook-Pro.local"
	},
	"properties": {
		"correlation_id": "6223983c-1584-4b5a-97a7-71ffc0b1dd1c",
		"reply_to": "35dd26fb-b289-353a-bdec-2c95149f4ac8",
		"delivery_mode": 2,
		"delivery_info": {
			"exchange": "",
			"routing_key": "celery"
		},
		"priority": 0,
		"body_encoding": "base64",
		"delivery_tag": "1ae73174-b8db-4dd8-9a1f-18361672c77b"
	}
}

我發現里面有一個屬性expires ,這不就是設置過期時間的嗎。
於是我開始看celery的源碼,哪里能設置這個expires屬性,切入點在.delay()方法這里,

    def delay(self, *args, **kwargs):
        return self.apply_async(args, kwargs)

再看apply_async方法,因為沒有看到expires關鍵字參數,所以我猜他在options里面,於是找了一下options的取值
看到這么一段

preopts = self._get_exec_options()
options = dict(preopts, **options) if options else preopts

再看_get_exec_options方法

    def _get_exec_options(self):
        if self._exec_options is None:
            self._exec_options = extract_exec_options(self)
        return self._exec_options

果然,在extract_exec_options中找到了redis中存儲的屬性的關鍵字由來

extract_exec_options = mattrgetter(
    'queue', 'routing_key', 'exchange', 'priority', 'expires',
    'serializer', 'delivery_mode', 'compression', 'time_limit',
    'soft_time_limit', 'immediate', 'mandatory',  # imm+man is deprecated
)

因為preopts = self._get_exec_options()的self,本身類是class Task:
所以,只要在你定義的task任務中的task裝飾器上傳入expires屬性,就可以了
至於應該傳什么樣的值,在源碼中找到這么一段(挺不好找的,amqp.py/AMQP構造方法下的task_protocols屬性)

        if isinstance(expires, numbers.Real):
            self._verify_seconds(expires, 'expires')
            now = now or self.app.now()
            timezone = timezone or self.app.timezone
            expires = maybe_make_aware(
                now + timedelta(seconds=expires), tz=timezone,
            )

很明顯,seconds是秒,所以最后expires屬性在redis中存儲的是一個准確的時間格式,這里有一個細節
又是一個小細節
時間的存儲涉及到了時區,celery的當前時間取的是self.app.now(),於是乎

        def now(self):
        """Return the current time and date as a datetime."""
        now_in_utc = to_utc(datetime.utcnow())
        return now_in_utc.astimezone(self.timezone)

WTF?竟然默認用的是UTC,所以我選擇先加上expires屬性給個5分鍾看看會不會生效以及會不會差8個小時
果然!生效了,果然,差了8個小時
又是一個小細節
怎么辦呢?我選擇重寫celerynow方法
只設置now,不管timezone真的可以嗎?答案是可以的,看一下maybe_make_aware方法

def maybe_make_aware(dt, tz=None):
    """Convert dt to aware datetime, do nothing if dt is already aware."""
    if is_naive(dt):
        dt = to_utc(dt)
        return localize(
            dt, timezone.utc if tz is None else timezone.tz_or_local(tz),
        )
    return dt

dt, timezone.utc if tz is None else timezone.tz_or_local(tz),這里如果時間有時區,就用自己的
於是乎

實現

django-celery的項目結構,網上一抓一大把,我就不寫了,先看main文件

import os
from celery import Celery

from django.utils import timezone

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SHBMCRM.settings')


class MyCelery(Celery):

    def now(self):
        return timezone.localtime()  # 使用django自帶的時間,會根據settings中設置的TIME_ZONE獲取當前時間,很方便


celery_app = MyCelery('SHBMCRM')
celery_app.config_from_object('celery_tasks.config')
celery_app.autodiscover_tasks(['celery_tasks.sms', 'celery_tasks.message'])
# 設置過期時間 60s * 5 = 5分鍾
@celery_app.task(name='new_member', expires=60*5)  
def new_member(mobile, user):
    # do somethings

效果

{
	"body": "W1s1LCAxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
	"content-encoding": "utf-8",
	"content-type": "application/json",
	"headers": {
		"eta": null,
		"expires": "2021-05-20T10:26:41.945686+08:00",   // 有過期時間了
		"group": null,
		"kwargsrepr": "{}",
		"origin": "gen66816@wjh-MacBook-Pro.local"
	},
}
第一條就是過期了
[2021-05-20 10:21:42,017: INFO/MainProcess] Received task: permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094]   expires:[2021-05-20 10:26:41.945686+08:00]
第二條執行成功
[2021-05-20 10:21:42,036: INFO/ForkPoolWorker-1] Task permission_changed[25ac8f5d-ed60-41ef-929f-9b6d85448094] succeeded in 0.01773979200000042s: None

創作不易,轉載請注明出處及附帶鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM