hello, 小伙伴們, 好久不更新了,這一次帶來的是celery在python中的應用以及設置異步任務周期任務和定時任務的步驟,希望能給入坑的你帶來些許幫助.
首先是對celery的介紹,Celery其實是一個專注於實時處理和調度任務的分布式任務隊列,同時提供操作和維護分布式系統所需要的全部數據, 因此可以用它提供的接口快速實現並管理一個分布式的任務隊列,它本身不是任務隊列,它是封裝了操作常見任務隊列的各種操作, 可以使用它快速進行任務隊列的使用與管理.在Python中的組成部分是 1.用戶任務 app 2.管道 broker 用於存儲任務 官方推薦的是 redis rabbitMQ / backend 用於存儲任務執行結果的 3, 員工 worker 大致流程入下:
最左邊的是用戶, 用戶發起1個請求給服務器, 要服務器執行10個任務,將這10個任務分給10個調度器,即開啟10個線程進行任務處理,worker會一直監聽調度器是否有任務, 一旦發現有新的任務, 就會立即執行新任務,一旦執行完就會返回給調度器, 即backend, backend會將請求發送給服務器, 服務器將結果返回給用戶, 表現的結果就是,這10個任務同時完成,同時返回,,這就是Celery的整個工作流程, 其中的角色分別為,任務(app_work), 調度器(broker + backend), 將任務緩存的部分, 即將所有任務暫時存在的地方,相當於生產者, 消費者(worker 可以指定數量, 即在創建worker命令的時候可以指定數量), 在worker拿到任務后,人就控制不了了, 除非把worker殺死, 不然肯定會執行完.
也即 任務來了以后, 調度器(broker)去緩存任務, worker去執行任務, 完成后返回backend,接着返回,
還有就是關於定時任務和周期任務在linux上為什么不用自身所帶着的去做,是因為linux周期定時任務是不可控的, 不好管理, 返回值保存也是個麻煩事, 而celery只要開啟着調度器, 就可以隨時把人物結果獲取到,即使用celery控制起來是非常方便的.
接下來就是實例代碼:

from celery import Celery import time # 創建一個Celery實例, 就是用戶的應用app 第一個參數是任務名稱, 可以隨意起 后面的就是配置的broker和backend diaoduqi= Celery("mytask", broker="redis://127.0.0.1:6379", backend="redis:127.0.0.1:6379") # 接下來是為應用創建任務 ab @diaoduqi.task def ab(a,b): time.sleep(15) return a+b

from worker import ab # 將任務交給Celery的Worker執行 res = ab.delay(2,4) #返回任務ID print(res.id)

from celery.result import AsyncResult from worker import diaoduqi # 異步獲取任務返回值 async_task = AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=diaoduqi) # 判斷異步任務是否執行成功 if async_task.successful(): #獲取異步任務的返回值 result = async_task.get() print(result) else: print("任務還未執行完成")
為了方便,現在直接將三個文件代表的部分命名在文件名稱中.首先是啟動workers.py
啟動方式是依據系統的不同來啟動的, 對於linux下 celery worker -A workers -l INFO 也可以指定開啟的worker數量 即在后面添加的參數是 -c 5 表示指定5個worker 理論上指定的worker是無上限的,
在windows下需要安裝一個eventlet模塊進行運行, 不然不會運行成功 pip install eventlet 可以開啟線程 不指定數量是默認6個worker, 理論上worker的數量可以開啟無限個,但是celery worker -A s1 -l INFO -P eventlet -c 5 使用eventlet 開啟5個worker 執行
該命令后 處於就緒狀態, 需要發布任務, 即brokers.py進行任務發布, 方法是使用delay的方式執行異步任務, 返回了一個任務id, 接着去backends.py中取這個任務id, 去查詢任務是否完成,判定條件即任務.successful 判斷是否執行完, 上面就是celery異步執行任務的用法與解釋
接下來就是celery在項目中的應用
在實際項目中應用celery是有一定規則的, 即目錄結構應該如下.
結構說明 首先是創建一個CeleryTask的包,接着是在里面創建一個celery.py,必須是這個文件 關於重名的問題, 找尋模塊的順序是先從當前目錄中去尋找, 根本找不到,接着是從內置模塊中去找, 根本就找不到寫的這個celery這個文件,

from celery import Celery DDQ = Celery("DDQ",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379", include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"])

import time from CeleryTask.celery import DDQ @DDQ.task def one1(a,b): # time.sleep(3) return a+b @DDQ.task def one2(): time.sleep(2) return "one2"

import time from CeleryTask.celery import DDQ @DDQ.task def two1(): time.sleep(2) return "two1" @DDQ.task def two2(): time.sleep(3) return "two2"

from CeleryTask.TaskOne import one1 as one # one.delay(10,10) # two.delay(20,20) # 定時任務我們不在使用delay這個方法了,delay是立即交給task 去執行 # 現在我們使用apply_async定時執行 # 首先我們要先給task一個執行任務的時間 import datetime, time # 獲取當前時間 此時間為東八區時間 ctime = time.time() # 將當前的東八區時間改為 UTC時間 注意這里一定是UTC時間,沒有其他說法 utc_time = datetime.datetime.utcfromtimestamp(ctime) # 為當前時間增加 10 秒 add_time = datetime.timedelta(seconds=10) action_time = utc_time + add_time # action_time 就是當前時間未來10秒之后的時間 # 現在我們使用apply_async定時執行 res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) res = one.apply_async(args=(6, 10), eta=action_time) print(res.id) # 這樣原本延遲5秒執行的One函數現在就要在10秒鍾以后執行了
接着是在命令行cd到與CeleryTask同級目錄下, 使用命令 celery worker -A CeleryTask -l INFO -P eventlet -c 50 這樣 就開啟了worker 接着去 發布任務, 在定時任務中不再使用delay這個方法了,
delay是立即交給ttask去執行, 在這里使用 apply_async定時執行 指的是調度的時候去定時執行
需要設置的是UTC時間, 以及定時的時間(多長時間以后執行) 之后使用 celery worker -A CeleryTask -l INFO -P eventlet -c 50 命令開啟worker, 之后運行 getR.py文件發布任務, 可以看到在定義的時間以后執行該任務
周期任務
周期任務 指的是在指定時間去執行任務 需要導入的一個模塊有 crontab
文件結構如下
結構同定時任務差不多,只不過需要變動一下文件內容 GetR文件已經不需要了,可以刪除.

from celery import Celery from celery.schedules import crontab DDQ = Celery("DDQ", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379", include=["CeleryTask.TaskOne", "CeleryTask.TaskTwo"]) # 我要要對beat任務生產做一個配置,這個配置的意思就是每10秒執行一次Celery_task.task_one任務參數是(10,10) DDQ.conf.beat_schedule = { "each10s_task": { "task": "CeleryTask.TaskOne.one1", "schedule": 10, # 每10秒鍾執行一次 "args": (10, 10) }, "each1m_task": { "task": "CeleryTask.TaskOne.one2", "schedule": crontab(minute=1) # 每1分鍾執行一次 也可以替換成 60 即 "schedule": 60 } }

import time from CeleryTask.celery import DDQ @DDQ.task def one1(a,b): # time.sleep(3) return a+b @DDQ.task def one2(): time.sleep(2) return "one2"

import time from CeleryTask.celery import DDQ @DDQ.task def two1(): time.sleep(2) return "two1" @DDQ.task def two2(): time.sleep(3) return "two2"
以上配置完成以后,這時候就不能直接創建worker了,因為要執行周期任務,需要首先有一個任務的生產方, 即 celery beat -A CeleryTask, 用來產生創建者, 接着是創建worker worker的創建命令還是原來的命令, 即 celery worker -A CeleryTask -l INFO -P eventlet -c 50 , 創建完worker之后, 每10秒就會由beat創建一個任務給 worker去執行.至此, celery創建異步任務, 周期任務,定時任務完畢, 伙伴們自己拿去測試吧.
更多精彩,可以關注樓主的公眾號,
最全面的django面試題總結: 回復django面試題即可獲取, pycharm供給激活壓縮包, 回復pycharm破解包即可獲取,破解步驟在我的這一篇博客(點我直達)已經破解百次, 屢試不爽.
以及其他樓主精心打造的原創文章,歡迎各位來訪.