一、介紹
celery是一個基於python開發的分布式異步消息任務隊列,用於處理大量消息,同時為操作提供維護此類系統所需的工具。
它是一個任務隊列,專注於實時處理,同時還支持任務調度。如果你的業務場景中需要用到異步任務,就可以考慮使用celery
二、實例場景
1、你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。
2、你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個短信祝福
三、優點
- 1、簡單:一但熟悉了celery的工作流程后,配置和使用還是比較簡單的
- 2、高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
- 3、快速:一個單進程的celery每分鍾可處理上百萬個任務
- 4、靈活:幾乎celery的各個組件都可以被擴展及自定制
四、入門
celery 需要一個解決方案來發送和接受消息,通常,這是以稱為消息代理的單獨服務的形式出現的
有以下幾種解決方案,包括:
一:RabbitMQ(消息隊列,一種程序之間的通信方式)
rabbitmq 功能齊全,穩定,耐用且易於安裝。它是生產環境的絕佳選擇。
如果您正在使用Ubuntu或Debian,請執行以下命令安裝RabbitMQ:
$ sudo apt-get install rabbitmq-server
命令完成后,代理已經在后台運行,准備為您移動消息:。Starting rabbitmq-server: SUCCESS
二、redis
redis功能齊全,但在突然中止或者電源故障時更容易丟失數據
五、安裝
$ pip install celery
六、應用
創建一個tasks.py文件
from celery import Celery app = Celery('tasks', broker='pyamqp://guest@localhost//') @app.task def add(x, y): return x + y
第一個參數Celery是當前模塊的名稱。只有在__main__模塊中定義任務時才能自動生成名稱。
第二個參數是broker關鍵字參數,指定要使用的消息代理的URL。這里使用RabbitMQ(也是默認選項)。
您可以使用RabbitMQ amqp://localhost,或者您可以使用Redis redis://localhost。
您定義了一個名為add的任務,返回兩個數字的總和。

1 from __future__ import absolute_import 2 import os 3 from celery import Celery 4 from django.conf import settings 5 # set the default Django settings module for the 'celery' program. 6 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'saruman_server.settings') 7 app = Celery('saruman_server') 8 9 # Using a string here means the worker will not have to 10 # pickle the object when using Windows. 11 app.config_from_object('django.conf:settings') 12 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 13 14 @app.task(bind=True) 15 def debug_task(self): 16 print('Request: {0!r}'.format(self.request))
七、運行celery工作服務器
您現在可以通過使用worker 參數執行我們的程序來運行worker :
celery -A tasks worker --loglevel=info
有關可用命令行選項的完整列表,請執行以下操作:
$ celery worker --help
還有其他幾個可用的命令,也可以提供幫助:
$ celery help
八、調用任務
要調用我們的任務,您可以使用該delay()方法。
apply_async() 可以更好地控制任務執行
>>> from tasks import add >>> add.delay(4, 4)
調用任務會返回一個AsyncResult實例。這可用於檢查任務的狀態,等待任務完成,或獲取其返回值(或者如果任務失敗,則獲取異常和回溯)。
九、保持結果
如果您想跟蹤任務的狀態,Celery需要在某處存儲或發送狀態。有幾個內置的結果后端可供選擇:SQLAlchemy / Django ORM, Memcached,Redis,RPC(RabbitMQ / AMQP),以及 - 或者您可以定義自己的。
在本例中,我們使用rpc結果后端,它將狀態作為瞬態消息發回。后端通過backend參數 指定Celery
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
或者,如果您想使用Redis作為結果后端,但仍然使用RabbitMQ作為消息代理(一種流行的組合):
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
現在配置了結果后端,讓我們再次調用該任務。這次你將保持AsyncResult調用任務時返回的實例:
>>> result = add.delay(4, 4)
該ready()方法返回任務是否已完成處理:
>>> result.ready()
False
十、配置
與消費類電器一樣,celery不需要太多配置即可運行。它有一個輸入和一個輸出。輸入必須連接代理,輸出可以
選擇到結果后端。
可以直接在應用程序上或使用專用配置模塊設置配置。例如,您可以通過更改task_serializer設置來配置用於序列化任務有效負載的默認序列化程序:
app.conf.task_serializer = 'json'
如果您一次配置了許多設置,則可以使用update:
app.conf.update( task_serializer='json', accept_content=['json'], # Ignore other content result_serializer='json', timezone='Europe/Oslo', enable_utc=True, )
對於大型項目,建議使用專用配置模塊。不鼓勵硬編碼周期性任務間隔和任務路由選項。將它們保存在集中位置要好得多。對於庫來說尤其如此,因為它使用戶能夠控制其任務的行為方式。集中配置還允許您的SysAdmin在發生系統故障時進行簡單的更改。
您可以通過調用app.config_from_object()方法告訴Celery實例使用配置模塊:
app.config_from_object('celeryconfig')
此模塊通常稱為“ celeryconfig”,但您可以使用任何模塊名稱。
在上面的例子中,一個名為的模塊celeryconfig.py必須可以從當前目錄或Python路徑加載。它可能看起來像這樣:
celeryconfig.py:
broker_url = 'pyamqp://' result_backend = 'rpc://' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True

1 from datetime import timedelta 2 3 import djcelery 4 5 djcelery.setup_loader() 6 BROKER_URL = 'amqp://guest@localhost//' #輸入 7 CELERY_RESULT_BACKEND = 'amqp://guest@localhost//' #返回的結果 8 9 #導入指定的任務模塊 10 CELERY_IMPORTS = ( 11 'fir.app.fir.tasks', 12 ) 13 14 CELERYBEAT_SCHEDULE = { 15 'receive_mail': { 16 "task": "fir.app.fir.tasks.receive_mail", 17 "schedule": timedelta(seconds=5), 18 "args": (), 19 }, 20 }
要驗證配置文件是否正常工作且不包含任何語法錯誤,您可以嘗試導入它:
####################################################
python -m celeryconfig
為了演示配置文件的強大功能,您可以將行為不當的任務路由到專用隊列:
celeryconfig.py:
task_routes = {
'tasks.add': 'low-priority',
}
或者不是路由它,而是可以對任務進行速率限制,這樣在一分鍾(10 / m)內只能處理10種此類任務:
celeryconfig.py:
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
如果您使用RabbitMQ或Redis作為代理,那么您還可以指示工作人員在運行時為任務設置新的速率限制:
$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully
十一、在項目中如何使用celery
1、可以把celery配置成一個應用
2、目錄結構如下:
proj/__init__.py /celery.py /tasks.py
3、proj/celery.py內容
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
4、proj/tasks.py中的內容
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
5、啟動worker
$ celery -A proj worker -l info
輸出
-------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall) ---- **** ----- --- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: proj:0x103a020f0 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: redis://localhost/ - *** --- * --- .> concurrency: 8 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery
django 中使用celery:參考鏈接:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django
十二、監控工具flower
如果有些任務出現問題,可以用flower工具監控(基於tornado)
安裝:pip install flower
使用:
三種啟動方式
celery flower celery flower --broker python manage.py celery flower #就能讀取到配置里的broker_url 默認是rabbitmq
打開運行后的鏈接
打開worker
python manage.py celery worker -l info