celery --分布式任務隊列


一、介紹

celery是一個基於python開發的分布式異步消息任務隊列,用於處理大量消息,同時為操作提供維護此類系統所需的工具。
它是一個任務隊列,專注於實時處理,同時還支持任務調度。如果你的業務場景中需要用到異步任務,就可以考慮使用celery

二、實例場景

1、你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。
2、你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個短信祝福

三、優點

  • 1、簡單:一但熟悉了celery的工作流程后,配置和使用還是比較簡單的
  • 2、高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
  • 3、快速:一個單進程的celery每分鍾可處理上百萬個任務
  • 4、靈活:幾乎celery的各個組件都可以被擴展及自定制

四、入門

celery 需要一個解決方案來發送和接受消息,通常,這是以稱為消息代理的單獨服務的形式出現的
有以下幾種解決方案,包括:
一:RabbitMQ(消息隊列,一種程序之間的通信方式)
rabbitmq 功能齊全,穩定,耐用且易於安裝。它是生產環境的絕佳選擇。
如果您正在使用Ubuntu或Debian,請執行以下命令安裝RabbitMQ:

$ sudo apt-get install rabbitmq-server

命令完成后,代理已經在后台運行,准備為您移動消息:。Starting rabbitmq-server: SUCCESS
二、redis

redis功能齊全,但在突然中止或者電源故障時更容易丟失數據

五、安裝

$ pip install celery 

六、應用

創建一個tasks.py文件

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
    return x + y

第一個參數Celery是當前模塊的名稱。只有在__main__模塊中定義任務時才能自動生成名稱。
第二個參數是broker關鍵字參數,指定要使用的消息代理的URL。這里使用RabbitMQ(也是默認選項)。
您可以使用RabbitMQ amqp://localhost,或者您可以使用Redis redis://localhost。
您定義了一個名為add的任務,返回兩個數字的總和。

 1 from __future__ import absolute_import
 2             import os
 3             from celery import Celery
 4             from django.conf import settings
 5             # set the default Django settings module for the 'celery' program.
 6             os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'saruman_server.settings')
 7             app = Celery('saruman_server')
 8 
 9             # Using a string here means the worker will not have to
10             # pickle the object when using Windows.
11             app.config_from_object('django.conf:settings')
12             app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
13 
14             @app.task(bind=True)
15             def debug_task(self):
16                 print('Request: {0!r}'.format(self.request))
和django配合實例

七、運行celery工作服務器

您現在可以通過使用worker 參數執行我們的程序來運行worker :

celery -A tasks worker --loglevel=info

有關可用命令行選項的完整列表,請執行以下操作:

$ celery worker --help

還有其他幾個可用的命令,也可以提供幫助:

$ celery help

八、調用任務

要調用我們的任務,您可以使用該delay()方法。
apply_async() 可以更好地控制任務執行

>>> from tasks import add
>>> add.delay(4, 4)

調用任務會返回一個AsyncResult實例。這可用於檢查任務的狀態,等待任務完成,或獲取其返回值(或者如果任務失敗,則獲取異常和回溯)。

九、保持結果

如果您想跟蹤任務的狀態,Celery需要在某處存儲或發送狀態。有幾個內置的結果后端可供選擇:SQLAlchemy / Django ORM, Memcached,Redis,RPC(RabbitMQ / AMQP),以及 - 或者您可以定義自己的。
在本例中,我們使用rpc結果后端,它將狀態作為瞬態消息發回。后端通過backend參數 指定Celery

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

或者,如果您想使用Redis作為結果后端,但仍然使用RabbitMQ作為消息代理(一種流行的組合):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

現在配置了結果后端,讓我們再次調用該任務。這次你將保持AsyncResult調用任務時返回的實例:

>>> result = add.delay(4, 4)

該ready()方法返回任務是否已完成處理:

>>> result.ready()
False 

十、配置

與消費類電器一樣,celery不需要太多配置即可運行。它有一個輸入和一個輸出。輸入必須連接代理,輸出可以
選擇到結果后端。
可以直接在應用程序上或使用專用配置模塊設置配置。例如,您可以通過更改task_serializer設置來配置用於序列化任務有效負載的默認序列化程序:

app.conf.task_serializer = 'json'

如果您一次配置了許多設置,則可以使用update:

app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)

對於大型項目,建議使用專用配置模塊。不鼓勵硬編碼周期性任務間隔和任務路由選項。將它們保存在集中位置要好得多。對於庫來說尤其如此,因為它使用戶能夠控制其任務的行為方式。集中配置還允許您的SysAdmin在發生系統故障時進行簡單的更改。
您可以通過調用app.config_from_object()方法告訴Celery實例使用配置模塊:

app.config_from_object('celeryconfig')

此模塊通常稱為“ celeryconfig”,但您可以使用任何模塊名稱。
在上面的例子中,一個名為的模塊celeryconfig.py必須可以從當前目錄或Python路徑加載。它可能看起來像這樣:
celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
 1 from datetime import timedelta
 2 
 3 import djcelery
 4 
 5 djcelery.setup_loader()
 6 BROKER_URL = 'amqp://guest@localhost//'  #輸入
 7 CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'  #返回的結果
 8 
 9 #導入指定的任務模塊
10 CELERY_IMPORTS = (
11     'fir.app.fir.tasks',
12 )
13 
14 CELERYBEAT_SCHEDULE = {
15     'receive_mail': {
16         "task": "fir.app.fir.tasks.receive_mail",
17         "schedule": timedelta(seconds=5),
18         "args": (),
19     },
20 }
View Code  

要驗證配置文件是否正常工作且不包含任何語法錯誤,您可以嘗試導入它:
####################################################

python -m celeryconfig

為了演示配置文件的強大功能,您可以將行為不當的任務路由到專用隊列:

celeryconfig.py:
task_routes = {
'tasks.add': 'low-priority',
}
或者不是路由它,而是可以對任務進行速率限制,這樣在一分鍾(10 / m)內只能處理10種此類任務:

celeryconfig.py:
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
如果您使用RabbitMQ或Redis作為代理,那么您還可以指示工作人員在運行時為任務設置新的速率限制:

$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully

十一、在項目中如何使用celery

1、可以把celery配置成一個應用
2、目錄結構如下:

proj/__init__.py
    /celery.py
    /tasks.py

3、proj/celery.py內容

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
    broker='amqp://',
    backend='amqp://',
    include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

4、proj/tasks.py中的內容

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

5、啟動worker

$ celery -A proj worker -l info

輸出

-------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x103a020f0
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

django 中使用celery:參考鏈接:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django 

十二、監控工具flower

如果有些任務出現問題,可以用flower工具監控(基於tornado)
安裝:pip install flower

使用:
三種啟動方式

celery flower
celery flower --broker 
python manage.py celery flower #就能讀取到配置里的broker_url 默認是rabbitmq

打開運行后的鏈接
打開worker
python manage.py celery worker -l info

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM