An asynchronous task queue focused on real-time processing.
It also supports task scheduling.
Celery's architecture consists of three parts: the message broker, the task execution unit (worker), and the task result store.

Message broker
Celery does not provide a message service of its own, but it integrates easily with third-party message brokers, including RabbitMQ, Redis, and others.
Task execution unit
The worker is the task execution unit provided by Celery; workers run concurrently on the nodes of a distributed system.

Task result store
The task result store holds the results of tasks executed by workers. Celery supports storing results in several backends, including AMQP, Redis, and others.
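A minimal sketch of how the three parts fit together; the Redis URLs and the add task below are illustrative assumptions, not anything mandated by Celery:

from celery import Celery

# broker: where task messages are queued; backend: where results are stored
app = Celery('demo',
             broker='redis://127.0.0.1:6379/0',    # message broker (assumed Redis)
             backend='redis://127.0.0.1:6379/1')   # task result store (assumed Redis)

@app.task
def add(x, y):
    # executed by a worker process, not by the caller
    return x + y

# the caller only pushes a message to the broker and receives a result handle
# result = add.delay(2, 3)
# print(result.get(timeout=10))   # read from the result store once a worker finishes

The caller never runs add itself; it only enqueues a message on the broker, and a worker later writes the return value to the result store.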
Celery version 4.0 runs on Python (2.7, 3.4, 3.5) and PyPy (5.4, 5.5). This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required. If you're running an older version of Python, you need to be running an older version of Celery: Python 2.6: Celery series 3.1 or earlier. Python 2.5: Celery series 3.0 or earlier. Python 2.4: Celery series 2.2 or earlier. Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.
Scheduled tasks: run something on a schedule, for example daily statistics aggregation.
3. Installing and configuring Celery
pip install celery
Message broker: RabbitMQ/Redis

app = Celery('task_name', backend='xxx', broker='xxx')  # backend: result store URL, broker: message broker URL
4. Running asynchronous tasks with Celery
Basic usage
Create a project called celerytest
Create a .py file: celery_app_task.py
import celery
import time

# broker = 'redis://127.0.0.1:6379/2'   # without a password
backend = 'redis://:123456@127.0.0.1:6379/1'
broker = 'redis://:123456@127.0.0.1:6379/2'

cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def add(x, y):
    return x + y
Create a .py file: add_task.py, and dispatch a task
from celery_app_task import add

result = add.delay(4, 5)
print(result.id)
Create a .py file: run.py, to start the worker. Note: on Windows, use: celery worker -A celery_app_task -l info -P eventlet
from celery_app_task import cel

if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info'])
Create a .py file: result.py, to check the task result
from celery.result import AsyncResult
from celery_app_task import cel

async_result = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget()  # delete the result
elif async_result.failed():
    print('task failed')
elif async_result.status == 'PENDING':
    print('task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('task raised an exception and is being retried')
elif async_result.status == 'STARTED':
    print('task has started executing')
Run run.py, or run the command: celery worker -A celery_app_task -l info
Run result.py to check the task status and fetch the result.
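Instead of polling in result.py, you can also block until the result arrives. A small sketch; the task id placeholder stands for the id printed by add_task.py:

from celery.result import AsyncResult
from celery_app_task import cel

res = AsyncResult(id="<task id printed by add_task.py>", app=cel)
print(res.get(timeout=10))  # blocks until the worker stores a result; raises on timeout or task failure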
Multi-task project structure:

pro_cel
├── celery_task          # package with all Celery-related files
│   ├── celery.py        # Celery connection and configuration; the file must be named celery.py
│   ├── tasks1.py        # task functions
│   └── tasks2.py        # task functions
├── check_result.py      # check task results
└── send_task.py         # dispatch tasks
celery.py
from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # include the two task modules below; Celery looks up the task
             # functions in those .py files, so tasks can be grouped by module
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])

# time zone
cel.conf.timezone = 'Asia/Shanghai'
# whether to use UTC
cel.conf.enable_utc = False
tasks1.py
import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery task result: %s" % res
tasks2.py
import time
from celery_task.celery import cel

@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2 task result: %s" % res
check_result.py
from celery.result import AsyncResult
from celery_task.celery import cel

async_result = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget()                        # delete the result; it is not deleted automatically after completion
    # async_result.revoke(terminate=True)    # terminate the task no matter what state it is in
    # async_result.revoke(terminate=False)   # cancel the task only if it has not started yet
elif async_result.failed():
    print('task failed')
elif async_result.status == 'PENDING':
    print('task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('task raised an exception and is being retried')
elif async_result.status == 'STARTED':
    print('task has started executing')
send_task.py
from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# tell Celery to execute the test_celery task right away, passing one argument
result = test_celery.delay('first call')
print(result.id)
result = test_celery2.delay('second call')
print(result.id)
Dispatch the tasks (run send_task.py), start the worker with celery worker -A celery_task -l info -P eventlet, then check the task results (run check_result.py).
Setting a time for Celery to execute a task
add_task.py
from celery_app_task import add
from datetime import datetime, timedelta

# Option 1: pass an absolute time
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# Option 2: now + a delay
ctime = datetime.now()
# eta is interpreted as UTC by default
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# use apply_async and set the execution time
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
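If you only need a relative delay, apply_async also accepts a countdown argument in seconds, which avoids the manual UTC conversion above; a small equivalent sketch:

from celery_app_task import add

# roughly equivalent to computing an eta 10 seconds in the future
result = add.apply_async(args=[4, 3], countdown=10)
print(result.id)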
To run tasks periodically (crontab-style), modify celery.py in the multi-task project as follows:

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             include=[
                 'celery_task.tasks1',
                 'celery_task.tasks2',
             ])

cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # the entry name can be anything
    'add-every-10-seconds': {
        # run the test_celery function in tasks1
        'task': 'celery_task.tasks1.test_celery',
        # run every 2 seconds
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # arguments passed to the task
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     # every year on April 11 at 08:42
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}
Start the worker: celery worker -A celery_task -l info -P eventlet
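Note that the beat_schedule entries only fire while a beat scheduler process is running alongside the worker; with Celery 4.x the usual command is:

celery beat -A celery_task -l info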
Using Celery in a Django project with djcelery: create celeryconfig.py in the project directory
import djcelery
djcelery.setup_loader()

CELERY_IMPORTS = (
    'app01.tasks',
)

# can prevent deadlocks in some situations
CELERYD_FORCE_EXECV = True
# number of concurrent worker processes
CELERYD_CONCURRENCY = 4
# acknowledge tasks only after they finish, so lost tasks can be retried
CELERY_ACKS_LATE = True
# recycle each worker process after at most 100 tasks, which helps prevent memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100
# task time limit in seconds
CELERYD_TASK_TIME_LIMIT = 12 * 30
Create tasks.py in the app01 directory
from celery import task

@task
def add(a, b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a + b)

View function views.py

from django.shortcuts import render, HttpResponse
from app01.tasks import add
from datetime import datetime, timedelta

def test(request):
    # result = add.delay(2, 3)
    ctime = datetime.now()
    # eta is interpreted as UTC by default
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
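With djcelery.setup_loader() in place, the worker is normally started through Django's manage.py rather than the bare celery command; a typical invocation, assuming the django-celery management commands are available:

python manage.py celery worker --loglevel=info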