一、簡介
Celery是由Python開發、簡單、靈活、可靠的分布式任務隊列,其本質是生產者消費者模型,生產者發送任務到消息隊列,消費者負責處理任務。Celery側重於實時操作,但對調度支持也很好,其每天可以處理數以百萬計的任務。特點:
- 簡單:熟悉celery的工作流程后,配置使用簡單
- 高可用:當任務執行失敗或執行過程中發生連接中斷,celery會自動嘗試重新執行任務
- 快速:一個單進程的celery每分鍾可處理上百萬個任務
- 靈活:幾乎celery的各個組件都可以被擴展及自定制
應用場景舉例:
1.web應用:當用戶在網站進行某個操作需要很長時間完成時,我們可以將這種操作交給Celery執行,直接返回給用戶,等到Celery執行完成以后通知用戶,大大提好網站的並發以及用戶的體驗感。
2.任務場景:比如在運維場景下需要批量在幾百台機器執行某些命令或者任務,此時Celery可以輕松搞定。
3.定時任務:向定時導數據報表、定時發送通知類似場景,雖然Linux的計划任務可以幫我實現,但是非常不利於管理,而Celery可以提供管理接口和豐富的API。
二、架構&工作原理
Celery由以下三部分構成:消息中間件(Broker)、任務執行單元Worker、結果存儲(Backend),如下圖:
工作原理:
- 任務模塊Task包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發並發往消息隊列,而定時任務由Celery Beat進程周期性地將任務發往消息隊列;
- 任務執行單元Worker實時監視消息隊列獲取隊列中的任務執行;
- Woker執行完任務后將結果保存在Backend中;
消息中間件Broker
消息中間件Broker官方提供了很多備選方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推薦RabbitMQ。
任務執行單元Worker
Worker是任務執行單元,負責從消息隊列中取出任務執行,它可以啟動一個或者多個,也可以啟動在不同的機器節點,這就是其實現分布式的核心。
結果存儲Backend
Backend結果存儲官方也提供了諸多的存儲方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch。
三、安裝使用
這里我使用的redis作為消息中間件,redis安裝可以參考https://www.cnblogs.com/wdliu/p/9360286.html。
Celery安裝:
pip3 install celery
簡單使用
目錄結構:
project/ ├── __init__.py ├── config.py └── tasks.py
各目錄文件說明:
__init__.py:初始化Celery以及加載配置文件
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from celery import Celery app = Celery('project') # 創建 Celery 實例 app.config_from_object('project.config') # 加載配置模塊
config.py: Celery相關配置文件,更多配置參考:http://docs.celeryproject.org/en/latest/userguide/configuration.html
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作為消息中間件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這里使用redis CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間 CELERY_TIMEZONE='Asia/Shanghai' # 時區配置 CELERY_IMPORTS = ( # 指定導入的任務模塊,可以指定多個 'project.tasks', )
tasks.py :任務定義文件
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app @app.task def show_name(name): return name
啟動Worker:
celery worker -A project -l debug
各個參數含義:
worker: 代表第啟動的角色是work當然還有beat等其他角色;
-A :項目路徑,這里我的目錄是project
-l:啟動的日志級別,更多參數使用celery --help查看
查看日志輸出,會發現我們定義的任務,以及相關配置:
雖然啟動了worker,但是我們還需要通過delay或apply_async來將任務添加到worker中,這里我們通過交互式方法添加任務,並返回AsyncResult對象,通過AsyncResult對象獲取結果:
AsyncResult除了get方法用於常用獲取結果方法外還提以下常用方法或屬性:
- state: 返回任務狀態;
- task_id: 返回任務id;
- result: 返回任務結果,同get()方法;
- ready(): 判斷任務是否以及有結果,有結果為True,否則False;
- info(): 獲取任務信息,默認為結果;
- wait(t): 等待t秒后獲取結果,若任務執行完畢,則不等待直接獲取結果,若任務在執行中,則wait期間一直阻塞,直到超時報錯;
- successfu(): 判斷任務是否成功,成功為True,否則為False;
四、進階使用
對於普通的任務來說可能滿足不了我們的任務需求,所以還需要了解一些進階用法,Celery提供了諸多調度方式,例如任務編排、根據任務狀態執行不同的操作、重試機制等,以下會對常用高階用法進行講述。
定時任務&計划任務
Celery的提供的定時任務主要靠schedules來完成,通過beat組件周期性將任務發送給woker執行。在示例中,新建文件period_task.py,並添加任務到配置文件中:
period_task.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.schedules import crontab @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒執行add sender.add_periodic_task( crontab(hour=16, minute=56, day_of_week=1), #每周一下午四點五十六執行sayhai sayhi.s('wd'),name='say_hi' ) @app.task def add(x,y): print(x+y) return x+y @app.task def sayhi(name): return 'hello %s' % name
config.py
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作為消息中間件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這里使用redis CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間 CELERY_TIMEZONE='Asia/Shanghai' # 時區配置 CELERY_IMPORTS = ( # 指定導入的任務模塊,可以指定多個 'project.tasks', 'project.period_task', #定時任務 )
啟動worker和beat:
celery worker -A project -l debug #啟動work celery beat -A project.period_task -l debug #啟動beat,注意此時對應的文件路徑
我們可以觀察worker日志:
還可以通過配置文件方式指定定時和計划任務,此時的配置文件如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.schedules import crontab BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作為消息中間件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這里使用redis CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間 CELERY_TIMEZONE='Asia/Shanghai' # 時區配置 CELERY_IMPORTS = ( # 指定導入的任務模塊,可以指定多個 'project.tasks', 'project.period_task', ) app.conf.beat_schedule = { 'period_add_task': { # 計划任務 'task': 'project.period_task.add', #任務路徑 'schedule': crontab(hour=18, minute=16, day_of_week=1), 'args': (3, 4), }, 'add-every-30-seconds': { # 每10秒執行 'task': 'project.period_task.sayhi', #任務路徑 'schedule': 10.0, 'args': ('wd',) }, }
此時的period_task.py只需要注冊到woker中就行了,如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app @app.task def add(x,y): print(x+y) return x+y @app.task def sayhi(name): return 'hello %s' % name
同樣啟動worker和beat結果和第一種方式一樣。更多詳細的內容請參考:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules
任務綁定
Celery可通過任務綁定到實例獲取到任務的上下文,這樣我們可以在任務運行時候獲取到任務的狀態,記錄相關日志等。
修改任務中的period_task.py,如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.utils.log import get_task_logger
logger = get_task_logger(__name__) @app.task(bind=True) # 綁定任務 def add(self,x,y): logger.info(self.request.__dict__) #打印日志 try: a=[] a[10]==1 except Exception as e: raise self.retry(exc=e, countdown=5, max_retries=3) # 出錯每5秒嘗試一次,總共嘗試3次 return x+y
在以上代碼中,通過bind參數將任務綁定,self指任務的上下文,通過self獲取任務狀態,同時在任務出錯時進行任務重試,我們觀察日志:
內置鈎子函數
Celery在執行任務時候,提供了鈎子方法用於在任務執行完成時候進行對應的操作,在Task源碼中提供了很多狀態鈎子函數如:on_success(成功后執行)、on_failure(失敗時候執行)、on_retry(任務重試時候執行)、after_return(任務返回時候執行),在進行使用是我們只需要重寫這些方法,完成相應的操作即可。
在以下示例中,我們繼續修改period_task.py,分別定義三個任務來演示任務失敗、重試、任務成功后執行的操作:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.utils.log import get_task_logger from celery import Task logger = get_task_logger(__name__) class demotask(Task): def on_success(self, retval, task_id, args, kwargs): # 任務成功執行 logger.info('task id:{} , arg:{} , successful !'.format(task_id,args)) def on_failure(self, exc, task_id, args, kwargs, einfo): #任務失敗執行 logger.info('task id:{} , arg:{} , failed ! erros : {}' .format(task_id,args,exc)) def on_retry(self, exc, task_id, args, kwargs, einfo): #任務重試執行 logger.info('task id:{} , arg:{} , retry ! einfo: {}'.format(task_id, args, exc)) @app.task(base=demotask,bind=True) def add(self,x,y): try: a=[] a[10]==1 except Exception as e: raise self.retry(exc=e, countdown=5, max_retries=1) # 出錯每5秒嘗試一次,總共嘗試1次 return x+y @app.task(base=demotask) def sayhi(name): a=[] a[10]==1 return 'hi {}'.format(name) @app.task(base=demotask) def sum(a,b): return 'a+b={} '.format(a+b)
此時的配置文件config.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app from celery.schedules import crontab BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作為消息中間件 CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這里使用redis CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間 CELERY_TIMEZONE='Asia/Shanghai' # 時區配置 CELERY_IMPORTS = ( # 指定導入的任務模塊,可以指定多個 'project.tasks', 'project.period_task', ) app.conf.beat_schedule = { 'add': { # 每10秒執行 'task': 'project.period_task.add', #任務路徑 'schedule': 10.0, 'args': (10,12), }, 'sayhi': { # 每10秒執行 'task': 'project.period_task.sayhi', #任務路徑 'schedule': 10.0, 'args': ('wd',), }, 'sum': { # 每10秒執行 'task': 'project.period_task.sum', #任務路徑 'schedule': 10.0, 'args': (1,3), }, }
然后重啟worker和beat,查看日志:
任務編排
在很多情況下,一個任務需要由多個子任務或者一個任務需要很多步驟才能完成,Celery同樣也能實現這樣的任務,完成這類型的任務通過以下模塊完成:
-
group: 並行調度任務
-
chain: 鏈式任務調度
-
chord: 類似group,但分header和body2個部分,header可以是一個group任務,執行完成后調用body的任務
-
map: 映射調度,通過輸入多個入參來多次調度同一個任務
-
starmap: 類似map,入參類似*args
-
chunks: 將任務按照一定數量進行分組
修改tasks.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from project import app @app.task def add(x,y): return x+y @app.task def mul(x,y): return x*y @app.task def sum(data_list): res=0 for i in data_list: res+=i return res
group: 組任務,組內每個任務並行執行
和project同級目錄新建consumer.py如下:
from celery import group from project.tasks import add,mul,sum res = group(add.s(1,2),add.s(1,2))() # 任務 [1+2,1+2] while True: if res.ready(): print('res:{}'.format(res.get())) break
結果:
chain:鏈式任務
鏈式任務中,默認上一個任務的返回結果作為參數傳遞給子任務
from celery import chain from project.tasks import add,mul,sum res = chain(add.s(1,2),add.s(3),mul.s(3))() # 任務((1+2)+3)*3 while True: if res.ready(): print('res:{}'.format(res.get())) break #結果 #res:18
還可以使用|表示鏈式任務,上面任務也可以表示為:
res = (add.s(1,2) | add.s(3) | (mul.s(3)))()
res.get()
chord:任務分割,分為header和body兩部分,hearder任務執行完在執行body,其中hearder返回結果作為參數傳遞給body
from celery import chord from project.tasks import add,mul,sum res = chord(header=[add.s(1,2),mul.s(3,4)],body=sum.s())() # 任務(1+2)+(3*4) while True: if res.ready(): print('res:{}'.format(res.get())) break #結果: #res:15
chunks:任務分組,按照任務的個數分組
from project.tasks import add,mul,sum res = add.chunks(zip(range(5),range(5)),4)() # 4 代表每組的任務的個數 while True: if res.ready(): print('res:{}'.format(res.get())) break
結果:
delay &apply_async
對於delay和apply_async都可以用來進行任務的調度,本質上是delay對apply_async進行了再一次封裝(或者可以說是快捷方式),兩者都返回AsyncResult對象,以下是兩個方法源碼。

def delay(self, *args, **kwargs): """Star argument version of :meth:`apply_async`. Does not support the extra options enabled by :meth:`apply_async`. Arguments: *args (Any): Positional arguments passed on to the task. **kwargs (Any): Keyword arguments passed on to the task. Returns: celery.result.AsyncResult: Future promise. """ return self.apply_async(args, kwargs)

def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options): """Apply tasks asynchronously by sending a message. Arguments: args (Tuple): The positional arguments to pass on to the task. kwargs (Dict): The keyword arguments to pass on to the task. countdown (float): Number of seconds into the future that the task should execute. Defaults to immediate execution. eta (~datetime.datetime): Absolute time and date of when the task should be executed. May not be specified if `countdown` is also supplied. expires (float, ~datetime.datetime): Datetime or seconds in the future for the task should expire. The task won't be executed after the expiration time. shadow (str): Override task name used in logs/monitoring. Default is retrieved from :meth:`shadow_name`. connection (kombu.Connection): Re-use existing broker connection instead of acquiring one from the connection pool. retry (bool): If enabled sending of the task message will be retried in the event of connection loss or failure. Default is taken from the :setting:`task_publish_retry` setting. Note that you need to handle the producer/connection manually for this to work. retry_policy (Mapping): Override the retry policy used. See the :setting:`task_publish_retry_policy` setting. queue (str, kombu.Queue): The queue to route the task to. This must be a key present in :setting:`task_queues`, or :setting:`task_create_missing_queues` must be enabled. See :ref:`guide-routing` for more information. exchange (str, kombu.Exchange): Named custom exchange to send the task to. Usually not used in combination with the ``queue`` argument. routing_key (str): Custom routing key used to route the task to a worker server. If in combination with a ``queue`` argument only used to specify custom routing keys to topic exchanges. priority (int): The task priority, a number between 0 and 9. Defaults to the :attr:`priority` attribute. serializer (str): Serialization method to use. Can be `pickle`, `json`, `yaml`, `msgpack` or any custom serialization method that's been registered with :mod:`kombu.serialization.registry`. Defaults to the :attr:`serializer` attribute. compression (str): Optional compression method to use. Can be one of ``zlib``, ``bzip2``, or any custom compression methods registered with :func:`kombu.compression.register`. Defaults to the :setting:`task_compression` setting. link (Signature): A single, or a list of tasks signatures to apply if the task returns successfully. link_error (Signature): A single, or a list of task signatures to apply if an error occurs while executing the task. producer (kombu.Producer): custom producer to use when publishing the task. add_to_parent (bool): If set to True (default) and the task is applied while executing another task, then the result will be appended to the parent tasks ``request.children`` attribute. Trailing can also be disabled by default using the :attr:`trail` attribute publisher (kombu.Producer): Deprecated alias to ``producer``. headers (Dict): Message headers to be included in the message. Returns: celery.result.AsyncResult: Promise of future evaluation. Raises: TypeError: If not enough arguments are passed, or too many arguments are passed. Note that signature checks may be disabled by specifying ``@task(typing=False)``. kombu.exceptions.OperationalError: If a connection to the transport cannot be made, or if the connection is lost. Note: Also supports all keyword arguments supported by :meth:`kombu.Producer.publish`. """ if self.typing: try: check_arguments = self.__header__ except AttributeError: # pragma: no cover pass else: check_arguments(*(args or ()), **(kwargs or {})) app = self._get_app() if app.conf.task_always_eager: with denied_join_result(): return self.apply(args, kwargs, task_id=task_id or uuid(), link=link, link_error=link_error, **options) if self.__v2_compat__: shadow = shadow or self.shadow_name(self(), args, kwargs, options) else: shadow = shadow or self.shadow_name(args, kwargs, options) preopts = self._get_exec_options() options = dict(preopts, **options) if options else preopts options.setdefault('ignore_result', self.ignore_result) return app.send_task( self.name, args, kwargs, task_id=task_id, producer=producer, link=link, link_error=link_error, result_cls=self.AsyncResult, shadow=shadow, task_type=self, **options )
對於其使用,apply_async支持常用參數:
- eta:指定任務執行時間,類型為datetime時間類型;
- countdown:倒計時,單位秒,浮點類型;
- expires:任務過期時間,如果任務在超過過期時間還未執行則回收任務,浮點類型獲取datetime類型;
- retry:任務執行失敗時候是否嘗試,布爾類型。;
- serializer:序列化方案,支持pickle、json、yaml、msgpack;
- priority:任務優先級,有0~9優先級可設置,int類型;
- retry_policy:任務重試機制,其中包含幾個重試參數,類型是dict如下:

max_retries:最大重試次數 interval_start:重試等待時間 interval_step:每次重試疊加時長,假設第一重試等待1s,第二次等待1+n秒 interval_max:最大等待時間 ####示例 add.apply_async((1, 3), retry=True, retry_policy={ 'max_retries': 1, 'interval_start': 0, 'interval_step': 0.8, 'interval_max': 5, })
更多參數參考:http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async
五、管理與監控
Celery管理和監控功能是通過flower組件實現的,flower組件不僅僅提供監控功能,還提供HTTP API可實現對woker和task的管理。
安裝使用
pip3 install flower
啟動
flower -A project --port=5555 # -A :項目目錄 #--port 指定端口
訪問http:ip:5555
api使用,例如獲取woker信息:
curl http://127.0.0.1:5555/api/workers
結果:
更多api參考:https://flower.readthedocs.io/en/latest/api.html