Python 3 Tornado + Celery + RabbitMQ


Celery is so full of pitfalls it'll beat you until your own mother doesn't recognize you. Trust me!!!

 

Disclaimer: the code below was extracted from a real project and trimmed down for testing.

 

This setup uses a Celery task queue. Celery itself is only a task queue; it needs a broker in between: time-consuming work is handed to the Celery queue for execution, and when it finishes the result comes back through the broker. The official recommendation is RabbitMQ for message transport, though Redis works too.

 

1. About Celery: the web is full of introductions, so go search for one yourself.

Notes:
1. When using RabbitMQ you also need the third-party library pika. pika 0.10.0 has a bug that prevents it from receiving callback information; install version 0.9.14 instead.
2. The tornado-celery library is quite old and cannot keep up with recent Celery releases; it raises an error about failing to import the task Producer package. Pinning Celery to 3.0.25 fixes it.
 

2. Installation and configuration

 

Python version 3.5.4. Don't use 3.6 or 3.7; too much is unsupported and they are bottomless pits.
redis 5.0.8.msi ----------- rabbitmq (this part should make no real difference)


pip install redis
pip3 install tornado==5.1.1  (or 4.1)
pip3 install celery==3.1
pip3 install pika==0.9.14
pip3 install tornado-celery


 

3. The simplest Celery example with a return value, tested on Windows (2020/7/17)

The efficiency difference between Redis and RabbitMQ here is far from trivial.

tasks.py

from celery import Celery
# Configure Celery's backend and broker
# app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
# app = Celery('tasks', backend='redis://localhost:6379', broker='redis://localhost:6379')
app = Celery('tasks', backend='amqp://localhost:5672', broker='amqp://localhost:5672')
# app.conf.CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
# app.conf.CELERY_RESULT_BACKEND = "redis"
# app.conf.CELERY_ACCEPT_CONTENT = ['application/json']
# app.conf.CELERY_TASK_SERIALIZER = 'json'
# app.conf.CELERY_RESULT_SERIALIZER = 'json'
# app.conf.BROKER_HEARTBEAT = 30
# app.conf.CELERY_IGNORE_RESULT = False  # this is less important


@app.task  # decorate a plain function as a Celery task
def add(x, y):
    return x + y

 

# trigger.py
import time
from tasks import add

a = time.time()
result = add.delay(4, 4)  # do not call add(4, 4) directly; go through Celery's delay interface
# result = add.apply_async((4, 4))  # alternative: Celery's apply_async interface, again instead of calling add(4, 4) directly
print(result.ready())
# while not result.ready():
#     time.sleep(1)

print('task done: {0}'.format(result.get()))
print(time.time() - a)

 

Celery start command, run from the current directory:

celery -A tasks worker --loglevel=info
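The commented-out `while not result.ready()` loop in trigger.py can be made safer with a timeout. A minimal sketch; `wait_for_result` is my own helper name, not part of Celery, and it works on anything exposing Celery's `AsyncResult`-style `ready()`/`get()` interface:

```python
import time


def wait_for_result(result, timeout=10.0, interval=0.1):
    """Poll an AsyncResult-like object until it is ready, then return its
    value. Raises TimeoutError instead of spinning forever if the worker
    is down or the backend never receives the result."""
    deadline = time.time() + timeout
    while not result.ready():
        if time.time() >= deadline:
            raise TimeoutError("task did not finish within %.1fs" % timeout)
        time.sleep(interval)
    return result.get()
```

Usage would then be `print(wait_for_result(add.delay(4, 4)))` instead of the bare `result.get()`, which waits with no upper bound.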

 

Upgraded version of the simplest Celery-with-return-value test on Windows (just slightly reorganized)

  

 

 

 config.py

 

# file_name=config.py
# coding: utf-8
from celery import Celery

BROKER_URL = 'amqp://localhost:5672'
BACKEND_URL = 'amqp://localhost:5672'

# Add tasks here
CELERY_IMPORTS = (
    'tasks',
)

celery = Celery('celery',
                broker=BROKER_URL,
                backend=BACKEND_URL,
                include=CELERY_IMPORTS, )

celery.conf.update(
    CELERY_ACKS_LATE=True,  # allow retries (ack only after the task runs)
    CELERY_ACCEPT_CONTENT=['pickle', 'json'],
    CELERYD_FORCE_EXECV=True,  # can prevent deadlocks in some cases
    CELERYD_CONCURRENCY=4,  # number of concurrent worker processes
    CELERYD_MAX_TASKS_PER_CHILD=500,  # recycle a worker after 500 tasks, guards against memory leaks
    BROKER_HEARTBEAT=0,  # heartbeat
    CELERYD_TASK_TIME_LIMIT=12 * 30,  # task time limit
)

main.py

from tasks import add


def notify(a, b):
    result = add.delay(a, b)
    print("ready: ", result.ready())

    print('return data: {0}'.format(result.get()))
    return result


if __name__ == '__main__':
    import time

    a = time.time()
    haha = notify(6, 7)
    print("status: ", haha.status)
    print("id: ", haha.id)
    print("used time: %s s" % (time.time() - a))

tasks.py

from config import celery


# decorate a plain function as a Celery task
@celery.task
def add(x, y):
    return x + y

run_celery.sh  (run it from the current directory)

celery -A config worker --loglevel=info

 

Python 3.5.4 Tornado + Celery + Redis, getting the Celery return value (recorded 2020/7/18). Once you've read this section you can skip everything after it; this example is the newest.

This problem bothered me for a long time, and today I finally solved it. Why now? I have an interview in a few days that may touch on it, so I stayed up for several nights until it was done.

In fact the problem had been dragging on for years. Back then I used tornado + celery + rabbitmq without needing return values; tasks were simply fired off and processed. But I kept wondering what I would do if I ever needed the return value.

That is where I got stuck. I never found the right solution, because I kept assuming my code or my understanding of the mechanism was at fault instead of stepping back to look for a workaround.

First, the workaround for Tornado + Celery + RabbitMQ not yielding a return value.

The advantage of RabbitMQ is that it is more efficient than Redis as a broker. So why does Redis give you the return value? Because Celery stores the result in Redis under a generated key, and Tornado then reads the value back out of Redis by that key. In tornado + celery + redis, Redis is acting both as the message queue and as a cache. Once you see that, the workaround follows: have Tornado generate a key and pass it to Celery; the task function (e.g. add) stores its return value in the cache under that key, and Tornado then fetches it. RabbitMQ remains the message queue, and Redis plays the cache role.
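The key-passing scheme above can be sketched in a few lines. Everything here is illustrative: `submit_with_result_key`, `fetch_result`, and the key format are my own names, and the Redis client can be anything with `set`/`get`. On the worker side, the task would call `redis_client.set(result_key, json.dumps(value))` before returning.

```python
import json
import uuid


def submit_with_result_key(task, *args):
    """Tornado side: generate a unique key and hand it to the task as its
    first argument. RabbitMQ stays the broker; the worker writes its
    return value into Redis under this key."""
    result_key = "task-result:%s" % uuid.uuid4().hex
    task.delay(result_key, *args)
    return result_key


def fetch_result(redis_client, result_key):
    """Tornado side: read the cached value once the worker has stored it.
    Returns None while the task is still running."""
    raw = redis_client.get(result_key)
    return None if raw is None else json.loads(raw)
```

Tornado can then poll `fetch_result` (or set an expiry on the key) instead of waiting on Celery's own result backend.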

Versions

 
         
Python version 3.5.4. Don't use 3.6 or 3.7; too much is unsupported and they are bottomless pits.
redis 5.0.8.msi ----------- rabbitmq (this part should make no real difference)


pip install redis
pip3 install tornado==5.1.1  (or 4.1)
pip3 install celery==3.1
pip3 install pika==0.9.14
pip3 install tornado-celery

 

 

app.py  (tcelery doesn't seem to do anything here; if anyone has time, test it and see)

import tornado.ioloop
import tornado.web
from tornado import gen
from tornado.gen import coroutine
from tornado.web import asynchronous
import tcelery
from tasks import test

tcelery.setup_nonblocking_producer()  # set up a non-blocking producer; otherwise the callback information cannot be retrieved

class MainHandler(tornado.web.RequestHandler):
    @coroutine
    @asynchronous
    def get(self):
        import time
        a = time.time()
        # result = yield torncelery.async_me(test, "hello world")  # doesn't work
        # result = yield gen.Task(test.apply_async, args=["hello11"])  # doesn't work either

        result = test.delay("hello world")
        print("result: ", result.get())
        print("ready: ", result.ready())
        print("status: ", result.status)
        print("id: ", result.id)

        b = time.time()
        print(b-a)
        self.write("%s" % result.get())
        self.finish()

application = tornado.web.Application([
    (r"/", MainHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
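One caveat with the handler above: `result.get()` blocks the IOLoop, so every other request stalls while the task runs. Under Tornado 5.x the IOLoop is backed by asyncio, so a possible alternative is to poll without blocking. This is a sketch, not the author's code: `poll_async_result` is my own name, and the object only needs `ready()`/`get()`:

```python
import asyncio


async def poll_async_result(result, timeout=10.0, interval=0.1):
    """Await a Celery AsyncResult-like object without blocking the event
    loop: between polls, control returns to the loop so other requests
    keep being served."""
    loop = asyncio.get_event_loop()
    deadline = loop.time() + timeout
    while not result.ready():
        if loop.time() >= deadline:
            raise TimeoutError("task did not finish in time")
        await asyncio.sleep(interval)
    return result.get()
```

In the handler this would look like `result = test.delay("hello world")` followed by `value = yield poll_async_result(result)` (Tornado 5's `@coroutine` accepts awaitables), instead of calling `result.get()` directly.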

 

 tasks.py

from celery import Celery
import time

celery = Celery('tasks',  backend='redis://localhost:6379', broker='redis://localhost:6379')
# celery = Celery('tasks',  backend='amqp://localhost:5672', broker='amqp://localhost:5672')
celery.conf.update(
    CELERY_ACKS_LATE=True,  # allow retries (ack only after the task runs)
    CELERY_ACCEPT_CONTENT=['pickle', 'json'],
    CELERYD_FORCE_EXECV=True,  # can prevent deadlocks in some cases
    CELERYD_CONCURRENCY=4,  # number of concurrent worker processes
    CELERYD_MAX_TASKS_PER_CHILD=500,  # recycle a worker after 500 tasks, guards against memory leaks
    BROKER_HEARTBEAT=60,  # heartbeat
    CELERYD_TASK_TIME_LIMIT=12 * 30,  # task time limit
)

@celery.task
def test(strs):
    print("str:", strs)
    return strs

 

One remaining issue is response time: the first request takes 3 s, and every request after that 0.5 s (presumably the first call pays for connection setup). How can this be improved?

 

 

 

 

 

Configuration details

 

Single setting:
app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

 

Multiple settings:
app.conf.update(
    CELERY_BROKER_URL='amqp://guest@localhost//',
    CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)

 

Loading from a config file (put the settings in a module named celeryconfig.py, matching the name passed to config_from_object):

 BROKER_URL = 'amqp://guest@localhost//'
 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
 app.config_from_object('celeryconfig')
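The module name passed to `config_from_object` must match the file that actually holds the settings. A minimal sketch of the two-file layout, using the placeholder URLs from this post (not verified against a live broker):

```python
# celeryconfig.py -- nothing but plain settings, importable on sys.path
BROKER_URL = 'amqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# app.py -- build the Celery app, then load the settings module by name
from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')  # resolves the module 'celeryconfig'
```

Keeping the settings in their own module means the worker (`celery -A app worker`) and the Tornado process read the exact same configuration.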

 

3. Example (this one has problems too)

Start a Celery task queue, i.e. the consumer:

 import time
 import logging

 from celery import Celery

 celery = Celery('tasks', broker='amqp://guest:guest@119.29.151.45:5672', backend='amqp')  # RabbitMQ as the broker, and also as the result backend

 @celery.task(name='doing')  # async task; give it a unique name
 def doing(s, b):
     print('task started')
     logging.warning('task started--{}'.format(s))
     time.sleep(s)
     return s + b

 

Start the task-queue daemon from the command line; when tasks arrive in the queue they run automatically (the command can be managed under supervisor):

celery -A tasks worker --loglevel=info --concurrency=5

--loglevel: log level, default info. --concurrency: number of worker processes, default is the number of CPU cores.

 

Start the task producer (I hadn't thought this through when I originally wrote it; reworking it now)

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import tcelery
from tornado.web import RequestHandler
import tornado
from tasks import sleep  # assumed: the Celery `sleep` task defined in the project's tasks.py

tcelery.setup_nonblocking_producer()  # set up a non-blocking producer; otherwise the callback information cannot be retrieved


class MyMainHandler(RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        print('begin')
        result = yield tornado.gen.Task(sleep.apply_async, args=[10])  # use yield to get the async return value; waits without blocking other requests
        print('ok - -{}'.format(result.result))  # the task's return value

        # sleep.apply_async((10,), callback=self.on_success)
        # print('ok -- {}'.format(result.get(timeout=100)))  # callback style: after the task is sent the request ends and the client connection is closed, so the callback can no longer write data back to the client; this cannot live inside the Tornado request handler

        # result = sleep.delay(10)    # delay is just a thin wrapper around apply_async
        # data = result.get(timeout=100)  # get() blocks, which makes this effectively synchronous

    def on_success(self, response):  # callback
        print('Ok - - {}'.format(response))

 

=======================

#!/usr/bin/env python
# -*-  coding:utf-8 -*-

from tornado.web import Application
from tornado.ioloop import IOLoop

import tcelery

from com.analysis.handlers.data_analysis_handlers import *
from com.analysis.handlers.data_summary_handlers import *
from com.analysis.handlers.data_cid_sumjson_handler import Cid_Sumjson_Handler
from com.analysis.handlers.generator_handlers import GeneratorCsv, GeneratorSpss


Handlers = [
    (r"/single_factor_variance_analysis/(.*)", SingleFactorVarianceAnalysis),  # one-way ANOVA
]


if __name__ == "__main__":
    tcelery.setup_nonblocking_producer()
    application = Application(Handlers)
    application.listen(port=8888, address="0.0.0.0")
    IOLoop.instance().start()
server

 

#!/usr/bin/env python
# -*-  coding:utf-8 -*-

import tornado.gen
import tornado.web

from com.analysis.core.base import BaseAnalysisRequest
from com.analysis.tasks.data_analysis import *


class SingleFactorVarianceAnalysis(BaseAnalysisRequest):
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        response = yield self.celery_task(single_factor_variance_analysis.apply_async, params=args)
        print(response.result)
        self.write(response.result[2])
handler

 

#!/usr/bin/env python
# -*-  coding:utf-8 -*-

from collections import defaultdict

import pandas as pd
import numpy as np
import pygal
import tornado.gen
from pygal.style import LightStyle
from tornado.web import RequestHandler
import json

from com.analysis.db.db_engine import DBEngine
from com.analysis.utils.log import LogCF
from com.analysis.handlers.data_cid_sumjson_handler import cid_sumjson


class BaseRequest(RequestHandler):
    def __init__(self, application, request, **kwargs):
        super(BaseRequest, self).__init__(application, request, **kwargs)


class BaseAnalysisRequest(BaseRequest):
    def __init__(self, application, request, **kwargs):
        super(BaseAnalysisRequest, self).__init__(application, request, **kwargs)

    @tornado.gen.coroutine
    def celery_task(self, func, params, queue="default_analysis"):
        args_list = list(params)
        args_list.insert(0, "")
        response = yield tornado.gen.Task(func, args=args_list, queue=queue)
        raise tornado.gen.Return(response)
basehandler

 

#!/usr/bin/env python
# -*-  coding:utf-8 -*-

from celery import Celery

from com.analysis.core.chi_square_test import CST
from com.analysis.generator.generator import GeneratorCsv, GeneratorSpss

celery = Celery(
    'com.analysis.tasks.data_analysis',
    broker='amqp://192.168.1.1:5672',
    include='com.analysis.tasks.data_analysis'
)
celery.conf.CELERY_RESULT_BACKEND = "amqp://192.168.1.1:5672"
celery.conf.CELERY_ACCEPT_CONTENT = ['application/json']
celery.conf.CELERY_TASK_SERIALIZER = 'json'
celery.conf.CELERY_RESULT_SERIALIZER = 'json'
celery.conf.BROKER_HEARTBEAT = 30
celery.conf.CELERY_IGNORE_RESULT = False  # this is less important

logger = Logger().getLogger()  # Logger comes from a project module not shown in this post


@celery.task()
def single_factor_variance_analysis(*args):
    return SFV().do_(*args)  # SFV is the project's analysis class, not shown here
task

 

 

