celery 停止執行中 task


原因

因為最近項目需求中需要提供對異步執行任務終止的功能,所以在尋找停止celery task任務的方法。這種需求以前沒有碰到過,所以,只能求助於百度和google,但是找遍了資料,都沒找到相關的能停止celery task任務的方法(網上找到的一個方法實測不能用,可能是celery版本的原因,我的項目目前使用的是celery 4.0.2)

解決過程

由於網上找不到解決辦法,於是只能自己想辦法了。
想到celery 管理工具flower里面好像有停止celery task的功能,於是去找flower的源碼,找到接口的源碼如下:

logger.info("Revoking task '%s'", taskid)
terminate = self.get_argument('terminate', default=False, type=bool)
self.capp.control.revoke(taskid, terminate=terminate)
self.write(dict(message="Revoked '%s'" % taskid))

核心代碼是self.capp.control.revoke 想到去celery里面找尋revoke函數,發現有兩處比較可疑,第一個是celery.worker.control.revoke,第二個是celery.app.control.Control.revoke,直覺來看,應該是第二個方法,但是第二個方法是在一個類里面的,要調用這個方法首先需要獲取到celery app的實例,后來去celery 配置里面找,發現在__init__.py文件里面有__all__ = ['celery_app']這么一句,於是找到突破點了,引用這個包就能獲取到celery_app了。

from test.ceyery_proj import celery_app
celery_app.control.revoke(task_id, terminate=True)

通過這個方法就能終止正在執行的task,至於task_id在執行任務的時候返回了,我將這個id存儲在數據庫中,這樣就可以被拿來控制task的執行了。

寫這篇文檔的目的主要是幫助小伙伴們不要再踩這個坑了,也為celery提供一點文檔補充吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM