Python 並行分布式框架:Celery 超詳細介紹


本博客摘自:http://blog.csdn.net/liuxiaochen123/article/details/47981111

先來一張圖,這是在網上最多的一張Celery的圖了,確實描述的非常好

這里寫圖片描述

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件

Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

任務執行單元

Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

OK 廢話就說到這, 來點使用的。

首先你也看到了, 你要有一個消息中間件,此處我們選擇rabbitmq,為什么不用redis或者sqs呢,首先這兩個我都用過了,想接觸以下rabbitmq,所以果斷選擇這個。

Now 安裝rabbitmq! 
官網介紹有安裝方法, 我貼以下網址吧,自己看看,很簡單很簡單。 我是Mac系統 http://www.rabbitmq.com/install-standalone-mac.html 如果是其他系統自己對應下。 可以把sbin的路徑配置到path里面(我比較懶 沒加,所以去到解壓目錄,囧)

啟動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 
啟動rabbitmq:sbin/rabbitmq-server -detached

ok, now,rabbitmq已經啟動,可以打開頁面來看看 
地址:http://localhost:15672/#/ 
用戶名密碼都是guest 
現在可以進來了把,可以看到具體頁面。 
關於rabbitmq的配置,網上很多 自己去搜以下就ok了。

好了 消息中間件有了,現在該來代碼了,我是在celeby官網看的,如果覺得我代碼有問題,可以自己去官網看,嘿嘿。

安裝celeby。 
建議使用 virtualenv,具體怎么用 參考 
http://www.nowamagic.net/academy/detail/1330228 
http://liuzhijun.iteye.com/blog/1872241

首先,定義一個task。

from celery import Celery app = Celery('tasks', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//') @app.task def add(x, y): return x + y
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

保存為tasks.py 
—>broker 就是中間件了,自己看着改吧, backend就是 后端來發送狀態消息,保持追蹤任務的狀態,存儲或發送這些狀態

Now 可以啟動了 
命令: celery -A tasks worker –loglevel=info 
想要查看完整的命令行參數列表 
命令:celery worker –help 或者 
celery help

現在 另開一個terminal,啟用虛擬環境, ipython 啟動python console

In [9]: from tasks import add In [10]: result = add.delay(6, 7)
  • 1
  • 2
  • 3

現在你可以在用之前命令啟動的終端中看到輸出,而且可以驗證結果。 
調用任務會返回一個 AsyncResult 實例,可用於檢查任務的狀態,等待任務完成或獲取返回值, 而且現在我們也設置了一個用於保存結果和狀態等信息的backend, 現在你可以成功的拿到結果, 如果你print result, 你會看到一串字符串, 類似與uuid。 
如下方式:

In [11]: result.ready() Out[11]: True In [12]: result.get(timeout=1) Out[12]: 13 In [13]: result.get(propagate=False) Out[13]: 13
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

ok 現在看到結果了吧。 
注: propagate的作用 倘若任務拋出了一個異常, get() 會重新拋出異常, 但你可以指定 propagate 參數來覆蓋這一行為。

以上就是一些代碼了。

下面是 celery的配置,配置的話 你不想看這個 可以去官網看,比我的詳細的多。

app.conf.update(
    CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], # Ignore other content CELERY_RESULT_SERIALIZER='json', CELERY_TIMEZONE='Europe/Oslo', CELERY_ENABLE_UTC=True, )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

但是對於大型項目來說 這樣配置就顯得很low,這個時候可以用模塊。你可以調用 config_from_object() 來讓 Celery 實例加載配置模塊。 
app.config_from_object(‘celeryconfig’)

celeryconfig.py

BROKER_URL = 'amqp://' CELERY_RESULT_BACKEND = 'amqp://' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json'] CELERY_TIMEZONE = 'Europe/Oslo' CELERY_ENABLE_UTC = True
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

可以使用 python -m celeryconfig 來驗證配置是否正確。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM