測試使用環境:
1、Python==3.6.1
2、MongoDB==3.6.2
3、celery==4.1.1
4、eventlet==0.23.0
Celery分為3個部分
(1)worker部分負責任務的處理,即工作進程(我的理解工作進程就是你寫的python代碼,當然還包括python調用系統工具功能)
(2)broker部分負責任務消息的分發以及任務結果的存儲,這部分任務主要由中間數據存儲系統完成,比如消息隊列服務器RabbitMQ、redis、
Amazon SQS、MongoDB、IronMQ等或者關系型數據庫,使用關系型數據庫依賴sqlalchemy或者django的ORM
(3)Celery主類,進行任務最開始的指派與執行控制,他可以是單獨的python腳本,也可以和其他程序結合,應用到django或者flask等web框架里面以及你能想到的任何應用
上代碼
這里將celery封裝成一個Python包,結構如下圖
celery.py
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 """ 5 Celery主類 6 啟動文件名必須為celery.py!!! 7 """ 8 9 from __future__ import absolute_import # 為兼容Python版本 10 from celery import Celery, platforms 11 12 platforms.C_FORCE_ROOT = True # linux環境下,用於開啟root也可以啟動celery服務,默認是不允許root啟動celery的 13 app = Celery( 14 main='celery_tasks', # celery啟動包名稱 15 # broker='redis://localhost', 16 # backend='redis://localhost', 17 include=['celery_tasks.tasks', ] # celery所有任務 18 ) 19 app.config_from_object('celery_tasks.config') # celery使用文件配置 20 21 if __name__ == '__main__': 22 app.start()
config.py
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 from __future__ import absolute_import 4 5 CELERY_TIMEZONE = 'Asia/Shanghai' 6 # CELERY_RESULT_BACKEND='redis://localhost:6379/1' 7 # BROKER_URL='redis://localhost:6379/2' 8 BROKER_BACKEND = 'mongodb' # mongodb作為任務隊列(或者說是緩存) 9 BROKER_URL = 'mongodb://localhost:27017/for_celery' # 隊列地址 10 CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/for_celery' # 消息結果存儲地址 11 CELERY_MONGODB_BACKEND_SETTINGS = { # 消息結果存儲配置 12 'host': 'localhost', 13 'port': 27017, 14 'database': 'for_celery', 15 # 'user':'root', 16 # 'password':'root1234', 17 'taskmeta_collection': 'task_meta', # 任務結果的存放collection 18 } 19 CELERY_ROUTES = { # 配置任務的先后順序 20 'celery_task.tasks.add': {'queue': 'for_add', 'router_key': 'for_add'}, 21 'celery_task.tasks.subtract': {'queue': 'for_subtract', 'router_key': 'for_subtract'} 22 }
tasks.py
#!/usr/bin/env python # -*- coding: utf-8 -*- """ worker部分 """ from __future__ import absolute_import from celery import Celery, group from .celery import app from time import sleep @app.task def add(x, y): sleep(5) return x + y @app.task def substract(x, y): sleep(5) return x - y
接下來演示,演示之前先把config中mongdb的用到的database和collection配置好,並啟動mongodb服務
首先啟動consumer
注意啟動目錄為celery_tasks同一級,啟動命令為
celery -A celery_tasks worker --loglevel=info -P eventlet
參數解釋,命令中-A參數表示的是Celery APP的名稱celery_tasks,這個實例中指的就是tasks.py,后面的tasks就是APP的名稱,worker是一個執行任務角色,后面的loglevel=info記錄日志類型默認是info,這個命令啟動了一個worker,用來執行程序中add這個加法任務(task),-P eventlet是防止在windows環境下出現
[2018-06-02 15:08:15,550: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
File "d:\programmingsoftware\python35\lib\site-packages\billiard\pool.py", line 358, in workloop result = (True, prepare_result(fun(*args, **kwargs)))
File "d:\programmingsoftware\python35\lib\site-packages\celery\app\trace.py", line 525, in _fast_trace_task tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
若啟動成功,結果如下
再啟動produce
在另一個終端terminal,首先啟動Python
如下
再導入並調用任務,使用delay方法
如下
調用之后,回到consumer終端,發現
收到任務。
再到mongodb中查看任務結果