如果想在任務成功或者失敗額外做點事,可以重寫Task類。
tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app from celery.task import Task class MyTask(Task): def on_success(self, retval, task_id, args, kwargs): print('task done==================>: {0}'.format(retval)) return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print('task fail, reason=================>: {0}'.format(exc)) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo) @app.task(base=MyTask) def add(x, y): return x + y
@app.task(bind=True)
意思是綁定任務為實例方法,執行中的任務能獲取到了自己執行任務的各種信息,可以根據這些信息做很多其他操作,例如判斷鏈式任務是否到結尾等等。
示例:
# tasks.py from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task(bind=True) def add(self, x, y): logger.info(self.request.__dict__) return x + y
案例模擬進度條,查看任務進度
------------------目錄結構-------------------
--start.py
--proj
--celery.py
--tasks.py
--------------------------------------------------
from __future__ import absolute_import, unicode_literals #導入安裝的celery(所以上面要寫絕對導入),而不是自己導自己(from .celery) from celery import Celery app = Celery('proj', broker='redis://', backend='redis://', include=['proj.tasks',]) # 這個celery管理了哪些task文件可以有多個,其中有個定時任務periodic_task # Optional configuration, see the application user guide. # update方法更新配置,也可以直接寫在上面初始化Celery里面 app.conf.update( result_expires=3600, # 任務結果一小時內沒人取就丟棄 ) if __name__ == '__main__': app.start()
from __future__ import absolute_import, unicode_literals from .celery import app #from celery import Celery import time @app.task(bind=True) def test_mes(self): for i in range(1, 11): time.sleep(1) self.update_state(state="PROGRESS", meta={'p': i*10}) return 'finish'
from proj.tasks import test_mes import sys def pm(body): res = body.get('result') if body.get('status') == 'PROGRESS': sys.stdout.write('\r任務進度: {0}%'.format(res.get('p'))) sys.stdout.flush() else: print('\r') print(res) r = test_mes.delay() r.get(on_message=pm, propagate=False) #'FINISH'
excel入庫步驟進度顯示
tasks.py
from .celery import app import time @app.task(bind=True) def excel_info_db(self): time.sleep(1) self.update_state(state="PROGRESS", meta={'step':'加載excel到內存', 'progress':'100%'}) time.sleep(1) self.update_state(state="PROGRESS", meta={'step': '在內存中校驗excel', 'progress': '100%'}) time.sleep(1) self.update_state(state="PROGRESS", meta={'step': '入庫', 'progress': '100%'}) return 'finish'
鏈式任務:
有些任務可能需由幾個子任務組成,此時調用各個子任務的方式就變的很重要,盡量不要以同步阻塞的方式調用子任務,而是用異步回調的方式進行鏈式任務的調用:
比如某個任務要依賴某幾個任務的返回值,那這幾個任務就可以同時進行,可節省時間
---------------目錄結構----------------------
--start.py
--proj
--celery.py
--tasks.py
-------------------------------------------------
tasks.py
# tasks.py from .celery import app import time @app.task() def fetch_page(url): time.sleep(3) return "抓取到的%s的內容" % url @app.task() def parse_page(page): # page是fentch_page的返回值 time.sleep(3) return "對%s做解析,得到數據" % page ''' 如果用這個@app.task(ignore_result=True)裝飾 就算全部執行完,res.get()還是會阻塞 ''' @app.task() def store_page_info(info, url): print(info + '%s數據進行入庫...' % url) time.sleep(3) return '執行結束'
start.py
# start.py from celery import group, chain from proj.tasks import * def update_page_info(url): # fetch_page -> parse_page -> store_page # 第一種寫法 # chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url) # # chain() # 第二種寫法,res.get()為最后返回的結果 res = chain(fetch_page.s(url),parse_page.s(),store_page_info.s(url))() while True: if res.ready(): print(str(res.get())) break update_page_info('www.google.com')
celery.py(celery配置)
from __future__ import absolute_import, unicode_literals #導入安裝的celery(所以上面要寫絕對導入),而不是自己導自己(from .celery) from celery import Celery app = Celery('proj', broker='redis://', backend='redis://', include=['proj.tasks',]) # 這個celery管理了哪些task文件可以有多個,其中有個定時任務periodic_task # Optional configuration, see the application user guide. # update方法更新配置,也可以直接寫在上面初始化Celery里面 app.conf.update( result_expires=3600, # 任務結果一小時內沒人取就丟棄 ) if __name__ == '__main__': app.start()
執行:
1.啟動worker:`celery -A proj worker -l debug` //cd到proj目錄下才能啟動
2.向隊列放任務: `python3 start.py` //cd到proj的上一級目錄執行

鏈式任務中前一個任務的返回值默認是下一個任務的輸入值之一 ( 不想讓返回值做默認參數可以用 si() 或者 s(immutable=True) 的方式調用 )。
這里的 s() 是方法 celery.signature() 的快捷調用方式,signature 具體作用就是生成一個包含調用任務及其調用參數與其他信息的對象,個人感覺有點類似偏函數的概念:先不執行任務,而是把任務與任務參數存起來以供其他地方調用。
調用任務方式二:fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
前面講了調用任務不能直接使用普通的調用方式,而是要用類似 add.delay(2, 2) 的方式調用,而鏈式任務中又用到了 apply_async 方法進行調用,實際上 delay 只是 apply_async 的快捷方式,二者作用相同,只是 apply_async 可以進行更多的任務屬性設置,比如 callbacks/errbacks 正常回調與錯誤回調、執行超時、重試、重試時間等等,具體參數可以參考:https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async
組任務group
並行執行組內的每個任務
from celery import group
group(fetch_page.s(url),parse_page.s('test1'),store_page_info.s('test2',url))()
任務分割chord
分為header和body兩個部分,會先執行header在將header的結果傳給body執行
from celery import chord
chord(header=[fetch_page.s(url)],body=parse_page.s())()
任務分組chunks
按照任務個數分組,並不是並發執行
from celery import chunks
fetch_page.chunks(['1','2','3'],2)() #2代表每組的任務個數,需要注意的是如果第一個參數傳入的是字符串的話,那么字符串會被分割成每個字符當作參數傳入
