一、celery簡介:
Celery 是一個強大的 分布式任務隊列 的 異步處理框架,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
可以看到,Celery 主要包含以下幾個模塊:
-
任務模塊 Task
包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發並發往任務隊列,而定時任務由 Celery Beat 進程周期性地將任務發往任務隊列。
-
消息中間件 Broker
Broker,即為任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。Celery 本身不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。
-
任務執行單元 Worker
Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。
-
任務結果存儲 Backend
Backend 用於存儲任務的執行結果,以供查詢。同消息中間件一樣,存儲也可使用 RabbitMQ, redis 和 MongoDB 等。
所以總結一下celery:它是一個處理大量消息的分布式系統,能異步任務、定時任務,使用場景一般用於耗時操作的多任務或者定時性的任務
二、celery安裝與使用
pycharm安裝:
pip3 install celery
初步使用:(創建一個Python項目)
① 實例化一個celery對象,使用該對象.task裝飾需要管理的任務函數:
# celery_task.py from celery import Celery """ # 如果redis沒有設置密碼 broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' """ broker = 'redis://:12345@127.0.0.1:6379/1' backend = 'redis://:12345@127.0.0.1:6379/2' # c1是實例化產生的celery的名字,因為會存在多個celery app = Celery('c1', broker=broker, backend=backend) # 需要使用一個裝飾器,來管理該任務(函數) @app.task def add(x, y): import time time.sleep(1) return x + y
② 將裝飾的任務函數條件到消息隊列中,此時提交的任務函數並沒有執行,只是提交到worker,它會返回一個標識任務的字符串
# submit.task.py # 用於提交任務 from celery_task import add # 提交任務到消息隊列中,這里只是將任務提交,並沒有執行 res = add. delay(3, 8) print(res) # 結果是標識任務的字符串(id號) # 7811a028-428c-4dd5-9135-788e26e694a7
③ 使用命令啟動worker去剛才提交的執行任務
linux: celery worker -A celery_task -l info windows下:celery worker -A celery_task -l info -P eventlet
④ 查看結果,根據提交任務返回的字符串去查詢
# check_res.py from celery.result import AsyncResult from celery_task import app async = AsyncResult(id='bd600820-9366-4220-a679-3e435ae91e71', app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中') elif async.status == 'RETRY': print('任務異常后重試') elif async.status == 'STARTED': print('任務正在執行')
celery簡單使用流程:
-celery的使用 -pip3 install celery -寫一個py文件:celery_task -1 指定broker(消息中間件),指定backend(結果存儲) -2 實例化產生一個Celery對象 app=Celery('名字',broker,backend) -3 加裝飾器綁定任務,在函數(add)上加裝飾器app.task -4 其他程序提交任務,先導入add,add.delay(參,參數),會將該函數提交到消息中間件,但是並不會執行,有個返回值,直接print會打印出任務的id,以后用id去查詢任務是否執行完成 -5 啟動worker去執行任務: linux: celery worker -A celery_task_s1 -l info windows下:celery worker -A celery_task_s1 -l info -P eventlet -6 查看結果:根據id去查詢 async = AsyncResult(id="bd600820-9366-4220-a679-3e435ae91e71", app=app) if async.successful(): #取出它return的值 result = async.get() print(result)
celery的多任務
# celery的多任務結構 -項目結構: pro_cel ├── celery_task# celery相關文件夾 │ ├── celery.py # celery連接和配置相關文件,必須叫這個名字 │ └── tasks1.py # 所有任務函數 │ └── tasks2.py # 所有任務函數 ├── check_result.py # 檢查結果 └── send_task.py # 觸發任務 -啟動worker,celery_task是包的名字 celery worker -A celery_task -l info -P eventlet
按照多任務文件結構創建文件:
注意celery.py這個文件的文件名是固定的,不能改,task_1和task_2可以自己定義,他倆代表自定義的任務分類,還可以再創建task_3。。。等其它名字的任務文件,send_task.py是提交任務到worker,check_result.py是查看結果的
# celery.py from celery import Celery broker = 'redis://:12345@127.0.0.1:6379/1' backend = 'redis://:12345@127.0.0.1:6379/2' # c1是實例化產生的celery的名字,因為會存在多個celery app = Celery('c1', broker=broker, backend=backend, # 包含一些2個任務文件,去相應的py文件找任務,對多個任務進行分類 include=[ 'celery_task.task_1', 'celery_task.task_2', ]) # celery提供一些配置,具體可查看官方文檔 # app.conf.timezone = 'Asia/Shanghai'
在send_task.py種右鍵運行,提交任務到worker(這里打印了提交的2個任務的id)
# task_1.py from celery_task.celery import app @app.task def add1(x, y): import time time.sleep(0.5) return x + y # task_2.py from celery_task.celery import app @app.task def add2(x, y): import time time.sleep(1) return x * y
# send_task.py from celery_task.task_1 import add1 from celery_task.task_2 import add2 res1 = add1.delay(3, 8) print(res1) # 16e847f3-fc14-4391-89e2-e2b3546872cf res2 = add2.delay(4, 9) print(res2) # 858c0ae5-8516-4473-8be5-7501fb856ff4
啟動worker,celery_task是包的名字
celery worker -A celery_task -l info -P eventlet
然后將打印的2個id在check_result.py中進行查詢結果
# check_reslut.py from celery.result import AsyncResult from celery_task.celery import app for i in ['16e847f3-fc14-4391-89e2-e2b3546872cf', '858c0ae5-8516-4473-8be5-7501fb856ff4']: async = AsyncResult(id=i, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中') elif async.status == 'RETRY': print('任務異常后重試') elif async.status == 'STARTED': print('任務正在執行')
celery的定時任務
方式一:執行時間在年月日時分秒
在提交任務的地方修改:
# send_task.py from celery_task.task_1 import add1 from celery_task.task_2 import add2 # 執行定時任務,3s以后執行add1、add2任務 from datetime import datetime # 設置任務執行時間2019年7月12日21點45分12秒 v1 = datetime(2019, 7, 12, 21, 48, 12) print(v1) # 2019-07-12 21:45:12 # 將v1時間轉成utc時間 v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) # 2019-07-12 13:45:12 # 取出要執行任務的時間對象,調用apply_async方法,args是任務函數傳的參數,eta是執行的時間 result1 = add1.apply_async(args=[3, 8], eta=v2) result2 = add2.apply_async(args=[4, 9], eta=v2) print(result1.id) print(result2.id)
方式二:通過延遲執行的時間算出執行的具體utc時間,與方式一基本相同
在提交任務的地方修改:
# send_task.py # 方式二:實際上和方法一類似,多了一個延遲時間,也就是用現在時間和推遲執行的時間計算出任務執行的最終utc時間 # 然后也是調用apply_async方法。 from datetime import datetime ctime = datetime.now() # 默認使用utc時間 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta # 使用timedelta模塊,拿到10秒后的時間對象,這里參數可以傳秒、毫秒、微秒、分、小時、周、天 time_delay = timedelta(seconds=10) # 得到任務運行時間: task_time = utc_ctime + time_delay result1 = add1.apply_async(args=[3, 8], eta=task_time) result2 = add2.apply_async(args=[4, 9], eta=task_time) print(result1.id) print(result2.id)
celery的計划任務
計划任務需要在celery.py中添加代碼,然后需要beat一下,才能將計划開啟
# celery.py中 from celery import Celery broker = 'redis://:12345@127.0.0.1:6379/1' backend = 'redis://:12345@127.0.0.1:6379/2' # c1是實例化產生的celery的名字,因為會存在多個celery app = Celery('c1', broker=broker, backend=backend, # 包含一些2個任務文件,去相應的py文件找任務,對多個任務進行分類 include=[ 'celery_task.task_1', 'celery_task.task_2', 'celery_task.task_3', ]) # celery提供一些配置,具體可查看官方文檔 # app.conf.timezone = "Asia/Shanghai" # app.conf.enable_utc = True # 計划任務 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'submit_every_2_seconds': { # 計划的任務執行函數 'task': 'celery_task.task_1.add1', # 每個2秒執行一次 'schedule': timedelta(seconds=2), # 傳遞的任務函數參數 'args': (3, 9) }, 'submit_every_3_seconds': { # 計划的任務執行函數 'task': 'celery_task.task_2.add2', # 每個3秒執行一次 'schedule': timedelta(seconds=3), # 傳遞的任務函數參數 'args': (4, 7) }, 'submit_in_fix_datetime': { 'task': 'celery_task.task_3.add3', # 比如每年的7月13日10點53分執行 # 注意:默認使用utc時間,當前的時間中的小時必須要-8個小時才會到點提交 'schedule': crontab(minute=53, hour=2, day_of_month=13, month_of_year=7), ''' # 如果不想-8,可以先設置時區,再按正常時間設置 app.conf.timezone = "Asia/Shanghai" app.conf.enable_utc = True ''' 'args': ('Hello World',) } } # 上面寫完后,需要起一個進程,啟動計划任務 # celery beat -A celery_task -l info # 啟動worker: # celery worker -A celery_task -l info -P eventlet
Django中使用celery
django-celery:由於djang-celery模塊對版本的要求過於嚴格,而且容易出現很多bug,所以不建議使用
直接使用celery多任務結構的,將celery多任務結構的代碼文件夾celery_task拷貝到Django項目中,然后在視圖函數中進行任務提交、然后進行結構查看。(啟動項目時候記得將worker啟動起來,注意啟動路徑要跟你拷貝的celery_task文件同級)
注意:當我們在Django項目中使用celery,在celery的任務函數中不能直接調用django的環境(比如orm方法查詢數據庫),需要添加代碼調用Django環境
在Python腳本中調用Django環境
import os # 加載Django環境,bbs是所在的Django項目名稱 os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'bbs.settings') # 引入Django模塊 import django # 初始化Django環境 django.setup() # 從app當中導入models from app01 import models # 調用操作,拿到數據庫中的所有Book數據對象 books = models.Books.objects.all()