基於celery的任務管理


 1、celery基本原理

      Celery是一個由python編寫的簡單、靈活、可靠的用來處理大量信息的分布式系統,同時提供了操作和維護分布式系統所需要的工具,說白了就是一個用來管理分布式隊列的工具。

      Celery主要包含如下幾個概念:

Brokers存放/拿取任務的地方,常見的brokersrabbitmqredisZookeeper等;

Backend結果儲存的地方,隊列中的任務運行完后的結果或者狀態需要被任務發送者知道,那么就需要一個地方儲存這些結果,常見的backendredisMemcached

Workers從隊列中取出任務並執行;

Tasks任務,一般由用戶、觸發器或其他操作將任務入隊,然后交由workers進行處理。

基本框架圖見下:

  我們通常使用celery的命令來啟動celery,啟動的時候我們會指定一個celery實例,使用這個實例的task包裝了的任務函數被包裝成了celerytask對象,這樣他們就有了delay方法。當我們執行delay方法時,這些task會找自己所屬的那個celery instance,從中獲取配置信息(主要是broker的地址)后將調用請求發往消息隊列。

2、celery基本使用方式

快捷的任務啟動函數delay

先看最基本使用方式,創建一個celery實例,指定redis0號庫和1號庫為其brokerbackend,創建一個最基本的用實例的task裝飾的函數add

      再在另一個文件創建一個一個使用delay啟動的異步任務,啟動后,delay會向worker發消息申請執行任務,workerbroker獲取已注冊的task並根據入參執行,然后我們再根據ready函數從backend獲取任務執行狀態,執行完成后我們再從backend獲取返回結果。

 

強大的任務啟動函數apply_async

      apply_async(args[,kwargs[, ...]])celery真正的啟動函數,delay是它的快捷版,apply_async可以指定更多的參數,第一部分就是task里面的python function的參數,比如add(x,y)x,y,第二個參數叫作keyword arguments,就是設定一些環境變量,第三個參數就是execution options,也就是這個task本身的執行選項,下面介紹幾個常用參數:

countdown:指定多少秒后執行任務;

 

eta (estimated time of arrival):指定任務被調度的具體時間,參數類型是datetime

可以看到,時間貌似不對,其實是因為celery服務端運行使用的UTC時區,如果不指定utcnow就等不到執行時間了。

expires任務過期時間,已開始的任務會被執行完成,未開始的不再執行,參數類型可以是float,也可以是datetime

link(Signature)上一個任務返回成功才執行這個任務,相當於與一個回調,回調函數可以引用任務結果。

 

3、任務的排列組合

 group順序的執行一組任務

 

chain任務是可以被鏈接的,一個任務完成后,把結果傳給另外一個任務,作為下一個任務的入參.

chords: 可以傳入一個回調函數,等任務執行完,參數傳給回調函數處理。

效果等同於groupchain的組合:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM