一、簡介
作為分布式任務平台的基本框架,Celery 是由 Python 語言開發的。Celery 本身不是任務隊列,是管理分布式任務隊列的工具,它封裝了操作常見任務隊列的各種操作。我們使用它可以快速進行任務隊列的使用與管理。本文主要說明如何更規范的配置和管理任務。
更詳細的技術文檔,請訪問官網 http://www.celeryproject.org/。
二、一些概念
在使用 Celery 之前請務必理解以下概念:
a. Celery Beat: 任務調度器,Beat 進程會讀取配置文件的內容,周期性的將配置中到期需要執行的任務發送給任務隊列
b. Celery Worker: 執行任務的消費者,通常會在多台服務器運行多個消費者來提高運行效率。
c. Broker: 消息代理,也是任務隊列本身(通常是消息隊列或者數據庫),通常稱為消息中間件,接收任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方。
d. Producer: 任務生產者,調用 Celery API 的函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
三、配置項說明
Celery 是通過配置文件中的配置項來定制任務的。
CELERY_IMPORTS: 配置導入哥哥任務的代碼模塊
CELERY_QUEUES: 定義任務執行的各個任務隊列(如按照執行時間分slow、fast等),默認有一個隊列,暫稱為一般任務隊列。
CELERY_ROUTES: 配置各個任務分配到不同的任務隊列
CELERY_SCHEDULE: 配置各個任務執行的時機參數
CELERY_TIMEZONE: 設置時區
CELERY_ENABLE_UTC: 是否啟動時區設置,默認值是True
CELERY_CONCURRENCY: 並發的worker數量
CELERY_PREFETCH_MULTIPLIER: 每次去消息隊列讀取任務的數量,默認值是4
CELERY_MAX_TASKS_PRE_CHILD: 每個worker執行多少次任務后會死掉
BROKER_URL: 使用redis作為任務隊列
CELERY_TASK_RESULT_EXPIRES: 任務執行結果的超時時間
CELERY_TASK_TIME_LIMIT: 單個任務運行的時間限制,超時會被殺死,不建議使用該參數,而用CELERY_TASK_SOFT_TIME_LIMIT
CELERY_RESULT_TACKEND: 使用redis存儲執行結果
CELERY_TASK_SERIALIZER: 任務序列化方式
CELERY_RESULT_SERIALIZER: 任務執行結果序列化方式
CELERY_DISABLE_RATE_LIMITS: 關閉執行限速
四、實用中的目錄結構
background
|---celery_task
|----daemons
|----day
|----hour
|----minute
|----week
|----test
|----test.py
|----CeleryConfig.py # 配置文件
|----__init__.py # 定義celery任務的名稱及配置文件位置
五、簡要配置
__init__.py內容:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -*- 3 # by MR 4 5 from celery import Celery 6 7 app = Celery("MyCelery") 8 app.config_from_object("background.celery_task.CeleryConfig")
CeleryConfig.py 內容:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -*- 3 4 from celery.schedulers import crontab 5 from kombu import Exchange, Queue 6 7 BROKER_URL = "redis://127.0.0.1:6379/10" 8 CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/11" 9 CELERY_TASK_SERIALIZER = "json" 10 CELERY_RESULT_SERIALIZER = "json" 11 CELERY_TSKK_RESULT_EXPIRES = 60 * 60 * 24 * 3 12 CELERY_ACCEPT_CONTENT = ["json"] 13 CELERY_TIMEZONE = "Asia/Shanghai" 14 15 CELERY_CONCURRENCY = 20 16 CELERY_MAX_TASK_PER_CHILD = 100 17 CELERY_TASK_SOFT_TIME_LIMIT = 300 18 CELERY_DISABLE_RATE_LIMITS = True 19 20 21 CELERY_IMPORTS = ( 22 "background.celery_task.test.TestTask", 23 ) 24 25 CELERY_QUEUES = ( 26 Queue('task_slow', exchange=Exchange('task_slow'), routing_key='task_slow'), 27 ) 28 29 CELERY_ROUTES = { 30 'background.celery_task.test.TestTask.test': {'queue': 'task_slow, 'routing_key': 'task_slow'}, 31 } 32 33 34 CELERYBEAT_SCHEDULE = { 35 '1_minute_test_task_test': { 36 'task': 'background.celery_task.test.TestTask.test', 37 'schedule': crontab(minute='*/1') 38 'args': (), 39 } 40 }
test.py的內容:
#!/usr/bin/python # -*- coding: utf-8 -*- import random
from background.celery_task import app app.task(soft_time_limit=300) def test(): a = random.randint(1, 10) b = random.randint(20, 30) print ("%(a)s+%(b)s=%(c)s" % {"a": a, "b":b, "c": a+b}) print 'OK'
六、Celry啟動
安裝一個叫flower的webui,提供任務查詢,worker生命管理,以及路由管理等(底層是通過tornado框架封裝的)
flower -A background.celery_task --loglevel=info --url-prefix=task_manager --basic_auth=celery:celerywd --log_file_prefix=~/MR/celery/flower.log >/dev/null 2>&1 &
celery -A background.celery_task worker -Q celery -f ~/MR/celery/celery.log --loglevel=info --beat >/dev/null 2>&1 &
七、demo地址