Django學習筆記(20)celery_tasks 異步任務初識


celery_tasks 異步任務

  當我們需要批量的去執行一些接口,如測試平台的批量運行測試用例時,如果是同步任務的話,會等待用例一個個執行完畢才有返回結果。當點擊運行后,后台直接返回一條信息,由celery服務來運行用例,這就是異步

celery_tasks 工作流

  平台會通過celery提供的方法將我們的操作推到中間件如redis中,celery會啟動一個服務用來監控redis,當redis中有數據時,會取數據取出來后運行完畢后,返回結果給redis,redis有結果后 就將數據傳給平台

 

安裝celery

1 pip install Celery

 

注意

1、在起celery服務之前,需要本地或者遠程啟動redis服務,可在config.py文件中配置redis地地址

2、每次修改了celery文件,都需要從命令行重新啟動celery服務

 

 

celery_tasks 目錄結構

              

Django工程目錄

 

config.py     ---------------------配置中間件,以redis為例

1 # broker_url = "redis://:password@ip:port/4"
2 # result_backend = "redis://:password@ip:port/4"
3 #result_backend = "redis://ip:port/4"#沒有密碼
4 
5 #本地redis
6 broker_url = "redis://127.0.0.1:6379/4" #讀取數據地址
7 result_backend = "redis://127.0.0.1:6379/4" #結果返回數據地址

 

main.py             ------------------啟動tasks任務配置

 1 from celery import Celery  2 app = Celery('sksystem') #創建異步對象
 3 
 4 # 獲取celery的配置信息
 5 app.config_from_object('celery_tasks.config')  6 # 提供tasks的路徑,自動發現需要運行的task任務
 7 app.autodiscover_tasks(['celery_tasks.run'])  8 
 9 # 啟動celery的方法 默認以cpu的核數多進程的方式啟動
10 # celery -A 應用路徑 worker -l 日志級別
11 # mac同學
12 # celery -A celery_tasks.main worker -l info 普通啟動
13 # celery multi start w1 -A celery_tasks.main -l info --logfile=logs/celerylog.log --pidfile=logs/celerypid.pid 后台運行
14 # celery flower -A celery_tasks.main 打開一個web頁面啟動 需要提前安裝下flow 安裝命令:pip install flower
15 
16 # win同學
17 # pip install eventlet
18 # celery -A celery_tasks.main worker -l info -P eventlet
19 # -- * - **** ---
20 # - ** ---------- [config]
21 # - ** ---------- .> app: sksystem:0x103d0deb8 啟動是那個app的任務
22 # - ** ---------- .> transport: redis://10.168.100.21:6379/2 設置的broker的隊列是那個
23 # - ** ---------- .> results: redis://10.168.100.21:6379/3 設置backend的存儲地址
24 # - *** --- * --- .> concurrency: 4 (prefork) 默認啟動的進程數
25 # -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
26 # --- ***** -----
27 # -------------- [queues]
28 # .> celery exchange=celery(direct) key=celery
29 
30 
31 # 在view視圖中只需要導入tasks中寫好的任務方法 通過任務方法調用delay()即可
32 # from celery_tasks.run.tasks import run_case
33 # 調用task任務 參數可以在delay中傳遞,正常調用一樣
34 # run_case.delay() #此方法會返回任務task_id 同run.tasks中的 taskname.request.id

 

tasks.py    --------------- 定義task方法

from celery_tasks.main import app # 在celery中使用logger
from celery.utils.log import get_task_logger logger = get_task_logger('django_server') #用例運行
@app.task(name='run_case') def run_case(case_id,user_id): task_id = run_case.request.id  #通過方法名.request.id 獲取task_id
    print("task_id",task_id) print(case_id) print(user_id) #集合運行
@app.task(name='run_collection') def run_collection(collecti_id,user_id): task_id = run_collection.request.id  #獲取task_id
    print("task_id",task_id) print(collecti_id) print(user_id)

 

 

Views.py ----------------------通過delay()方法啟用celery服務

 1 from celery_tasks.run.tasks import run_case
 2 
 3 #固定celery的調用方式
 4 #run_case.delay(1,2)   #delay()內部傳遞參數
 5 
 6 class CollectionRunView(View): #運行測試集合為例子
 7     def post(self,request):
 8         collect_id = request.POST.get('collect_id')
 9         user_id = request.POST.get('user_id')
10         for item_id in json.loads(collect_id):
11             #當頁面調用異步任務時,會返回一個唯一的任務id
12             task_id = run_collection.delay(item_id,user_id) #啟動異步任務,delay方法支持像函數一樣傳參
13             models.CaseCollection.objects.filter(id=item_id).update(report_batch=task_id,status=3)
14             print("task_id",task_id)
15         return NbResponse()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM