celery詳解


Celery詳解

1、背景

由於從事區塊鏈錢包相關開發,對於區塊鏈鏈上資源需要頻繁的進行檢查同步,在flask項目中,對於celery這個異步任務執行工具,使用的頻率算是相當的高,今天,我就來簡單總結一下celery的概念和使用方法。

2、形象比喻

Celery是一個異步任務的調度工具,是Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個worker的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等着被分配工作的碼農。

在python中定義Celery的時候,我們要引入Broker,中文翻譯過來就是"中間人"的意思,在這里Broker起到一個中間人的角色,在工頭提出任務的時候,把所有的任務放到Broker里面,在Broker的另一頭,一群碼農等着取出一個個任務准備着手做。

這種模式注定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的,所以我們要引入Backend來保存每次任務的結果。這個Backend有點像我們的Broker,也是存儲信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。

3、celery具體介紹

3.1 Broker

broker是一個消息傳輸的中間件,它是用來存儲生產出來的各種待執行任務的。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,而后celery的worker將會取到消息,進行程序執行,broker可以看成是一個消息隊列,其中broker的中文意思是經紀人,用來發送和接受信息。這個broker有幾個方案可供選擇:RabbitMQ(消息隊列),Redis(緩存數據庫),數據庫(不推薦),等等。

3.2 Backend

通常程序發送的消息,發完就完了,可能都不知道對方什么時候接受了,為此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果,Backend是在Celery的配置中的一個配置項CELERY_RESULT_BACKEND,作用是保存結果和狀態,如果你需要跟蹤任務的狀態,那么需要設置這一項,可以是Database backend,也可以是Cache backend。

對於brokers,官方推薦是rabbitmq和redis,至於backend,就是數據庫,為了簡單可以都使用redis。在我的項目中,都是使用redis。

4、使用

4.1 celery架構

Celery的架構由消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)三部分組成。

  • 消息中間件

Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括RabbitMQ,Redis,MongoDB等

  • 任務執行單元

Worker是celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。

  • 任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP,redis,memcached,mongodb,SQLAlchemy,Django等

4.2 安裝redis+celery

安裝Redis,它的安裝比較簡單:

~$ pip install redis

然后進行配置,一般都在項目的config.py文件里配置:

CELERY_BROKER_URL = "redis://localhost:6379/0"

URL的格式為:redis://:password@hostname:port/db_number

URL Scheme后的所有字段都是可選的,並且默認為localhost的6379端口,也就是redis的默認端口,使用數據庫0。

安裝Celery:

~$ pip install celery

4.3 使用Celery

使用celery包含三個方面:1,定義任務函數 2,運行celery服務 3,客戶應用程序的調用

創建一個文件tasks.py輸入下列代碼:

from celery import Celery

broker = "redis://localhost:6379/0"
backend = "redis://localhost:6379/1"

app = Celery("tasks", broker=broker, backend=backend)

@app.task
def add(x, y)
    return x + y

上述代碼導入了celery,然后創建了celery實例app,實例化的過程中指定了任務名tasks(和文件名一致),傳入了broker和backend。然后創建了一個任務函數add。下面啟動

celery服務,在當前命令行終端運行:

~$ celery -A tasks worker

目錄結構(celery -A tasks worker --loglevel=info這條命令當前工作目錄必須和tasks.py所在的目錄相同,即進入tasks.py所在目錄執行這條命令)

調用delay函數即可啟動add這個任務,這個函數的效果是發送一條消息到broker中去,這個消息包括要執行的函數,函數的參數以及其他消息,具體的可以看Celery官方文檔。這個時候worker會等待broker中的消息,一旦收到消息就會立刻執行消息。

注意:如果把返回值賦值給一個變量,那么原來的應用程序也會被阻塞,需要等待異步任務返回的結果,因此,實際使用中,不需要把結果賦值。

使用配置文件

Celery的配置比較多,可以在官方配置文檔:http://docs.celeryproject.org/en/latest/userguide/configuration.html 查詢每個配置項的含義。

4.4 健壯性

上述的使用是簡單的配置,下面介紹一個更健壯的方式來使用celery。首先創建一個python包,celery服務,姑且命名為proj。目錄文件如下:

|- proj
|-- __init__.py
|-- celery.py      # 創建celery實例
|-- config.py      # 配置文件
|-- tasks.py       # 任務函數

首先是 celery.py

from __future__ import absolute_import
from celery import Celery

app = Celery("proj", include=["proj.tasks"])

app.config_from_object("proj.config")

if __name__ == "__main__":
    app.start()

這一次創建app,並沒有直接指定broker和backend。而是在配置文件中。

然后是 config.py

from __future__ import absolute_import

BROKER_URL = "redis://localhost:6379/0"
CELERY_BACKEND_URL = "redis://localhost:6379/1"

最后是 tasks.py

from __future__ import absolute_import
from proj.celery import app

@app.task
def add(x, y):
    return x + y

使用方法也很簡單,在proj的同一級目錄執行celery:

celery -A proj worker -l info

現在使用任務也很簡單,直接在客戶端代碼調用proj.tasks里的函數即可。

4.5 定時任務

Scheduler(定時任務,周期性任務)

一種常見的需求是每隔一段時間執行一個任務

在celery中執行定時任務非常簡單,只需要設置celery對象的CELERYBEAT_SCHEDULE屬性即可。

配置如下 config.py

from __future__ import absolute_import

BROKER_URL = "redis://localhost:6379/0"
CELERY_BACKEND_URL = "redis://localhost:6379/1"

CELERY_TIMEZONE = "Asia/Shanghai"

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds':{
        'task':'proj.tasks.add',
        'schedule':timedelta(seconds=30),
        'args':(16, 16)
    },
}

注意配置文件需要指定時區,這段代碼表示每隔30秒執行add函數,一旦使用了scheduler,啟動celery需要加上-B參數。

celery -A proj worker -B -l info

對於celery的介紹就到這里了,歡迎交流技術難點。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM