原因
因為最近項目需求中需要提供對異步執行任務終止的功能,所以在尋找停止celery task任務的方法。這種需求以前沒有碰到過,所以,只能求助於百度和google,但是找遍了資料,都沒找到相關的能停止celery task任務的方法(網上找到的一個方法實測不能用,可能是celery版本的原因,我的項目目前使用的是celery 4.0.2)
解決過程
由於網上找不到解決辦法,於是只能自己想辦法了。
想到celery 管理工具flower里面好像有停止celery task的功能,於是去找flower的源碼,找到接口的源碼如下:
logger.info("Revoking task '%s'", taskid)
terminate = self.get_argument('terminate', default=False, type=bool)
self.capp.control.revoke(taskid, terminate=terminate)
self.write(dict(message="Revoked '%s'" % taskid))
核心代碼是self.capp.control.revoke
想到去celery里面找尋revoke
函數,發現有兩處比較可疑,第一個是celery.worker.control.revoke
,第二個是celery.app.control.Control.revoke
,直覺來看,應該是第二個方法,但是第二個方法是在一個類里面的,要調用這個方法首先需要獲取到celery app的實例,后來去celery 配置里面找,發現在__init__.py文件里面有__all__ = ['celery_app']
這么一句,於是找到突破點了,引用這個包就能獲取到celery_app了。
from test.ceyery_proj import celery_app
celery_app.control.revoke(task_id, terminate=True)
通過這個方法就能終止正在執行的task,至於task_id在執行任務的時候返回了,我將這個id存儲在數據庫中,這樣就可以被拿來控制task的執行了。
寫這篇文檔的目的主要是幫助小伙伴們不要再踩這個坑了,也為celery提供一點文檔補充吧。