Celery任务列表执行


创建计划任务:

from celery import Celery import time my_task=Celery("task",broker="redis://:123456@127.0.0.1:6379",backend="redis://:123456@127.0.0.1:6379") @my_task.task def my_func1(): time.sleep(10) return "任务1" @my_task.task def my_func2(): return "任务2" @my_task.task def my_func3(): return "任务"

调用方法执行指定的任务:

from s1 import my_func1 res=my_func1.delay() print(res)

获取返回值中运行计划的ID

判断计划是否执行完成:

from celery.result import AsyncResult from s1 import my_task async_task=AsyncResult(id="48029f4f-769e-438b-ac97-e89cc0bb1157",app=my_task) # result=async_task.get()

if async_task.successful(): result=async_task.get() print(result+"OK!") else: print("任务还未执行完成!")

 启动celery在命令行执行: Celery worker -A s1 -l INFO -P eventlet -c 6

-A:指定要执行的目录

-l: 指定要使用的打印日志级别

-p:指定使用eventlet插件 让高版本celery支持window平台

-c:指定可执行的计划数量


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM