一、概念
在一個應用服務中, 對於時效性要求沒那么高的業務場景,我們沒必要等到所有任務執行完才返回結果, 例如用戶注冊場景中, 保存了用戶賬號密碼之后. 就可以立即返回, 后續的賬號激活郵件, 可以用一個種異步的形式去處理, 這種異步操作可以⽤隊列服務來實現. 否則, 如果等到郵件發送成功可能⼏秒過去了.
Celery是Python語言實現的分布式隊列服務, 除了支持持即時任務, 還支持定時任務, Celery有5個核心角色.
1.Task
任務(Task)就是你要做的事情, 例如一個注冊流程里面有很多任務, 給用戶發驗證郵件就是一個任務, 這種耗時任務可以交給Celery去處理; 還有一種任務是定時任務, 比如每天定時統計網站的注冊人數, 這個也可以交給Celery周期性的處理.
2.Broker
Broker的中文意思是經紀人, 指為市場上買賣雙方提供中介服務的人. 在是Celery中它介於生產者和消費者之間經紀人, 這個角色相當於數據結構中的隊列. 例如一個Web系統中, 生產者是處理核心業務的Web程序, 業務中可能會產生一些耗時的任務; 比如短信 生產者會將任務發送給Broker, 就是把這個任務暫時放到隊列中, 等待消費者來處理. 消費者是Worker, 是專門用於執行任務的后台服務. Worker將實時監控隊列中是否有新的任務, 如果有就拿出來進行處理. Celery本身不提供隊列服務, 一般用Redis或者RabbitMQ來扮演Broker的角色.
3.Worker
Worker 就是那個一直在后台執行任務的人, 也稱為任務的消費者, 它會實時地監控隊列中有沒有任務, 如果有就立即取出來執行.
4.Beat
Beat是一個定時任務調度器, 它會根據配置定時將任務發送給Broker, 等待Worker來消費.
5.Backend
Backend用於保存任務的執行結果, 每個任務都有返回值, 比如發送郵件的服務會告訴我們有沒有發送成功, 這個結果就是存在Backend中.
二、環境搭建
Python3.6.8
通過pip 安裝 celery、redis
pip install celery==3.1.26.post2
pip install redis==2.10.6
安裝redis服務 redis-3.2.1.tar.gz參考:https://www.cnblogs.com/linux985/p/11344273.html
三、測試小栗子
為了測試Celery能否工作,我運行了一個最簡單的任務,編寫tasks.py,如下圖所示
import time from celery import Celery, platforms broker = 'redis://127.0.0.1:6379' backend = 'redis://127.0.0.1:6379' platforms.C_FORCE_ROOT = True #允許在root下運行 app = Celery(__file__, broker=broker, backend=backend) @app.task def add(x, y): time.sleep(5) return x + y
上面的代碼做了幾件事:
創建了一個Celery實例app
指定消息中間件用redis, URL為redis://127.0.0.1:6379
指定存儲用 redis, URL為redis://127.0.0.1:6379
創建了一個Celery任務add, 當函數被@app.task裝飾后, 就成為可被 Celery 調度的任務。
啟動Celery任務
celery worker -A tasks --loglevel=info
參數 -A 指定了Celery實例的位置, 本例是在 tasks.py 中,Celery會自動在該文件中尋找Celery對象實例
參數 --loglevel 指定了日志級別, 默認為 warning, 也可以使用 -l info 來表示。
然后看到界面顯示結果如下:
我們可以看到Celery正常工作在名稱centos版本為3.1.10 ,在下面的[config]中我們可以看到當前APP的名稱tasks,運輸工具transport就是我們在程序中設置的中間人redis://127.0.0.1:6379//,result redis://127.0.0.1:6379//,然后我們也可以看到worker缺省使用perfork來執行並發,當前並發數顯示為4,然后可以看到下面的[queues]就是我們說的隊列,當前默認的隊列是celery,然后我們看到下面的[tasks]中有一個任務tasks.add.
了解了這些之后,根據文檔我重新打開一個terminal,然后執行Python,進入Python交互界面,用delay()方法調用任務,執行如下操作:
t.ready() ##判斷任務是否執行完畢
t.get() ##獲取任務執行結果
可以看到, 雖然任務函數add需要等待5秒才返回執⾏結果, 但由於它是⼀個異步任務, 不會阻塞當前的主程序.
這個任務已經由之前啟動的Worker異步執行了,然后我打開之前啟動的worker的控制台,對輸出進行查看驗證,結果如下:
10:59:09這一行說明worker收到了一個任務:tasks.add,這里我們和之前發送任務返回的AsyncResult對比我們發現,每個task都有一個唯一的ID,第二行說明了這個任務執行succeed,執行結果為3。
查看資料說調用任務后會返回一個AsyncResult實例,可用於檢查任務的狀態,等待任務完成或獲取返回值(如果任務失敗,則為異常和回溯)。但這個功能默認是不開啟的,需要設置一個 Celery 的結果后端(backend),這塊我在下一個例子中進行了學習。
通過這個例子后我對Celery有了初步的了解,然后我在這個例子的基礎上去進一步的學習。
1.2 配置文件 在上面的例子中, 我們直接把Broker和 Backend的配置寫在了程序當中, 更好的做法是將配置項統一寫入到一個配置文件中. 1.2.1目錄結構: celery_demo # 項⽬根⽬錄 ├── celery_app # 存放 celery 相關⽂件 │ ├── __init__.py │ ├── config.py # 配置⽂件 │ ├── task1.py # 任務⽂件 1 │ └── task2.py # 任務⽂件 2 └── client.py # 應⽤程序 1.2.2 config.py文件中內容 BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend CELERY_TIMEZONE='Asia/Shanghai' # 指定時區,默認是 UTC CELERY_IMPORTS = ( # 指定導⼊的任務模塊 'celery_app.task1', 'celery_app.task2' ) 1.2.3 __init__.py文件內容 # -*- coding: utf-8 -*- from celery import Celery,platforms platforms.C_FORCE_ROOT = True app = Celery('demo') # 創建 Celery 實例 app.config_from_object('celery_app.config') # 通過 Celery 實例加載配置模塊 1.2.4 task1.py import time from celery_app import app @app.task def add(x, y): time.sleep(2) return x + y 1.2.5 task2.py from celery_app import app import time @app.task def say(): time.sleep(2) return 'helo' 1.2.6 client.py from celery_app import task1 from celery_app import task2 task1.add.delay(1, 2) task2.say.delay() 1.2.7 啟動 celery worker (websocket) [root@gitlab celery_demo]# celery -A celery_app worker -l info 1.2.8 執行client.py python client.py 1.2.9 運行python client.py后它會發送兩個異步任務到Broker, 在Worker的窗口我們可以看到如下輸出: [2018-10-19 06:52:31,389: INFO/MainProcess] Received task: celery_app.task1.add[b32962fe-dd61-443f-bc87-e666db957f24] [2018-10-19 06:52:31,391: INFO/MainProcess] Received task: celery_app.task2.say[d945e419-aa93-4a2c-aec4-105f81031a64] [2018-10-19 06:52:33,394: INFO/ForkPoolWorker-2] Task celery_app.task2.say[d945e419-aa93-4a2c-aec4-105f81031a64] succeeded in 2.00137619101s: 'helo' [2018-10-19 06:52:33,394: INFO/ForkPoolWorker-1] Task celery_app.task1.add[b32962fe-dd61-443f-bc87-e666db957f24] succeeded in 2.00418746s: 3
1.3 定時任務 Celery 除了可以執行異步任務, 也支持執行周期性任務(Periodic Tasks),或者說定時任務, Celery Beat 進程通過讀取配置文件的內容, 周期性地將定時任務發往任務隊列. 1.3.1 修改配置文件, 增加定時任務 from celery.schedules import crontab from datetime import timedelta BROKER_URL = 'redis://127.0.0.1:6379' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379' CELERY_TIMEZONE='Asia/Shanghai' CELERY_IMPORTS = ( 'celery_app.task1', 'celery_app.task2' ) CELERYBEAT_SCHEDULE = { "task1": { "task": "celery_app.task1.add", "schedule": timedelta(seconds=1), "args":(1, 2), }, "task2": { "task": "celery_app.task2.say", "schedule": timedelta(seconds=2), "args":(), }, } 1.3.2 啟動celery celery -B -A celery_app worker --loglevel=info 1.3.3 在worker窗口查看任務輸出 [2018-10-19 07:40:31,966: INFO/ForkPoolWorker-2] Task celery_app.task1.add[6b216f23-036a-48ed-b14f-e252bf3f1ffb] succeeded in 2.001080302s: 3 [2018-10-19 07:40:31,968: INFO/ForkPoolWorker-3] Task celery_app.task2.say[28246df4-0eff-49f0-b468-f92b85fe97b3] succeeded in 2.00303293001s: 'helo' ##定時任務遇到的問題就是不能對托管的定時任務做動態更新, 需要重啟 celery beat ..

#進階用法 1.1 Celery異步函數回調 經過快速入門的學習后,我們已經能夠使用 Celery 管理普通任務,但對於實際使用場景來說這是遠遠不夠的,所以我們需要更深入的去了解 Celery 更多的使用方式。 首先來看之前的task: import time from celery import Celery, platforms platforms.C_FORCE_ROOT = True from celery.app.task import Task broker = 'redis://127.0.0.1:6379' backend = 'redis://127.0.0.1:6379' app = Celery(__file__, broker=broker, backend=backend) @app.task def add(x, y): time.sleep(2) return x / y 這里的裝飾器app.task實際上是將一個正常的函數修飾成了一個celery task對象, 所以這里我們可以給修飾器加上參數來決定修飾后的task對象的一些屬性, 我們也可以自己復寫task類然后讓這個自定義task修飾函數 add, 來做一些自定義操作, 比如celery修飾的函數執行成功 失敗 執行完畢時的行為. import time from celery import Celery, platforms platforms.C_FORCE_ROOT = True from celery.app.task import Task broker = 'redis://127.0.0.1:6379' backend = 'redis://127.0.0.1:6379' app = Celery(__file__, broker=broker, backend=backend) class Mytask(Task): def on_success(self, retval, task_id, args, kwargs): print 'task success 11111' return super(Mytask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print 'task failed' return super(Mytask, self).on_failure(exc, task_id, args, kwargs, einfo) def after_return(self, status, retval, task_id, args, kwargs, einfo): print 'this is after return' return super(Mytask, self).after_return(status, retval, task_id, args, kwargs, einfo) def on_retry(self, exc, task_id, args, kwargs, einfo): print 'this is retry' return super(Mytask,self).on_retry(exc, task_id, args, kwargs, einfo) @app.task(base=Mytask) def add(x, y): time.sleep(2) return x / y celery -A tasks worker --loglevel=info ##啟動任務 In [2]: t = add.delay(1, 2) ##執⾏celery函數 在worker中會有類似如下輸出: [2018-10-19 15:05:18,986: WARNING/Worker-1] task success 11111 [2018-10-19 15:05:18,987: WARNING/Worker-1] this is after return [2018-10-19 15:05:18,988: INFO/MainProcess] Task task.add[c41343cf-e5ca-49f7-9478-63f4c2e2797f] succeeded in 2.01661233298s: 0 In [3]: t = add.delay(1, 0) ##執⾏celery函數, 此函數會由於函數代碼中分分母0而報錯. 在worker中會有類似如下輸出: [2018-10-19 15:13:38,260: WARNING/Worker-2] task failed [2018-10-19 15:13:38,261: WARNING/Worker-2] this is after return [2018-10-19 15:13:38,262: ERROR/MainProcess] Task task.add[12a852cd-726e-44a0-97a8-eb17308c354e] raised unexpected: ZeroDivisionError('integer division or modulo by zero',) Traceback (most recent call last): File "/app_shell/websocket/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task R = retval = fun(*args, **kwargs) File "/app_shell/websocket/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__ return self.run(*args, **kwargs) File "/app_shell/websocket/demo/task.py", line 38, in add return x / y ZeroDivisionError: integer division or modulo by zero 1.2 將修飾函數成為Task類的綁定方法, 執行中的任務獲取到了自己執行任務的各種信息, 可以根據這些信息做很多其他操作. import time from celery import Celery, platforms platforms.C_FORCE_ROOT = True from celery.app.task import Task broker = 'redis://127.0.0.1:6379' backend = 'redis://127.0.0.1:6379' app = Celery(__file__, broker=broker, backend=backend) @app.task(base=Mytask, bind=True) def add(self, x, y): print self.request time.sleep(2) return x / y celery -A tasks worker --loglevel=info ##啟動任務 In [2]: t = add.delay(1, 2) 在worker中會有類似如下輸出: 2019/7/14 2.Celery進階使用 127.0.0.1:8888/notebooks/Celery/2.Celery進階使用.ipynb 2/2 In [ ]: [2018-10-19 15:05:16,972: WARNING/Worker-1] <Context: {'chord': None, 'retries': 0, 'args': (1, 2), u'is_eager': False, u'correlation_id': u'c41343cf-e5ca-49f7-9478- 63f4c2e2797f', 'errbacks': None, 'taskset': None, 'id': 'c41343cf-e5ca-49f7-9478-63f4c2e2797f', u'headers': {}, 'called_directly': False, 'utc': True, 'task': 'task.add', u'group': None, 'callbacks': None, u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'celery', u'exchange': u'celery'}, u'hostname': 'celery@gitlab.example.com', 'expires': None, 'timelimit': (None, None), 'eta': None, 'kwargs': {}, u'reply_to': u'46af07e6-8e03-3574-9608- fc8c0b10a98e', '_protected': 1}> [2018-10-19 15:05:18,986: WARNING/Worker-1] task success 11111 [2018-10-19 15:05:18,987: WARNING/Worker-1] this is after return
四、django-celery實現異步HTTP請求
1、安裝django-celery 1.1 安裝django-celery之前需先安裝Celery, 安裝的包的版本不是越新越好, 建議安裝Celery的版本為4.0.0以下, 否則可能存在兼容性問題. 如果pip官方源不能安裝, 可以使用aliyun的pip源 pip install celery==4.0.0 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com pip install django-celery==3.2.2 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com pip install redis==2.10.6 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com 2、在django項目配置文件settings.py中配置celery. 2.1 加入以下參數: import djcelery #引入django-celery包 djcelery.setup_loader()#當項目運行時, celery會去查看installd_apps下包含的所有app目錄中的task.py⽂件, 並找到標記為task的方法, 將他們注冊成為Celery task. from celery import Celery, platforms BROKER_URL = 'redis://127.0.0.1:6379'#指定消息中間件所在位置 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'#指定任務執行結果的存儲位置 platforms.C_FORCE_ROOT = True #使celery可以在root下啟動 CELERY = Celery(__file__, broker=BROKER_URL, backend=CELERY_RESULT_BACKEND) 2.2 修改INSTALLED_APP在其中加入djcelery INSTALLED_APPS = [ ... ... 'djcelery', ] 3、在APP目錄下創建task.py import os import sys from django.conf import settings import time app = settings.CELERY @app.task def f1(): time.sleep(10) print("heelo!") return "hello return!" 4、 views.py中添加要異步執行的任務代碼. from __future__ import unicode_literals from index.task import f1 def celery_index(request): r = f1.delay() print(r) return HttpResponse("this is index page!") 5、啟動項目 (proj_b) [root@nfs webadmins]# python manage.py runserver 0.0.0.0:8901 #啟動Django項⽬ (proj_b) [root@nfs webadmins]# python manage.py celery worker -c 4 --loglevel=info #啟動Celery 6、驗證 6.1 請求URL http://172.16.70.233:8901/celery_index/ 6.2 驗證celery異步任務執行
Celery 框架學習筆記
在學習Celery之前,我先簡單的去了解了一下什么是生產者消費者模式。
生產者消費者模式
在實際的軟件開發過程中,經常會碰到如下場景:某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。
單單抽象出生產者和消費者,還夠不上是生產者消費者模式。該模式還需要有一個緩沖區處於生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據,如下圖所示:
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過消息隊列(緩沖區)來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給消息隊列,消費者不找生產者要數據,而是直接從消息隊列里取,消息隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。這個消息隊列就是用來給生產者和消費者解耦的。------------->這里又有一個問題,什么叫做解耦?
解耦:假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。生產者直接調用消費者的某個方法,還有另一個弊端。由於函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。
因為太抽象,看過網上的說明之后,通過我的理解,我舉了個例子:吃包子。
假如你非常喜歡吃包子(吃起來根本停不下來),今天,你媽媽(生產者)在蒸包子,廚房有張桌子(緩沖區),你媽媽將蒸熟的包子盛在盤子(消息)里,然后放到桌子上,你正在看巴西奧運會,看到蒸熟的包子放在廚房桌子上的盤子里,你就把盤子取走,一邊吃包子一邊看奧運。在這個過程中,你和你媽媽使用同一個桌子放置盤子和取走盤子,這里桌子就是一個共享對象。生產者添加食物,消費者取走食物。桌子的好處是,你媽媽不用直接把盤子給你,只是負責把包子裝在盤子里放到桌子上,如果桌子滿了,就不再放了,等待。而且生產者還有其他事情要做,消費者吃包子比較慢,生產者不能一直等消費者吃完包子把盤子放回去再去生產,因為吃包子的人有很多,如果這期間你好朋友來了,和你一起吃包子,生產者不用關注是哪個消費者去桌子上拿盤子,而消費者只去關注桌子上有沒有放盤子,如果有,就端過來吃盤子中的包子,沒有的話就等待。對應關系如下圖:
考察了一下,原來當初設計這個模式,主要就是用來處理並發問題的,而Celery就是一個用python寫的並行分布式框架。
然后我接着去學習Celery
Celery的定義
Celery(芹菜)是一個簡單、靈活且可靠的,處理大量消息的分布式系統,並且提供維護這樣一個系統的必需工具。
我比較喜歡的一點是:Celery支持使用任務隊列的方式在分布的機器、進程、線程上執行任務調度。然后我接着去理解什么是任務隊列。
任務隊列
任務隊列是一種在線程或機器間分發任務的機制。
消息隊列
消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)進程持續監視隊列中是否有需要處理的新任務。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之后中間人把消息派送給職程,職程對消息進行處理。如下圖所示:
Celery 系統可包含多個職程和中間人,以此獲得高可用性和橫向擴展能力。
Celery的架構
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ,Redis,MongoDB等,這里我先去了解RabbitMQ,Redis。
任務執行單元
Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。
然后我接着去安裝Celery,在安裝Celery之前,我已經在自己虛擬機上安裝好了Python,版本是2.7,是為了更好的支持Celery的3.0以上的版本。
因為涉及到消息中間件,所以我先去選擇一個在我工作中要用到的消息中間件(在Celery幫助文檔中稱呼為中間人<broker>),為了更好的去理解文檔中的例子,我安裝了兩個中間件,一個是RabbitMQ,一個redis。
在這里我就先根據Celery3.1的幫助文檔安裝和設置RabbitMQ, 要使用 Celery,我們需要創建一個 RabbitMQ 用戶、一個虛擬主機,並且允許這個用戶訪問這個虛擬主機。下面是我在個人虛擬機Ubuntu14.04上的設置:
$ sudo rabbitmqctl add_user forward password
#創建了一個RabbitMQ用戶,用戶名為forward,密碼是password
$ sudo rabbitmqctl add_vhost ubuntu
#創建了一個虛擬主機,主機名為ubuntu
$ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*"
#允許用戶forward訪問虛擬主機ubuntu,因為RabbitMQ通過主機名來與節點通信
$ sudo rabbitmq-server
之后我啟用RabbitMQ服務器,結果如下,成功運行:

之后我安裝Redis,它的安裝比較簡單,如下:
$ sudo pip install redis
然后進行簡單的配置,只需要設置 Redis 數據庫的位置:
BROKER_URL = 'redis://localhost:6379/0'
URL的格式為:
redis://:password@hostname:port/db_number
URL Scheme 后的所有字段都是可選的,並且默認為 localhost 的 6379 端口,使用數據庫 0。我的配置是:
redis://:password@ubuntu:6379/5
之后安裝Celery,我是用標准的Python工具pip安裝的,如下:
$ sudo pip install celery
為了測試Celery能否工作,我運行了一個最簡單的任務,編寫tasks.py,如下圖所示:
編輯保存退出后,我在當前目錄下運行如下命令:
$ celery -A tasks worker --loglevel=info
#查詢文檔,了解到該命令中-A參數表示的是Celery APP的名稱,這個實例中指的就是tasks.py,后面的tasks就是APP的名稱,worker是一個執行任務角色,后面的loglevel=info記錄日志類型默認是info,這個命令啟動了一個worker,用來執行程序中add這個加法任務(task)。
然后看到界面顯示結果如下:
我們可以看到Celery正常工作在名稱ubuntu的虛擬主機上,版本為3.1.23,在下面的[config]中我們可以看到當前APP的名稱tasks,運輸工具transport就是我們在程序中設置的中間人redis://127.0.0.1:6379/5,result我們沒有設置,暫時顯示為disabled,然后我們也可以看到worker缺省使用perfork來執行並發,當前並發數顯示為1,然后可以看到下面的[queues]就是我們說的隊列,當前默認的隊列是celery,然后我們看到下面的[tasks]中有一個任務tasks.add.
了解了這些之后,根據文檔我重新打開一個terminal,然后執行Python,進入Python交互界面,用delay()方法調用任務,執行如下操作:
這個任務已經由之前啟動的Worker異步執行了,然后我打開之前啟動的worker的控制台,對輸出進行查看驗證,結果如下:
綠色部分第一行說明worker收到了一個任務:tasks.add,這里我們和之前發送任務返回的AsyncResult對比我們發現,每個task都有一個唯一的ID,第二行說明了這個任務執行succeed,執行結果為12。
查看資料說調用任務后會返回一個AsyncResult實例,可用於檢查任務的狀態,等待任務完成或獲取返回值(如果任務失敗,則為異常和回溯)。但這個功能默認是不開啟的,需要設置一個 Celery 的結果后端(backend),這塊我在下一個例子中進行了學習。
通過這個例子后我對Celery有了初步的了解,然后我在這個例子的基礎上去進一步的學習。
因為Celery是用Python編寫的,所以為了讓代碼結構化一些,就像一個應用,我使用python包,創建了一個celery服務,命名為pj。文件目錄如下:
celery.py
from __future __ import absolute_import
#定義未來文件的絕對進口,而且絕對進口必須在每個模塊的頂部啟用。
from celery import Celery
#從celery導入Celery的應用程序接口
App.config_from_object(‘pj.config’)
#從config.py中導入配置文件
if __name__ == ‘__main__’:
app.start()
#執行當前文件,運行celery
app = Celery(‘pj’,
broker=‘redis://localhost’,
backend=‘redis://localhost’,
include=[‘pj.tasks’]
)
#首先創建了一個celery實例app,實例化的過程中,制定了任務名pj(與當前文件的名字相同),Celery的第一個參數是當前模塊的名稱,在這個例子中就是pj,后面的參數可以在這里直接指定,也可以寫在配置文件中,我們可以調用config_from_object()來讓Celery實例加載配置模塊,我的例子中的配置文件起名為config.py,配置文件如下:
在配置文件中我們可以對任務的執行等進行管理,比如說我們可能有很多的任務,但是我希望有些優先級比較高的任務先被執行,而不希望先進先出的等待。那么需要引入一個隊列的問題. 也就是說在我的broker的消息存儲里面有一些隊列,他們並行運行,但是worker只從對應 的隊列里面取任務。在這里我們希望tasks.py中的add先被執行。task中我設置了兩個任務:
所以我通過from celery import group引入group,用來創建並行執行的一組任務。然后這塊現需要理解的就是這個@app.task,@符號在python中用作函數修飾符,到這塊我又回頭去看python的裝飾器(在代碼運行期間動態增加功能的方式)到底是如何實現的,在這里的作用就是通過task()裝飾器在可調用的對象(app)上創建一個任務。
了解完裝飾器后,我回過頭去整理配置的問題,前面提到任務的優先級問題,在這個例子中如果我們想讓add這個加法任務優先於subtract減法任務被執行,我們可以將兩個任務放到不同的隊列中,由我們決定先執行哪個任務,我們可以在配置文件中這樣配置:
先了解了幾個常用的參數的含義:
Exchange:交換機,決定了消息路由規則;
Queue:消息隊列;
Channel:進行消息讀寫的通道;
Bind:綁定了Queue和Exchange,意即為符合什么樣路由規則的消息,將會放置入哪一個消息隊列;
我將add這個函數任務放在了一個叫做for_add的隊列里面,將subtract這個函數任務放在了一個叫做for_subtract的隊列里面,然后我在當前應用目錄下執行命令:
這個worker就只負責處理for_add這個隊列的任務,執行這個任務:
任務已經被執行,我在worker控制台查看結果:
可以看到worker收到任務,並且執行了任務。
在這里我們還是在交互模式下手動去執行,我們想要crontab的定時生成和執行,我們可以用celery的beat去周期的生成任務和執行任務,在這個例子中我希望每10秒鍾產生一個任務,然后去執行這個任務,我可以這樣配置:
使用了scheduler,要制定時區:CELERY_TIMEZONE = 'Asia/Shanghai',啟動celery加上-B的參數:
並且要在config.py中加入from datetime import timedelta。
更近一步,如果我希望在每周四的19點30分生成任務,分發任務,讓worker取走執行,可以這樣配置:
看完這些基礎的東西,我回過頭對celery在回顧了一下,用圖把它的框架大致畫出來,如下圖: