Django学习笔记(20)celery_tasks 异步任务初识


celery_tasks 异步任务

  当我们需要批量的去执行一些接口,如测试平台的批量运行测试用例时,如果是同步任务的话,会等待用例一个个执行完毕才有返回结果。当点击运行后,后台直接返回一条信息,由celery服务来运行用例,这就是异步

celery_tasks 工作流

  平台会通过celery提供的方法将我们的操作推到中间件如redis中,celery会启动一个服务用来监控redis,当redis中有数据时,会取数据取出来后运行完毕后,返回结果给redis,redis有结果后 就将数据传给平台

 

安装celery

1 pip install Celery

 

注意

1、在起celery服务之前,需要本地或者远程启动redis服务,可在config.py文件中配置redis地地址

2、每次修改了celery文件,都需要从命令行重新启动celery服务

 

 

celery_tasks 目录结构

              

Django工程目录

 

config.py     ---------------------配置中间件,以redis为例

1 # broker_url = "redis://:password@ip:port/4"
2 # result_backend = "redis://:password@ip:port/4"
3 #result_backend = "redis://ip:port/4"#没有密码
4 
5 #本地redis
6 broker_url = "redis://127.0.0.1:6379/4" #读取数据地址
7 result_backend = "redis://127.0.0.1:6379/4" #结果返回数据地址

 

main.py             ------------------启动tasks任务配置

 1 from celery import Celery  2 app = Celery('sksystem') #创建异步对象
 3 
 4 # 获取celery的配置信息
 5 app.config_from_object('celery_tasks.config')  6 # 提供tasks的路径,自动发现需要运行的task任务
 7 app.autodiscover_tasks(['celery_tasks.run'])  8 
 9 # 启动celery的方法 默认以cpu的核数多进程的方式启动
10 # celery -A 应用路径 worker -l 日志级别
11 # mac同学
12 # celery -A celery_tasks.main worker -l info 普通启动
13 # celery multi start w1 -A celery_tasks.main -l info --logfile=logs/celerylog.log --pidfile=logs/celerypid.pid 后台运行
14 # celery flower -A celery_tasks.main 打开一个web页面启动 需要提前安装下flow 安装命令:pip install flower
15 
16 # win同学
17 # pip install eventlet
18 # celery -A celery_tasks.main worker -l info -P eventlet
19 # -- * - **** ---
20 # - ** ---------- [config]
21 # - ** ---------- .> app: sksystem:0x103d0deb8 启动是那个app的任务
22 # - ** ---------- .> transport: redis://10.168.100.21:6379/2 设置的broker的队列是那个
23 # - ** ---------- .> results: redis://10.168.100.21:6379/3 设置backend的存储地址
24 # - *** --- * --- .> concurrency: 4 (prefork) 默认启动的进程数
25 # -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
26 # --- ***** -----
27 # -------------- [queues]
28 # .> celery exchange=celery(direct) key=celery
29 
30 
31 # 在view视图中只需要导入tasks中写好的任务方法 通过任务方法调用delay()即可
32 # from celery_tasks.run.tasks import run_case
33 # 调用task任务 参数可以在delay中传递,正常调用一样
34 # run_case.delay() #此方法会返回任务task_id 同run.tasks中的 taskname.request.id

 

tasks.py    --------------- 定义task方法

from celery_tasks.main import app # 在celery中使用logger
from celery.utils.log import get_task_logger logger = get_task_logger('django_server') #用例运行
@app.task(name='run_case') def run_case(case_id,user_id): task_id = run_case.request.id  #通过方法名.request.id 获取task_id
    print("task_id",task_id) print(case_id) print(user_id) #集合运行
@app.task(name='run_collection') def run_collection(collecti_id,user_id): task_id = run_collection.request.id  #获取task_id
    print("task_id",task_id) print(collecti_id) print(user_id)

 

 

Views.py ----------------------通过delay()方法启用celery服务

 1 from celery_tasks.run.tasks import run_case
 2 
 3 #固定celery的调用方式
 4 #run_case.delay(1,2)   #delay()内部传递参数
 5 
 6 class CollectionRunView(View): #运行测试集合为例子
 7     def post(self,request):
 8         collect_id = request.POST.get('collect_id')
 9         user_id = request.POST.get('user_id')
10         for item_id in json.loads(collect_id):
11             #当页面调用异步任务时,会返回一个唯一的任务id
12             task_id = run_collection.delay(item_id,user_id) #启动异步任务,delay方法支持像函数一样传参
13             models.CaseCollection.objects.filter(id=item_id).update(report_batch=task_id,status=3)
14             print("task_id",task_id)
15         return NbResponse()

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM