celery異步,延時任務, 周期任務


  celery中文譯為芹菜,是一個分布式任務隊列. 是異步的,所以能處理大量消息

  最新的celery不支持windows下使用了,所以在使用pycharm安裝celery模塊之后,需要再安裝eventlet模塊才能測試運行.

一.異步任務

啟動客戶端:

s1,s2要在項目目錄下,如果在文件夾中執行,terminal輸入命令的時候要-A 項目文件夾的名字

c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1", include="項目名.文件夾")

 

Terminal中輸入

celery worker -A s1 -l info -P eventlet

 

  給定兩個文件

  s1.py

from celery import Celery
import time
c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1")

@c.task
def myfun1(a,b):
    return f"myfun1{a}{b}"

@c.task
def myfun2():
    return "myfun2"

@c.task
def myfun3():
    return "myfun3"

celery worker -A s1 -l info -P eventlet

  s2.py

from s1 import myfun1,myfun2,myfun3,c
from celery.result import AsyncResult
#多個生產者
# for i in range(10):
#     s=myfun1.delay()
#     print(s)

s=myfun1.delay(10,20)
print(s.id)
r=AsyncResult(id=s.id,app=c)

#獲取狀態
# print(r.status)
# print(r.successful())

#獲取值
# print(r.get())

#只獲取報錯信息
print(r.get(propagate=False))

#獲取具體出錯的位置
# print(r.traceback)

 

二.延時任務/定時任務

  apply_async

t=add.apply_async((1,2),countdown=5) #表示延遲5秒鍾執行任務
print(t)
print(t.get())

  支持的參數

countdown : 等待一段時間再執行.
add.apply_async((2,3), countdown=5)
eta : 定義任務的開始時間.這里的時間是UTC時間,這里有坑 add.apply_async((
2,3), eta=now+tiedelta(second=10))
expires : 設置超時時間. add.apply_async((
2,3), expires=60)
retry : 定時如果任務失敗后, 是否重試. add.apply_async((
2,3), retry=False)
retry_policy : 重試策略.   max_retries : 最大重試次數, 默認為
3 次.   interval_start : 重試等待的時間間隔秒數, 默認為 0 , 表示直接重試不等待.   interval_step : 每次重試讓重試間隔增加的秒數, 可以是數字或浮點數, 默認為 0.2   interval_max : 重試間隔最大的秒數, 即 通過 interval_step 增大到多少秒之后, 就不在增加了, 可以是數字或者浮點數, 默認為 0.2 .

  s1.py

from celery import Celery
import time
c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1")

@c.task
def myfun1(a,b):
    return f"myfun1{a}{b}"

@c.task
def myfun2():
    return "myfun2"

@c.task
def myfun3():
    return "myfun3"

  s2.py

from s1 import myfun1,myfun2,myfun3,c
from celery.result import AsyncResult
from datetime import timedelta

#指定多長時間以后執行
# s=myfun1.apply_async((10,20),countdown=5)

#第二種方式,使用utc時間
s=myfun1.apply_async((10,20),eta="utc時間")
print(s.id)
# 延時
# 重試

三.周期任務

  啟動: 在Terminal中

celery beat -A s2 -l info

 

  s1.py

from celery import Celery
import time
c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1")

@c.task
def myfun1(a,b):
    return f"myfun1{a}{b}"

@c.task
def myfun2():
    return "myfun2"

@c.task
def myfun3():
    return "myfun3"

  s2.py

from s1 import c
from celery.beat import crontab
c.conf.beat_schedule = {
    "name": {
        "task": "s1.myfun1",
        "schedule": 3,
        "args": (10, 20)
    },
    "crontab": {
        "task": "s1.myfun1",
        "schedule": crontab(minute=44),
        "args": (10, 20)
    }
}

 配置詳解:

 from celery.schedules import crontab

 CELERYBEAT_SCHEDULE = {
     # Executes every Monday morning at 7:30 A.M
     'add-every-monday-morning': {
         'task': 'tasks.add',
         'schedule': crontab(hour=7, minute=30, day_of_week=1),
         'args': (16, 16),
    },
 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM