celery有多坑,能坑的你親媽都不認識,信我!!!
聲明:代碼是從項目中截取的, 為進行測試
使用Celery任務隊列,Celery 只是一個任務隊列,需要一個broker媒介,將耗時的任務傳遞給Celery任務隊列執行,執行完畢將結果通過broker媒介返回。官方推薦使用RabbitMQ作為消息傳遞,redis也可以
一、Celery 介紹:概念網上一搜一堆,自己百度去
二、安裝、配置
python版本3.5.4 別用3.6, 也別用3.7,很多不支持,巨坑無比
redis 5.0.8.msi ----------- rabbitmq 這個應該沒什么差異
pip install redis
pip3 install tornado==5.1.1 或者4.1 pip3 install celery==3.1 pip3 install pika==0.9.14 pip3 install tornado-celery
三、window下面測試最簡單celery有返回值的實例--時間:2020/7/17
用redis和rabbitmq效率差的不是一點半點
tasks.py
from celery import Celery
#配置好celery的backend和broker
# app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
# app = Celery('tasks', backend='redis://localhost:6379', broker='redis://localhost:6379')
app = Celery('tasks', backend='amqp://localhost:5672', broker='amqp://localhost:5672')
# app.conf.CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
# app.conf.CELERY_RESULT_BACKEND = "redis"
# app.conf.CELERY_ACCEPT_CONTENT = ['application/json']
# app.conf.CELERY_TASK_SERIALIZER = 'json'
# app.conf.CELERY_RESULT_SERIALIZER = 'json'
# app.conf.BROKER_HEARTBEAT = 30
# app.conf.CELERY_IGNORE_RESULT = False # this is less important
@app.task #普通函數裝飾為 celery task
def add(x, y):
return x + y

#trigger.py
import time
from tasks import add
a = time.time()
result = add.delay(4, 4) #不要直接 add(4, 4),這里需要用 celery 提供的接口 delay 進行調用
# result = add.apply_async((4, 4)) #不要直接 add(4, 4),這里需要用 celery 提供的接口 apply_async 進行調用
print(result.ready())
# while not result.ready():
# time.sleep(1)
print('task done: {0}'.format(result.get()))
print(time.time() - a)
celery啟動命令當前目錄下
celery -A tasks worker --loglevel=info
升級版window下面測試最簡單celery有返回值的實例(簡單的調整了一下)
config.py
# file_name=init_celery.py # coding: utf-8 from celery import Celery BROKER_URL = 'amqp://localhost:5672' BACKEND_URL = 'amqp://localhost:5672' # Add tasks here CELERY_IMPORTS = ( 'tasks', ) celery = Celery('celery', broker=BROKER_URL, backend=BACKEND_URL, include=CELERY_IMPORTS, ) celery.conf.update( CELERY_ACKS_LATE=True, # 允許重試 CELERY_ACCEPT_CONTENT=['pickle', 'json'], CELERYD_FORCE_EXECV=True, # 有些情況可以防止死鎖 CELERYD_CONCURRENCY=4, # 設置並發worker數量 CELERYD_MAX_TASKS_PER_CHILD=500, # 每個worker最多執行500個任務被銷毀,可以防止內存泄漏 BROKER_HEARTBEAT=0, # 心跳 CELERYD_TASK_TIME_LIMIT=12 * 30, # 超時時間 )
main.py
from tasks import add def notify(a, b): result = add.delay(a, b) print("ready: ", result.ready()) print('return data: {0}'.format(result.get())) return result if __name__ == '__main__': import time a = time.time() haha = notify(6, 7) print("status: ", haha.status) print("id: ", haha.id) print("used time: %s s" % (time.time() - a))
tasks.py
from config import celery # 普通函數裝飾為 celery task @celery.task def add(x, y): return x + y
run_celery.sh --當前目錄下執行
celery -A config worker --loglevel=info
Python3.5.4 Tornado + Celery + Redis 得到Celery返回值(時間:2020/7/18記錄)看到這,后面的就不用看了,這個例子最新
這個問題困擾我很久,今天終於解決了,為什么呢?因為呀過幾天有個面試,可能涉及這個問題,所以熬了幾天終於把這個問題給解決了
其實這個問題已經拖了幾年了,原因是以前我用的是tornado+celery+rabbitmq,不需要返回值,直接去處理任務就好了,可是當時思考我如果需要拿返回值怎么辦
后來就在這個地方卡住了,怎么也沒找到正確的解決方法,因為一直認為是自己的寫法或者沒搞明白原理,而沒有間接的去尋找解決方案
首先先把Tornado + Celery + RabbitMQ得不到返回值的解決方案說一下
用RabbitMQ的好處是,RabbitMQ的效率比Redis的效率要高,但為什么redis能拿到返回值呢
因為pika,它連接了redis,而celery把值存儲到了redis,這個時候生成了有個key存儲到redis,然后tornado這面在從redis取值,知道了這個原理,那后面也就好說了
tornado + celery + redis的redis這里既充當了消息隊列,還用到了redis的緩存
解決方案:
tornado也生成一個key,給celery,然后celery的task的函數(例如add)把返回值存儲到緩存里面,tornado在去取就行了呀
rabbitmq充當消息隊列,redis充當緩存角色
版本
python版本3.5.4 別用3.6, 也別用3.7,很多不支持,巨坑無比
redis 5.0.8.msi ----------- rabbitmq 這個應該沒什么差異
pip install redis
pip3 install tornado==5.1.1 或者4.1 pip3 install celery==3.1 pip3 install pika==0.9.14 pip3 install tornado-celery
app.py --- 這里tcelery感覺並沒有起到什么作用,如果誰有時間可以進行一下測試,看看
import tornado.ioloop import tornado.web from tornado import gen from tornado.gen import coroutine from tornado.web import asynchronous import tcelery from tasks import test tcelery.setup_nonblocking_producer() # 設置為非阻塞生產者,否則無法獲取回調信息 class MainHandler(tornado.web.RequestHandler): @coroutine @asynchronous def get(self): import time a = time.time() # result = yield torncelery.async_me(test, "hello world") #不能這么用 # result = yield gen.Task(test.apply_async, args=["hello11"]) # 不能這么用 result = test.delay("hello world") print("result: ", result.get()) print("ready: ", result.ready()) print("status: ", result.status) print("id: ", result.id) b = time.time() print(b-a) self.write("%s" % result.get()) self.finish() application = tornado.web.Application([ (r"/", MainHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()
tasks.py
from celery import Celery import time celery = Celery('tasks', backend='redis://localhost:6379', broker='redis://localhost:6379') # celery = Celery('tasks', backend='amqp://localhost:5672', broker='amqp://localhost:5672') celery.conf.update( CELERY_ACKS_LATE=True, # 允許重試 CELERY_ACCEPT_CONTENT=['pickle', 'json'], CELERYD_FORCE_EXECV=True, # 有些情況可以防止死鎖 CELERYD_CONCURRENCY=4, # 設置並發worker數量 CELERYD_MAX_TASKS_PER_CHILD=500, # 每個worker最多執行500個任務被銷毀,可以防止內存泄漏 BROKER_HEARTBEAT=60, # 心跳 CELERYD_TASK_TIME_LIMIT=12 * 30, # 超時時間 ) @celery.task def test(strs): print("str:", strs) return strs
這里有一個問題,就是響應時間,第一次是3s,以后每次都是0.5s,如何提升效率
配置詳情
單個參數配置: app.conf.CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘
多個參數配置: app.conf.update( CELERY_BROKER_URL = ‘amqp://guest@localhost//‘, CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘ )
從配置文件中獲取:(將配置參數寫在文件app.py中)
BROKER_URL=‘amqp://guest@localhost//‘ CELERY_RESULT_BACKEND=‘redis://localhost:6379/0‘ app.config_from_object(‘celeryconfig‘)
三、案例 --這個也有問題了
啟動一個Celery 任務隊列,也就是消費者:
from celery import Celery celery = Celery(‘tasks‘, broker=‘amqp://guest:guest@119.29.151.45:5672‘, backend=‘amqp‘) 使用RabbitMQ作為載體, 回調也是使用rabbit作為載體 @celery.task(name=‘doing‘) #異步任務,需要命一個獨一無二的名字 def doing(s, b): print(‘開始任務‘) logging.warning(‘開始任務--{}‘.format(s)) time.sleep(s) return s+b
啟動任務生產者 -- 當初寫這個代碼的時候沒想好可能,現在的重新弄一下了
#!/usr/bin/env python # -*- coding:utf-8 -*- import tcelery from tornado.web import RequestHandler import tornado tcelery.setup_nonblocking_producer() # 設置為非阻塞生產者,否則無法獲取回調信息 class MyMainHandler(RequestHandler): @tornado.web.asynchronous @tornado.gen.coroutine def get(self, *args, **kwargs): print('begin') result = yield tornado.gen.Task(sleep.apply_async, args=[10]) # 使用yield 獲取異步返回值,會一直等待但是不阻塞其他請求 print('ok - -{}'.format(result.result)) # 返回值結果 # sleep.apply_async((10, ), callback=self.on_success) # print(‘ok -- {}‘.format(result.get(timeout=100)))#使用回調的方式獲取返回值,發送任務之后,請求結束,所以不能放在處理tornado的請求任務當中,因為請求已經結束了,與客戶端已經斷開連接,無法再在獲取返回值的回調中繼續向客戶端返回數據 # result = sleep.delay(10) #delay方法只是對apply_async方法的封裝而已 # data = result.get(timeout=100) #使用get方法獲取返回值,會導致阻塞,相當於同步執行 def on_success(self, response): # 回調函數 print('Ok - - {}'.format(response))
=======================

#!/usr/bin/env python # -*- coding:utf-8 -*- from tornado.web import Application from tornado.ioloop import IOLoop import tcelery from com.analysis.handlers.data_analysis_handlers import * from com.analysis.handlers.data_summary_handlers import * from com.analysis.handlers.data_cid_sumjson_handler import Cid_Sumjson_Handler from com.analysis.handlers.generator_handlers import GeneratorCsv, GeneratorSpss Handlers = [ (r"/single_factor_variance_analysis/(.*)", SingleFactorVarianceAnalysis), # 單因素方差檢驗 ] if __name__ == "__main__": tcelery.setup_nonblocking_producer() application = Application(Handlers) application.listen(port=8888, address="0.0.0.0") IOLoop.instance().start()

#!/usr/bin/env python # -*- coding:utf-8 -*- import tornado.gen import tornado.web from com.analysis.core.base import BaseAnalysisRequest from com.analysis.tasks.data_analysis import * class SingleFactorVarianceAnalysis(BaseAnalysisRequest): @tornado.gen.coroutine def get(self, *args, **kwargs): response = yield self.celery_task(single_factor_variance_analysis.apply_async, params=args) print(response.result) self.write(response.result[2])

#!/usr/bin/env python # -*- coding:utf-8 -*- from collections import defaultdict import pandas as pd import numpy as np import pygal import tornado.gen from pygal.style import LightStyle from tornado.web import RequestHandler import json from com.analysis.db.db_engine import DBEngine from com.analysis.utils.log import LogCF from com.analysis.handlers.data_cid_sumjson_handler import cid_sumjson class BaseRequest(RequestHandler): def __init__(self, application, request, **kwargs): super(BaseRequest, self).__init__(application, request, **kwargs) class BaseAnalysisRequest(BaseRequest): def __init__(self, application, request, **kwargs): super(BaseAnalysisRequest, self).__init__(application, request, **kwargs) @tornado.gen.coroutine def celery_task(self, func, params, queue="default_analysis"): args_list = list(params) args_list.insert(0, "") response = yield tornado.gen.Task(func, args=args_list, queue=queue) raise tornado.gen.Return(response)

#!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery from com.analysis.core.chi_square_test import CST from com.analysis.generator.generator import GeneratorCsv, GeneratorSpss celery = Celery( 'com.analysis.tasks.data_analysis', broker='amqp://192.168.1.1:5672', include='com.analysis.tasks.data_analysis' ) celery.conf.CELERY_RESULT_BACKEND = "amqp://192.168.1.1:5672" celery.conf.CELERY_ACCEPT_CONTENT = ['application/json'] celery.conf.CELERY_TASK_SERIALIZER = 'json' celery.conf.CELERY_RESULT_SERIALIZER = 'json' celery.conf.BROKER_HEARTBEAT = 30 celery.conf.CELERY_IGNORE_RESULT = False # this is less important logger = Logger().getLogger() @celery.task() def single_factor_variance_analysis(*args): return SFV().do_(*args)