Python Celery與RabbitMQ結合操作


 0、RabbitMQ安裝請參考另外一篇博客

https://www.cnblogs.com/ygbh/p/13461525.html

1、安裝celery模式

# Celery + RabbitMQ
pip install "celery[librabbitmq]"

# Celery + RabbitMQ + Redis
pip install "celery[librabbitmq,redis,auth,msgpack]"

提示:

如果是在Window系統開發或運行測試需要進行如下操作,否則會報錯:ValueError: not enough values to unpack (expected 3, got 0)

解決方法:
# 安裝協程模塊
pip  install eventlet

 2、編寫第一個celery框架的程序

from celery import Celery

# 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'

# 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'

# 實例化一個celery對象
app = Celery('tasks', broker=broker_url, backend=backend_url)

# 設置配置參數的文件,創建文件celeryconfig.py來配置參數
app.config_from_object('celeryconfig')


@app.task
def add(x, y):
    """
        求和的函數
    :param x:
    :param y:
    :return:
    """
    return x + y
tasks.py
# 中間件
broker_url = 'pyamqp://'

# 運行結果
result_backend = 'rpc://'

# 任務序列化
task_serializer = 'json'

# 結果序列化
result_serializer = 'json'

# 接收的數據類型
accept_content = ['json']

# 設置時區
timezone = 'Asia/Shanghai'

# UTC時區換算關閉
enable_utc = False
celeryconfig.py

 測試運行

運行一個求和的任務

celery -A tasks worker --loglevel=info --pool=eventlet -------------- celery@xxx-20200417GVK v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2020-08-20 16:42:50
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x2b6850cb7b8
- ** ---------- .> transport:   amqp://development:**@192.168.2.129:5672//development_host
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2020-08-20 16:42:50,153: INFO/MainProcess] Connected to amqp://development:**@192.168.2.129:5672//development_host
[2020-08-20 16:42:50,165: INFO/MainProcess] mingle: searching for neighbors
[2020-08-20 16:42:51,307: INFO/MainProcess] mingle: all alone
[2020-08-20 16:42:51,327: INFO/MainProcess] pidbox: Connected to amqp://development:**@192.168.2.129:5672//development_host.
[2020-08-20 16:42:51,335: INFO/MainProcess] celery@xxx-20200417GVK ready.

 在程序目錄打開python解釋器

代碼目錄\celert_dir>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.

#導入求和模塊
>>> from tasks import add

#給task隊列的add任務傳入兩個數字進行求和
>>> result=add.delay(1,3)

#檢查求和函數是否執行完成
>>> result.ready()
True

#獲取運算結果,1秒超時處理
>>> result.get(timeout=1)
4

#如果有異常的時候,可以通過traceback獲取異常信息
result.traceback

3、編寫一個項目程序

目錄如下:

proj/
├── celery.py
├── __init__.py
└── tasks.py

代碼

from celery import Celery

# 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'

# 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'

# 實例化一個celery對象
app = Celery('tasks', broker=broker_url, backend=backend_url, include=['proj.tasks'])

app.conf.update(result_expires=3600, )

if __name__ == '__main__':
    app.start()
celery.py
from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)
tasks.py

程序的運行方式

(1)、前台運行方式

celery worker -A proj -l info               

(2)、后台運行方式

#同時開啟兩個處理任務
celery multi start work_task1 -A proj -l info                 
celery multi start work_task2 -A proj -l info 

#關閉任務
celery multi stop work_task1 -A proj -l info                 
celery multi stop work_task2 -A proj -l info 


#重啟任務
celery multi restart work_task1 -A proj -l info                 
celery multi restart work_task2 -A proj -l info 

#等待運行完成后,關閉任務
celery multi stopwait work_task1 -A proj -l info                 
celery multi stopwait work_task2 -A proj -l info 

 注意:這種啟動方式會報錯,不知道是什么問題,該問題已解決!

從GititHub發現,只是有人提出來,沒有得到根本的解決方式,於是自己看了官方文檔,並對源碼做出修改。

[2020-08-21 14:50:49,689: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@localhost:5672//:
Couldn't log in: server connection error 403, message: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile..
Trying again in 6.00 seconds... (1/100)

解決方式:

vi $PYTHON_HOME/lib/site-packages/celery/bin/base.py

    def setup_app_from_commandline(self, argv):
        preload_options, remaining_options = self.parse_preload_options(argv)
        quiet = preload_options.get('quiet')
        if quiet is not None:
            self.quiet = quiet
        try:
            self.no_color = preload_options['no_color']
        except KeyError:
            pass
        workdir = preload_options.get('workdir')
        if workdir:
            os.chdir(workdir)
        app = (preload_options.get('app') or
               os.environ.get('CELERY_APP') or
               self.app)

 # Add Code【增加如下代碼】## ######################## if app: os.environ['CELERY_APP']=app ########################
        preload_loader = preload_options.get('loader')
        if preload_loader:
            # Default app takes loader from this env (Issue #1066).
            os.environ['CELERY_LOADER'] = preload_loader
        loader = (preload_loader,
                  os.environ.get('CELERY_LOADER') or
                  'default')
        broker = preload_options.get('broker', None)

 4、編寫Celery的main函數啟動

代碼

from celery import Celery

# 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'

# 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'

# 實例化一個celery對象
app = Celery('tasks', broker=broker_url, backend=backend_url)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    app.worker_main()
tasks.py

 運行效果

5、Celery的配置文件的使用

5.1、實時的配置方法

from celery import Celery

# 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'

# 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'

# 實例化一個celery對象
app = Celery('tasks', broker=broker_url, backend=backend_url)


@app.task
def add(x, y):
    return x + y


if __name__ == '__main__':
    app.conf.update(
        enable_utc=False,
        timezone='Asia/Shanghai'
    )
    print('enable_utc', app.conf.timezone)
    print('timezone', app.conf.enable_utc)
update_config_1.py

5.2、使用模塊名的配置參數

# 中間件
broker_url = 'pyamqp://'

# 運行結果
result_backend = 'rpc://'

# 任務序列化
task_serializer = 'json'

# 結果序列化
result_serializer = 'json'

# 接收的數據類型
accept_content = ['json']

# 設置時區
timezone = 'Asia/Shanghai'

# UTC時區換算關閉
enable_utc = False
celeryconfig.py
from celery import Celery

# 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'

# 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'

# 實例化一個celery對象
app = Celery('tasks', broker=broker_url, backend=backend_url)
# 設置配置參數的文件,創建文件celeryconfig.py來配置參數
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    print('enable_utc', app.conf.timezone)
    print('timezone', app.conf.enable_utc)
tasks.py

5.3、使用模塊的導入方法【不推薦 】 

# 中間件
broker_url = 'pyamqp://'

# 運行結果
result_backend = 'rpc://'

# 任務序列化
task_serializer = 'json'

# 結果序列化
result_serializer = 'json'

# 接收的數據類型
accept_content = ['json']

# 設置時區
timezone = 'Asia/Shanghai'

# UTC時區換算關閉
enable_utc = False
celeryconfig.py
from celery import Celery
import celeryconfig

# 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'

# 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'

# 實例化一個celery對象
app = Celery('tasks', broker=broker_url, backend=backend_url)
# 設置配置參數的文件,創建文件celeryconfig.py來配置參數
app.config_from_object(celeryconfig)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    print('enable_utc', app.conf.timezone)
    print('timezone', app.conf.enable_utc)
tasks.py

5.4、使用類的方式進行配置方法

from celery import Celery

class Config:
    # 中間件
    broker_url = 'pyamqp://'

    # 運行結果
    result_backend = 'rpc://'

    # 任務序列化
    task_serializer = 'json'

    # 結果序列化
    result_serializer = 'json'

    # 接收的數據類型
    accept_content = ['json']

    # 設置時區
    timezone = 'Asia/Shanghai'

    # UTC時區換算關閉
    enable_utc = False


# 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'

# 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'

# 實例化一個celery對象
app = Celery('tasks', broker=broker_url, backend=backend_url)
# 設置配置參數的文件,創建文件celeryconfig.py來配置參數
app.config_from_object(Config)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    print('enable_utc', app.conf.timezone)
    print('timezone', app.conf.enable_utc)
tasks.py

5.5、使用環境變量的配置方法 

from celery import Celery
import os

# 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'

# 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'

# 實例化一個celery對象
app = Celery('tasks', broker=broker_url, backend=backend_url)

os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
app.config_from_envvar('CELERY_CONFIG_MODULE')

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    print('enable_utc', app.conf.timezone)
    print('timezone', app.conf.enable_utc)
tasks.py


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM