flask中使用celery


 

參考資料: 

Celery 官網:http://www.celeryproject.org/
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文檔中文版:http://docs.jinkan.org/docs/celery/

Celery簡介
除Celery是一個異步任務的調度工具。 Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等着被分配工作的碼農。
Broker
在 Python 中定義 Celery 的時候,我們要引入 Broker(消息中間件),中文翻譯過來就是“中間人”的意思,在這里 Broker 起到一個中間人的角色。在工頭提出任務的時候,把所有的任務放到 Broker 里面,在 Broker 的另外一頭,一群碼農等着取出一個個任務准備着手做。
Backend
這種模式注定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像我們的 Broker,也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。
Celery應用場景
1.你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。
2.你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個短信祝福

Celery的特點
1.簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的
2.高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
3.快速:一個單進程的celery每分鍾可處理上百萬個任務
3.靈活: 幾乎celery的各個組件都可以被擴展及自定制

Celery工作基本流程

 

我們的項目

項目目錄:

proj/celery.py

復制代碼
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
            broker = 'amqp://',
            backend = 'amqp://',
            include = ['proj.tasks'])

app.conf.update(
    result_expires = 3600
)

if __name__ == '__main__':
    app.start()
復制代碼

在這個模塊中創建了Celery實例(通常稱為app

要在項目中使用Celery只需要通過import導入該實例就行了

  • broker參數指定要使用的中間件的URL
  • backend參數指定使用的result backend

    用來跟蹤任務狀態和結果,雖然默認狀態下結果不可用。以上例子中使用RPC result backend。當然,不同的result backend都有自己的好處和壞處,根據自己實際情況進行選擇,如果不需要最好禁用。通過設置@task(ignore_result=True)選項來禁用耽擱任務)

  • include參數是當worker啟動時導入的模塊列表需要在這里添加自己的任務莫誇這樣worker就可以找到任務

proj/tasks.py
  
復制代碼
from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y
    
@app.task
def xsum(numbers):
    return sum(numbers)
復制代碼

 

啟動worker

Celery程序可以用來啟動worker

celery -A proj worker -l info 
 -------------- celery@centos6 v4.1.0 (latentcall) ---- **** ----- --- * *** * -- Linux-2.6.32-696.el6.x86_64-x86_64-with-centos-6.9-Final 2018-03-26 12:27:49 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: task:0x7fe5cfbd20d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] [2018-03-26 12:27:49,921: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2018-03-26 12:27:49,926: INFO/MainProcess] mingle: searching for neighbors [2018-03-26 12:27:49,499: INFO/MainProcess] mingle: sync with 1 nodes [2018-03-26 12:27:50,950: INFO/MainProcess] mingle: sync complete [2018-03-26 12:27:50,957: INFO/MainProcess] celery@centos6 ready. 
  • broker是在celery模塊中指定的中間件參數的url,也可以在命令行中通過-b選項指定不同的中間件
  • Concurrent是用於並行處理的任務的預創建worker進程數量,當所有的任務都在忙於工作時,新的任務必須等待之前的執行完成才能處理

默認的並發數是機器上CPU的數量,可以通過celery worker -c選項指定自定義數量。沒有推薦值,最佳數量取決於很多因素,但是如果你的任務主要是I/O相關的,就可以增加這個數量。實驗表明,增加超過兩倍CPU數量效果很差,而且可能會降低性能

除了prefork poolCelery還支持EventletGevent並且還能在單線程上運行

  • Event是一個可選項,當啟用的時候,Celery會發送監控(消息)來反映worker的操作,也可以被用來監視像celeryeventsFlower(實時Celery監控)這樣的程序。
  • Queuesworker將使用的任務的隊列的集合,worker可以一次接受幾個隊列,它用來將消息路由到特定的工作者以作為服務質量、關注點分離、和優化的一種方式

可以通過命令行獲取完整的列表————celery worker --help

停止worker

ctrl-c

后台

生產環境中一般將worker放到后台,后台腳本使用celery multi命令后台啟動一個或多個worker

celery multi start w1 -A proj -l info 

控制台打印

celery multi v4.1.0 (latentcall) > Starting nodes... > w1@centos6: OK Stale pidfile exists - Removing it. 

也可以重啟:

celery multi restart w1 -A proj -l info 
celery multi v4.1.0 (latentcall) > Stopping nodes... > w1@centos6: TERM -> 23620 > Waiting for 1 node -> 23620..... > w1@centos6: OK > Restarting node w1@centos6: OK > Waiting for 1 node -> None... 

停止:

celery multi stop w1 -A proj -l info 

stop命令是異步的所以它不會等待worker關閉,可以使用stopwait命令來確保當前執行都任務在退出前都已執行完畢

celery multi stopwait w1 -A proj -l info 

celery multi不會存儲關於worker的信息,所以重啟的時候需要使用同樣的命令行參數。在停止時,必須使用相同的pidfilelogfile參數

默認情況下,程序將在當期目錄創建pidlog文件,為了防止多個worker運行出錯,推薦將這些文件放在專門的目錄:

mkdir -p /var/run/celery mkdir -p /var/log/celery celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log 

使用multi指令可以啟動多個worker,並且有一個強大的命令行語法來為不同的worker指定參數:

celery multi start 10 -A proj -l info -Q:1-3 images, video -Q:4, 5 data -Q default -L:4,5 debug 

~Detail about multi temp

--app參數

--app參數指定使用的Celery應用實例,必須以module.path:attribute的形式出現

但也支持快捷方式,只要包名指定了,就會嘗試在應用實例中搜索

使用--app=proj

  1. 名為proj.app的屬性
  2. 名為proj.app的屬性
  3. 模塊proj中的任何屬性都是一個Celery應用程序,如果都沒有發現,它就會嘗試一個名為proj.celery的子模塊
  4. 名為proj.celery.app的屬性
  5. 名為proj.celery.celery的屬性
  6. 模塊proj.celery中的任何屬性都是一個Celery應用程序

任務調用

  • 可以通過使用delay()方法來調用一個任務
add.delay(3, 3) 

這個方法實際上是另一種叫做apply_async()方法的快捷方式

add.applay_async((3, 3)) 

后者(applay_async())能夠指定執行選項,比如運行時間(倒計時)、應該發送的隊列等等:

add.apply_async((2, 2), queue='lopri', countdown=10) 

上述案例中,任務會被發送給一個名為lopri的隊列,該任務會在信息發送后十秒執行

直接應用該任務會在當前進程中執行任務,不會發送消息

add(3, 3) 

result:6

三種方法delay()apply_async()和應用__call__,代表了Celery調用API,也同樣用於簽名

  • 每一個任務調用都有一個唯一的標識符(UUID),這個就是任務的id

  • delay()apply_async方法會返回一個AsyncResult實例,可以被用來跟蹤任務執行狀態,但是需要開啟result backend這樣狀態才能被存儲在某處

  • Results默認是禁用的,因為實際上沒有一個result backend適用於每個應用程序,所以要考慮到每個獨立backend的缺點來選擇一個使用。對於許多保持返回值的任務來說都不是很有用,所以這個默認的禁用是很明智的。還需要注意的是,result backend並不用來監控任務和worker,對於Celery有專門的事件消息

如果配置了result backend就可以接收到任務的返回值

result = add.delay(2, 2) res.get(timeout=1) 

retult:4

  • 可以通過查看id屬性找到任務的id
res.id 

result:073c568d-ca88-4198-b735-0f98f861218b

  • 如果任務拋出異常也可以檢查到異常,默認result.get()可以傳播任何錯誤

  • 如果不希望錯誤傳播,可以通過propagete屬性禁用

res.get(propagate=False) 

在這種情況下,它會返回所提出的異常實例,以便檢查任務是否成功或失敗,您將不得不在結果實例上使用相應的方法

res.failed() res.successful() 

也可以通過state找到任務的狀態

res.state 

result:FAILUTE

  • 一個任務只能有一個狀態,但是可以在幾個狀態中發展,典型任務階段可能是這樣
PENDING -> STARTED -> SUCCESS

STARTED狀態是一個特殊的狀態,只有在task_track_started設置啟用或者@task(track_started=True)選項設置的時候才會被記錄下來

PENDING狀態實際上不是記錄狀態,而是未知任務id的默認狀態

from proj.celery import app res = app.AsyncResult('this-id-does-not-exist') res.state 

result:PENDING

  • 如果重新嘗試這個任務可能會變得更復雜,對於一個嘗試過兩遍的任務來說階段可能是這樣:
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Canvas:設計任務流

前面學習了通過delay方法調用任務,通常這樣就夠了,但是有時可能需要將任務調用的簽名傳遞給另一個進程或者另一個函數的參數,對Celery來說叫做signatures

簽名以某種方式包裝了單一任務調用的參數和執行選項,以便將其傳遞給函數,甚至序列化后發送。

可以使用參數(2, 2)和十秒的計時器來為add任務創建一個簽名

add.signature((2, 2), countdown=10) 

也可以簡寫:

add.s(2, 2) 

調用API

簽名的實例也支持調用API,意味着也可以有delayapply_async方法

但是有一個區別,那就是簽名可能已經指定了一個參數簽名,add任務接受兩個參數,所以一個制定了兩個參數的簽名將會形成一個完整的簽名

s1 = add.s(2, 2) res = s1.delay() res.get() 

也可以使用不完成的簽名,叫做partials

s1 = add.s(2) 

s2現在是部分簽名,需要另一個參數才完整,則可以在調用signature的時候處理

# resolves the partial: add(8, 2) res = s2.delay(8) res.get() 

在這里,添加了參數8,對已存在的參數2組成了一個完整的簽名add(8, 2)

關鍵字參數也可以延遲添加,會和已存在的關鍵字參數合並,新參數優先(新參數覆蓋舊參數)

s3 = add.s(2, 2, debug=True) s3.delay(debug=False) 

已聲明的簽名支持調用API:

  • sig.apply_async(arg=(), kwargs={}, **options
    使用可選部分參數和部分關鍵字參數調用簽名,也支持部分可執行選項
  • sig.delay(*args, **kwargs)
    apply_async的星參版本,任何參數都會被預先記錄在簽名的參數你,關鍵字參數會和現有的keys合並

基本體

  • group
  • chain
  • chord
  • map
  • starmap
  • chunks

這些基本體本身就是簽名對象,因此,它們可以以任何多種方式組合起來組成復雜的工作流

Group

一個group同時調用任務列表,返回一個特殊結果實例,這樣可以以組的形式檢查結果,並按順序檢索返回值

from celery import group from proj.tasks import add group(add.s(i, i) for in in range(10))().get() 

result:[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

  • Partial group
g = group(add.s(i, i) for i in range(10)) g(10).get() 

result:[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

任務可以被相互連接起來,這樣在一個任務返回后另一個任務被調用

from celery import chain form proj.tasks import add, mul // 用法1 chian(add.s(4, 4) | mul.s(8))().get() // 用法2 g = chain(add.s(4) | mul.s(8)) g(4).get() // 用法3 (add.s(4, 4) | mul.s(8))().get() 

Chords

chord是一個有返回值的group

from celery import chord from proj.tasks import add, xsum // 用法1 (group(add.s(i, i) for i in range(10)) | xsum.s())().get() // 用法2 upload_document.s(file) | group(apply_filter.s() for filter in filters) 

路由

Celery支持AMQP提供的所有路由設施,但是它也支持簡單路由,將消息發送到指定的隊列

task_routes設置可以是用戶按名稱對任務進行路由,並將一切集中在一個位置

app.conf.update{
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'}, } } 

可以在運行時通過queue參數指定隊列到apply_async

from proj.tasks import add add.apply_async((2,2), queue='hipri') 

然后可以通過指定celery worker -Q選項使worker從隊列中消費

celery -A proj worker -Q hipri 

也可以通過使用逗號分隔符(,)來指定多個隊列

celery -A proj worker -Q hipri, celery 

默認隊列因為歷史原因命名為:celery

隊列的順序無關緊要,因為worker會給隊列相同的權重

遠程控制

如果使用RabbitMQ(AMQP)Redis或者Qpid作為中間件就可以在運行時監視worker

  • 查看worker當前執行的任務
celery -A proj inspect active 

這是通過使用廣播消息實現的,因此,急群眾的每一個工作人員都能接收到所有遠程控制命令

  • 也可以指令一個或多個worker使用--destination選項請求行動,這是一個逗號分隔的worker主機名列表
celery -A proj inspect active --destination=celery@example.com 

如果沒有提供目標,那么每個worker都會對請求做出反應並回復

  • celery inspect命令包含的命令不會改變worker的任何東西,它只會回復關於worker內部發生的事情的信息和統計信息,可以執行命令檢查列表:
celery -A proj inspect --help 
  • celery control命令,包含在運行時實際改變worker操作的命令
celery -A proj control --help 
  • 強制worker啟用事件消息(用於監視任務和工作人員)
celery -A proj control enable_events 

當事件激活,可以啟動event dumper查看worker正在做什么

celery -A proj events --dump 

或者

celery -A proj events 

當完成監控可以再次禁用events

celery -A proj control disable_events 

celery status命令還能使用遠程控制命令,並顯示集群中的在線worker列表

celery -A proj status 

時區

所有的時間和日期、內部和消息多使用UTC時間區域

worker收到消息,例如使用倒計時設置,它將UTC時間轉換為本地時間。如果希望使用與系統時區不同的地區,那么必須要使用時區設置來配置該時區:

app.conf.timezone = 'Asia/Shanghai'
 

最優化

默認的配置並沒有針對吞吐量進行優化,它試圖在許多短任務和更少的長任務之間走中間路線,這是吞吐量和公平調度之間的折中

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM