Celery Beat定時任務


示例代碼

創建schedule_tasks.py文件

from celery import Celery
from celery.schedules import crontab

app = Celery("SchedulerTasks", broker='pyamqp://admin:admin@172.16.37.200:5672//')


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )


@app.task
def test(arg):
    print(arg)

 

啟動Celery Beat-任務生產者

# celery -A schedule_tasks beat

 

啟動Celery Worker-任務消費者

# celery -A schedule_tasks worker 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM