celery中文譯為芹菜,是一個分布式任務隊列. 是異步的,所以能處理大量消息
最新的celery不支持windows下使用了,所以在使用pycharm安裝celery模塊之后,需要再安裝eventlet模塊才能測試運行.
一.異步任務
啟動客戶端:
s1,s2要在項目目錄下,如果在文件夾中執行,terminal輸入命令的時候要-A 項目文件夾的名字
c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1", include="項目名.文件夾")
Terminal中輸入
celery worker -A s1 -l info -P eventlet
給定兩個文件
s1.py
from celery import Celery import time c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1") @c.task def myfun1(a,b): return f"myfun1{a}{b}" @c.task def myfun2(): return "myfun2" @c.task def myfun3(): return "myfun3" celery worker -A s1 -l info -P eventlet
s2.py
from s1 import myfun1,myfun2,myfun3,c from celery.result import AsyncResult #多個生產者 # for i in range(10): # s=myfun1.delay() # print(s) s=myfun1.delay(10,20) print(s.id) r=AsyncResult(id=s.id,app=c) #獲取狀態 # print(r.status) # print(r.successful()) #獲取值 # print(r.get()) #只獲取報錯信息 print(r.get(propagate=False)) #獲取具體出錯的位置 # print(r.traceback)
二.延時任務/定時任務
apply_async
t=add.apply_async((1,2),countdown=5) #表示延遲5秒鍾執行任務 print(t) print(t.get())
支持的參數
countdown : 等待一段時間再執行. add.apply_async((2,3), countdown=5)
eta : 定義任務的開始時間.這里的時間是UTC時間,這里有坑 add.apply_async((2,3), eta=now+tiedelta(second=10))
expires : 設置超時時間. add.apply_async((2,3), expires=60)
retry : 定時如果任務失敗后, 是否重試. add.apply_async((2,3), retry=False)
retry_policy : 重試策略. max_retries : 最大重試次數, 默認為 3 次. interval_start : 重試等待的時間間隔秒數, 默認為 0 , 表示直接重試不等待. interval_step : 每次重試讓重試間隔增加的秒數, 可以是數字或浮點數, 默認為 0.2 interval_max : 重試間隔最大的秒數, 即 通過 interval_step 增大到多少秒之后, 就不在增加了, 可以是數字或者浮點數, 默認為 0.2 .
s1.py
from celery import Celery import time c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1") @c.task def myfun1(a,b): return f"myfun1{a}{b}" @c.task def myfun2(): return "myfun2" @c.task def myfun3(): return "myfun3"
s2.py
from s1 import myfun1,myfun2,myfun3,c from celery.result import AsyncResult from datetime import timedelta #指定多長時間以后執行 # s=myfun1.apply_async((10,20),countdown=5) #第二種方式,使用utc時間 s=myfun1.apply_async((10,20),eta="utc時間") print(s.id) # 延時 # 重試
三.周期任務
啟動: 在Terminal中
celery beat -A s2 -l info
s1.py
from celery import Celery import time c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1") @c.task def myfun1(a,b): return f"myfun1{a}{b}" @c.task def myfun2(): return "myfun2" @c.task def myfun3(): return "myfun3"
s2.py
from s1 import c from celery.beat import crontab c.conf.beat_schedule = { "name": { "task": "s1.myfun1", "schedule": 3, "args": (10, 20) }, "crontab": { "task": "s1.myfun1", "schedule": crontab(minute=44), "args": (10, 20) } }
配置詳解:
from celery.schedules import crontab CELERYBEAT_SCHEDULE = { # Executes every Monday morning at 7:30 A.M 'add-every-monday-morning': { 'task': 'tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, }