本博客摘自:http://blog.csdn.net/liuxiaochen123/article/details/47981111
先來一張圖,這是在網上最多的一張Celery的圖了,確實描述的非常好
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任務執行單元
Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache
OK 廢話就說到這, 來點使用的。
首先你也看到了, 你要有一個消息中間件,此處我們選擇rabbitmq,為什么不用redis或者sqs呢,首先這兩個我都用過了,想接觸以下rabbitmq,所以果斷選擇這個。
Now 安裝rabbitmq!
官網介紹有安裝方法, 我貼以下網址吧,自己看看,很簡單很簡單。 我是Mac系統 http://www.rabbitmq.com/install-standalone-mac.html 如果是其他系統自己對應下。 可以把sbin的路徑配置到path里面(我比較懶 沒加,所以去到解壓目錄,囧)
啟動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management
啟動rabbitmq:sbin/rabbitmq-server -detached
ok, now,rabbitmq已經啟動,可以打開頁面來看看
地址:http://localhost:15672/#/
用戶名密碼都是guest
現在可以進來了把,可以看到具體頁面。
關於rabbitmq的配置,網上很多 自己去搜以下就ok了。
好了 消息中間件有了,現在該來代碼了,我是在celeby官網看的,如果覺得我代碼有問題,可以自己去官網看,嘿嘿。
安裝celeby。
建議使用 virtualenv,具體怎么用 參考
http://www.nowamagic.net/academy/detail/1330228
http://liuzhijun.iteye.com/blog/1872241
首先,定義一個task。
from celery import Celery app = Celery('tasks', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//') @app.task def add(x, y): return x + y
- 1
- 2
- 3
- 4
- 5
- 6
- 7
保存為tasks.py
—>broker 就是中間件了,自己看着改吧, backend就是 后端來發送狀態消息,保持追蹤任務的狀態,存儲或發送這些狀態
Now 可以啟動了
命令: celery -A tasks worker –loglevel=info
想要查看完整的命令行參數列表
命令:celery worker –help 或者
celery help
現在 另開一個terminal,啟用虛擬環境, ipython 啟動python console
In [9]: from tasks import add In [10]: result = add.delay(6, 7)
- 1
- 2
- 3
現在你可以在用之前命令啟動的終端中看到輸出,而且可以驗證結果。
調用任務會返回一個 AsyncResult 實例,可用於檢查任務的狀態,等待任務完成或獲取返回值, 而且現在我們也設置了一個用於保存結果和狀態等信息的backend, 現在你可以成功的拿到結果, 如果你print result, 你會看到一串字符串, 類似與uuid。
如下方式:
In [11]: result.ready() Out[11]: True In [12]: result.get(timeout=1) Out[12]: 13 In [13]: result.get(propagate=False) Out[13]: 13
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
ok 現在看到結果了吧。
注: propagate的作用 倘若任務拋出了一個異常, get() 會重新拋出異常, 但你可以指定 propagate 參數來覆蓋這一行為。
以上就是一些代碼了。
下面是 celery的配置,配置的話 你不想看這個 可以去官網看,比我的詳細的多。
app.conf.update(
CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], # Ignore other content CELERY_RESULT_SERIALIZER='json', CELERY_TIMEZONE='Europe/Oslo', CELERY_ENABLE_UTC=True, )
- 1
- 2
- 3
- 4
- 5
- 6
- 7
但是對於大型項目來說 這樣配置就顯得很low,這個時候可以用模塊。你可以調用 config_from_object() 來讓 Celery 實例加載配置模塊。
app.config_from_object(‘celeryconfig’)
celeryconfig.py
BROKER_URL = 'amqp://' CELERY_RESULT_BACKEND = 'amqp://' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json'] CELERY_TIMEZONE = 'Europe/Oslo' CELERY_ENABLE_UTC = True
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
可以使用 python -m celeryconfig 來驗證配置是否正確。