有業務線提出需求:要求對於其流量,只能在0點到7點掃描。
對此,celery發送任務到隊列時可以指定執行的時間。
當worker收到任務后,判斷還未到執行時間,會存儲在worker中,在到達時候后再執行。
如果還未執行就中斷worker,則任務會重新打回celery隊列中,不擔心丟失。
所以只需要傳入time格式的具體執行時間就行。
Demo
import datetime
def in_run_time(start, end):
"""
用來給任務判斷,在不在可執行的時間里,是不是需要丟到定時里
Args:
start: 任務開始執行的時間,格式如 "00:00:00"
end: 任務停止執行的時間,格式如 "07:00:00"
Returns:
"""
current_date = str(datetime.datetime.now().date()) + " "
start_time = datetime.datetime.strptime(current_date + start, '%Y-%m-%d %H:%M:%S')
end_time = datetime.datetime.strptime(current_date + end, '%Y-%m-%d %H:%M:%S')
current_date = datetime.datetime.now()
if (start_time < current_date) and (current_date < end_time):
return True
else:
return False
def get_nextday_run_time(start, end):
"""
根據當前時間,和起止時間,得出該任務應該執行的時間。
Args:
start: 任務開始執行的時間,格式如 "00:00:00"
end: 任務停止執行的時間,格式如 "07:00:00"
Returns:
"""
current_date = datetime.datetime.now().date()
end_time = datetime.datetime.strptime(str(current_date) + " " + end, '%Y-%m-%d %H:%M:%S')
current_time = datetime.datetime.now()
# 如果現在還沒到今天的執行時間,那么任務放到今天的執行時間來執行
if current_time > end_time:
current_date += datetime.timedelta(days=1)
run_time_str = str(current_date) + " " + start
# 執行-北京時間
run_time = datetime.datetime.strptime(run_time_str, '%Y-%m-%d %H:%M:%S')
run_time = run_time + datetime.timedelta(hours=-8)
return run_time
def get_run_time_by_bj_time(bj_time):
"""
將時間格式字符串轉換為datetime格式
Args:
bj_time: 指定執行時間 type-str 如 "2019-08-21 13:21:00"
Returns:
"""
run_time = datetime.datetime.strptime(bj_time, '%Y-%m-%d %H:%M:%S')
run_time = run_time + datetime.timedelta(hours=-8)
return run_time
start = "00:00:00"
end = "07:00:00"
work.apply_async(args=[scan_data], eta=get_nextday_run_time(start, end),
queue="隊列名,沒有可刪除參數", routing_key="隊列key,沒有可刪除參數")