0、RabbitMQ安裝請參考另外一篇博客
https://www.cnblogs.com/ygbh/p/13461525.html
1、安裝celery模式
# Celery + RabbitMQ pip install "celery[librabbitmq]" # Celery + RabbitMQ + Redis pip install "celery[librabbitmq,redis,auth,msgpack]"
提示:
如果是在Window系統開發或運行測試需要進行如下操作,否則會報錯:ValueError: not enough values to unpack (expected 3, got 0) 解決方法: # 安裝協程模塊 pip install eventlet
2、編寫第一個celery框架的程序

from celery import Celery # 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host' # 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host backend_url = 'rpc://development:root@192.168.2.129:5672//development_host' # 實例化一個celery對象 app = Celery('tasks', broker=broker_url, backend=backend_url) # 設置配置參數的文件,創建文件celeryconfig.py來配置參數 app.config_from_object('celeryconfig') @app.task def add(x, y): """ 求和的函數 :param x: :param y: :return: """ return x + y

# 中間件 broker_url = 'pyamqp://' # 運行結果 result_backend = 'rpc://' # 任務序列化 task_serializer = 'json' # 結果序列化 result_serializer = 'json' # 接收的數據類型 accept_content = ['json'] # 設置時區 timezone = 'Asia/Shanghai' # UTC時區換算關閉 enable_utc = False
測試運行
運行一個求和的任務
celery -A tasks worker --loglevel=info --pool=eventlet -------------- celery@xxx-20200417GVK v4.4.7 (cliffs) --- ***** ----- -- ******* ---- Windows-10-10.0.18362-SP0 2020-08-20 16:42:50 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x2b6850cb7b8 - ** ---------- .> transport: amqp://development:**@192.168.2.129:5672//development_host - ** ---------- .> results: rpc:// - *** --- * --- .> concurrency: 4 (eventlet) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.add [2020-08-20 16:42:50,153: INFO/MainProcess] Connected to amqp://development:**@192.168.2.129:5672//development_host [2020-08-20 16:42:50,165: INFO/MainProcess] mingle: searching for neighbors [2020-08-20 16:42:51,307: INFO/MainProcess] mingle: all alone [2020-08-20 16:42:51,327: INFO/MainProcess] pidbox: Connected to amqp://development:**@192.168.2.129:5672//development_host. [2020-08-20 16:42:51,335: INFO/MainProcess] celery@xxx-20200417GVK ready.
在程序目錄打開python解釋器
代碼目錄\celert_dir>python Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information. #導入求和模塊 >>> from tasks import add #給task隊列的add任務傳入兩個數字進行求和 >>> result=add.delay(1,3) #檢查求和函數是否執行完成 >>> result.ready() True #獲取運算結果,1秒超時處理 >>> result.get(timeout=1) 4 #如果有異常的時候,可以通過traceback獲取異常信息 result.traceback
3、編寫一個項目程序
目錄如下:
proj/
├── celery.py
├── __init__.py
└── tasks.py
代碼

from celery import Celery # 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host' # 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host backend_url = 'rpc://development:root@192.168.2.129:5672//development_host' # 實例化一個celery對象 app = Celery('tasks', broker=broker_url, backend=backend_url, include=['proj.tasks']) app.conf.update(result_expires=3600, ) if __name__ == '__main__': app.start()

from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
程序的運行方式
(1)、前台運行方式
celery worker -A proj -l info
(2)、后台運行方式
#同時開啟兩個處理任務 celery multi start work_task1 -A proj -l info celery multi start work_task2 -A proj -l info #關閉任務 celery multi stop work_task1 -A proj -l info celery multi stop work_task2 -A proj -l info #重啟任務 celery multi restart work_task1 -A proj -l info celery multi restart work_task2 -A proj -l info #等待運行完成后,關閉任務 celery multi stopwait work_task1 -A proj -l info celery multi stopwait work_task2 -A proj -l info
注意:這種啟動方式會報錯,不知道是什么問題,該問題已解決!
從GititHub發現,只是有人提出來,沒有得到根本的解決方式,於是自己看了官方文檔,並對源碼做出修改。
[2020-08-21 14:50:49,689: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@localhost:5672//:
Couldn't log in: server connection error 403, message: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.. Trying again in 6.00 seconds... (1/100)
解決方式:
vi $PYTHON_HOME/lib/site-packages/celery/bin/base.py
def setup_app_from_commandline(self, argv):
preload_options, remaining_options = self.parse_preload_options(argv)
quiet = preload_options.get('quiet')
if quiet is not None:
self.quiet = quiet
try:
self.no_color = preload_options['no_color']
except KeyError:
pass
workdir = preload_options.get('workdir')
if workdir:
os.chdir(workdir)
app = (preload_options.get('app') or
os.environ.get('CELERY_APP') or
self.app)
# Add Code【增加如下代碼】## ######################## if app: os.environ['CELERY_APP']=app ########################
preload_loader = preload_options.get('loader')
if preload_loader:
# Default app takes loader from this env (Issue #1066).
os.environ['CELERY_LOADER'] = preload_loader
loader = (preload_loader,
os.environ.get('CELERY_LOADER') or
'default')
broker = preload_options.get('broker', None)
4、編寫Celery的main函數啟動
代碼

from celery import Celery # 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host' # 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host backend_url = 'rpc://development:root@192.168.2.129:5672//development_host' # 實例化一個celery對象 app = Celery('tasks', broker=broker_url, backend=backend_url) @app.task def add(x, y): return x + y if __name__ == '__main__': app.worker_main()
運行效果
5、Celery的配置文件的使用
5.1、實時的配置方法

from celery import Celery # 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host' # 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host backend_url = 'rpc://development:root@192.168.2.129:5672//development_host' # 實例化一個celery對象 app = Celery('tasks', broker=broker_url, backend=backend_url) @app.task def add(x, y): return x + y if __name__ == '__main__': app.conf.update( enable_utc=False, timezone='Asia/Shanghai' ) print('enable_utc', app.conf.timezone) print('timezone', app.conf.enable_utc)
5.2、使用模塊名的配置參數

# 中間件 broker_url = 'pyamqp://' # 運行結果 result_backend = 'rpc://' # 任務序列化 task_serializer = 'json' # 結果序列化 result_serializer = 'json' # 接收的數據類型 accept_content = ['json'] # 設置時區 timezone = 'Asia/Shanghai' # UTC時區換算關閉 enable_utc = False

from celery import Celery # 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host' # 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host backend_url = 'rpc://development:root@192.168.2.129:5672//development_host' # 實例化一個celery對象 app = Celery('tasks', broker=broker_url, backend=backend_url) # 設置配置參數的文件,創建文件celeryconfig.py來配置參數 app.config_from_object('celeryconfig') @app.task def add(x, y): return x + y if __name__ == '__main__': print('enable_utc', app.conf.timezone) print('timezone', app.conf.enable_utc)
5.3、使用模塊的導入方法【不推薦 】

# 中間件 broker_url = 'pyamqp://' # 運行結果 result_backend = 'rpc://' # 任務序列化 task_serializer = 'json' # 結果序列化 result_serializer = 'json' # 接收的數據類型 accept_content = ['json'] # 設置時區 timezone = 'Asia/Shanghai' # UTC時區換算關閉 enable_utc = False

from celery import Celery import celeryconfig # 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host' # 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host backend_url = 'rpc://development:root@192.168.2.129:5672//development_host' # 實例化一個celery對象 app = Celery('tasks', broker=broker_url, backend=backend_url) # 設置配置參數的文件,創建文件celeryconfig.py來配置參數 app.config_from_object(celeryconfig) @app.task def add(x, y): return x + y if __name__ == '__main__': print('enable_utc', app.conf.timezone) print('timezone', app.conf.enable_utc)
5.4、使用類的方式進行配置方法

from celery import Celery class Config: # 中間件 broker_url = 'pyamqp://' # 運行結果 result_backend = 'rpc://' # 任務序列化 task_serializer = 'json' # 結果序列化 result_serializer = 'json' # 接收的數據類型 accept_content = ['json'] # 設置時區 timezone = 'Asia/Shanghai' # UTC時區換算關閉 enable_utc = False # 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host' # 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host backend_url = 'rpc://development:root@192.168.2.129:5672//development_host' # 實例化一個celery對象 app = Celery('tasks', broker=broker_url, backend=backend_url) # 設置配置參數的文件,創建文件celeryconfig.py來配置參數 app.config_from_object(Config) @app.task def add(x, y): return x + y if __name__ == '__main__': print('enable_utc', app.conf.timezone) print('timezone', app.conf.enable_utc)
5.5、使用環境變量的配置方法

from celery import Celery import os # 中間件,這里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host' # 后端儲存,這里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host backend_url = 'rpc://development:root@192.168.2.129:5672//development_host' # 實例化一個celery對象 app = Celery('tasks', broker=broker_url, backend=backend_url) os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig') app.config_from_envvar('CELERY_CONFIG_MODULE') @app.task def add(x, y): return x + y if __name__ == '__main__': print('enable_utc', app.conf.timezone) print('timezone', app.conf.enable_utc)