Celery介紹和基本使用
Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理, 如果你的業務場景中需要用到異步任務,就可以考慮使用celery, 舉幾個實例場景中可用的例子:
1)你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。
2)你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個短信祝福
Celery 在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis,后面會講
1.1 Celery有以下優點:
- 簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的
- 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
- 快速:一個單進程的celery每分鍾可處理上百萬個任務
- 靈活: 幾乎celery的各個組件都可以被擴展及自定制

- Celery包含如下組件:
- Celery Beat:任務調度器,Beat進程會讀取配置文件的內容周期性地將配置中到期需要執行的任務發送給任務隊列。
- Celery Worker:執行任務的消費者,通常會在多台服務器運行多個消費者來提高執行效率。
- Broker:消息代理,或者叫做消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。
- Producer:調用Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
- Result Backend:任務處理完后保存狀態信息和結果,以供查詢。celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

執行依靠
celery call 發送任務給這個 -->celery組件--->發送給rabbitmq,交給---->worker1|worker2
- 環境的部署
安裝celery模塊:pip install celery
基礎代碼worker端
from celery import Celery app = Celery('tasks',broker='redis://:alex3714@192.168.14.52') #‘tasks’給app起的名字 backend='redis://alex3714@localhost')#將結果寫入到哪 @app.task def add(x,y): print("running...",x,y) return x+y @app.task def cmd(cmd_str): print('running cmd',cmd_str)
執行這個腳本: celery -A celery_test -l debug #當執行完這條命令后就會進入監聽狀態
#celery_test不用寫后綴名, 指定模式。
調度端執行,會形成異步執行
from celery_test import add,cmd add(4,5) #這么執行的話只是本地顯示結果。 add.delay(45,2) #讓worker端執行 t1 = add.delay(45,2)當你賦值的時候worker端還會在執行一次 t1.get() #獲取執行的結果
celery -A celery_test worker -l info #在開一個客戶端。
#這次我們測試這個任務是怎么分發的。 t2 = add.delay(22,44) t2 = add.delay(22,44) t2 = add.delay(22,44) t2 = add.delay(22,44) #當你執行的很快的時候你會發現這個任務和rabbit MQ一樣是搶任務的。 from celery import Celery app = Celery('tasks',broker='redis://:alex3714@192.168.14.52') #‘tasks’給app起的名字 backend='redis://alex3714@localhost')#將結果寫入到哪 @app.task def add(x,y): print("running...",x,y) return x+y @app.task def cmd(cmd_str): print('running cmd',cmd_str) time.sleep(10) return time.time()
調度端
from celery_test import add,cmd t1 = cmd.delay('dsfsdf') t1.get() #這時你會發現程序是阻塞的,並沒有實現異步的效果 #當然這個解決也是非常簡單的 t1.get(timeout=1,propa) #這時你會發現提示你任務沒有完成 t1.ready() #可以查看任務沒有沒有完成,返回布爾值的狀態
基礎測試代碼celery_test.py測試端:
from celery import Celery app = Celery('proj',include=['proj.tasks']) #app是Celery類的實例,創建的時候添加了proj.tasks這個模塊,也就是包含了proj/tasks.py這個文件。 app.config_from_object('proj.celeryconfig')把Celery配置存放進proj/celeryconfig.py文件,使用app.config_from_object加載配置。 if __name__ == '__main__' app.start()
存放任務函數的文件tasks.py
from __future__ import absolute_import from proj.celery import app @app.task #讓這個任務生效的裝飾器 def add(x, y): return x + y
配置文件celeryconfig.py
BROKER_URL = 'amqp://dongwm:123456@localhost:5672/web_develop' # 使用RabbitMQ作為消息代理 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任務結果存在了Redis CELERY_TASK_SERIALIZER = 'msgpack' # 任務序列化和反序列化使用msgpack方案 CELERY_RESULT_SERIALIZER = 'json' # 讀取任務結果一般性能要求不高,所以使用了可讀性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間,不建議直接寫86400,應該讓這樣的magic數字表述更明顯 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的內容類型
啟動上面的代碼 celery -A celery_test -l info #-A指定人物名,不需要后綴,-l指定模式
開啟另外一個終端,用IPython調用add函數:
from celery_test import add r = add.delay(1,3) #在任務端執行一次 r #執行第二次 r.result #返回結果 r.status #返回狀態 r.successful() #去執行結果 r.backend #保存到redis中 通過IPython觸發的任務就完成了。任務的結果都需要根據上面提到的task_id獲得,我們還可以用如下兩種方式隨時找到這個結果: task_id = '93288a00-94ee-4727-b815-53dc3474cf3f' In : add.AsyncResult(task_id).get() Out: 4 或者: In : from celery.result import AsyncResult In : AsyncResult(task_id).get() Out: 4
指定隊列
Celery非常容易設置和運行,通常它會使用默認的名為celery的隊列(可以通過CELERY_DEFAULT_QUEUE修改)用來存放任務。我們可以使用優先級不同的隊列來確保高優先級的任務不需要等待就得到響應。
基於proj目錄下的源碼,我們創建一個projq目錄,並對projq/celeryconfig.py添加如下配置:
指定隊列
from kombu import Queue CELERY_QUEUES = ( # 定義任務隊列 Queue('default', routing_key='task.#'), # 路由鍵以“task.”開頭的消息都進default隊列 Queue('web_tasks', routing_key='web.#'), # 路由鍵以“web.”開頭的消息都進web_tasks隊列 ) CELERY_DEFAULT_EXCHANGE = 'tasks' # 默認的交換機名字為tasks CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' # 默認的交換類型是topic CELERY_DEFAULT_ROUTING_KEY = 'task.default' # 默認的路由鍵是task.default,這個路由鍵符合上面的default隊列 CELERY_ROUTES = { 'projq.test.add': { # tasks.add的消息會進入web_tasks隊列 'queue': 'web_tasks', 'routing_key': 'web.add', } }
現在用指定隊列的方式啟動消費者進程:
celery -A projq worker -Q web_tasks -l info
上述worker只會執行web_tasks中的任務,我們可以合理安排消費者數量,讓web_tasks中任務的優先級更高。
使用任務調度
之前的例子都是由發布者觸發的,本節展示一下使用Celery的Beat進程自動生成任務。基於proj目錄下的源碼,創建一個projb目錄,對projb/celeryconfig.py添加如下配置:
CELERYBEAT_SCHEDULE = { 'add': { 'task': 'celery_test.add', 'schedule': timedelta(seconds=10), #每10秒執行一次 'args': (16, 16) #執行的參數是 } }
啟動Beat程序:
celery beat -A projb
然后啟動Worker進程:
celery -A projb worker -l info
之后可以看到每10秒都會自動執行一次tasks.add。
注:Beat和Worker進程可以一並啟動:
celery -B -A projb worker -l info
使用Django可以通過django-celery實現在管理后台創建、刪除、更新任務,是因為它使用了自定義的調度類djcelery.schedulers.DatabaseScheduler,我們可以參考它實現Flask或者其他Web框架的管理后台來完成同樣的功能。使用自定義調度類還可以實現動態添加任務。
任務綁定、記錄日志和重試
任務綁定、記錄日志和重試是Celery常用的3個高級屬性。現在修改proj/tasks.py文件,添加div函數用於演示:
from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task(bind=True) def div(self, x, y): logger.info(('Executing task id {0.id}, args: {0.args!r} ' 'kwargs: {0.kwargs!r}').format(self.request)) try: result = x / y except ZeroDivisionError as e: raise self.retry(exc=e, countdown=5, max_retries=3) return result
當使用bind = True后,函數的參數發生變化,多出了參數self(第一個參數),相當於把div變成了一個已綁定的方法,通過self可以獲得任務的上下文。
在IPython中調用div:
from proj.tasks import div r = div.delay(2, 1) [2017-09-20 15:50:31,853: INFO/Worker-1] proj.tasks.div[1da82fb8-20de-4d5a-9b48-045da6db0cda]: Executing task id 1da82fb8-20de-4d5a-9b48-045da6db0cda, args: [2, 1] kwargs: {}
換成能造成異常的參數:
In : r = div.delay(2, 0)
可以發現每5秒就會重試一次,一共重試3次(默認重復3次),然后拋出異常。
再來一張很給力的圖 
