前言
寫這個的原因是網上關於celery+flask
操作的很多博客大多停留在delay
添加異步任務的階段,但是對於任務狀態的查看和卡頓任務的刪除進行講解的卻很少,即便有,很多也是關於django
的,對於flask
操作人員不太友好
所以在這篇博客中會以最簡的方式實現這兩個功能
環境
celery
,redis
,flask
,python3
操作系統 windows10
注冊celery實例
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
#注冊celery實例
celery = make_celery(app)
任務狀態查看
celery
的broker
由redis
擔任
使用state
來存儲任務的狀態,celery
的內置狀態有:
參數 | 說明 |
---|---|
PENDING | 任務等待中 |
STARTED | 任務已開始 |
SUCCESS | 任務執行成功 |
FAILURE | 任務執行失敗 |
RETRY | 任務將被重試 |
REVOKED | 任務取消 |
PROGRESS | 任務進行中 |
當然我們也可以自定義相關的狀態,將狀態以不同的樣式顯示出來
想要獲取到任務執行的狀態,我們需要先將celery
的任務進行綁定
任務綁定到實例獲取到任務的上下文,我們就可以在任務運行時候獲取到任務的狀態,記錄相關日志等
如何進行任務綁定,給任務添加bind=True
參數,如下
@celery.task(bind=True)
def scanIndex(self,url):
添加了bind
參數之后,函數的參數發生變化, 多出了參數 self, 這這相當於把 scanIndex
變成了一個已綁定的方法, 通過 self
可以獲得任務的上下文
我們先使用delay
創建任務
tempResult=scanIndex.delay(url)
delay
會返回給我們一個AsyncResult
實例,不管什么時候,我們都可以通過返回的實例的task_id
從redis
查找執行的結果,比如你在執行一個任務,你可以設置一個周期性輪詢,去查看這個結果是否已經被生產出來,如果生產出來便取到該值做相應的操作即可。
tid=tempResult.task_id
我們只需要及時將這個task_id
存儲起來即可,這樣方便我們后面進行任務查看
這里我使用mysql
進行任務信息的存儲
task
表結構為:
class Task(db.Model):
__tablename__ = 'task'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
tid= db.Column(db.String(128), nullable=False)
state= db.Column(db.String(128), nullable=False)
date = db.Column(db.String(30), nullable=False)
其中tid
為任務task_id
,state
為任務的狀態,初始化任務后狀態手動設置為PENDING
temptask=Task(tid=tempResult.task_id,state='PENDING',date=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
db.session.add(temptask)
db.session.commit()
接着在我們的程序邏輯中,每到一個狀態就修改任務的狀態
比如,任務開始執行之后,進入執行函數,就將任務狀態修改為PROGRESS
@celery.task(bind=True)
def scanIndex(self,url):
self.update_state(state="PROGRESS")
這里的self
是因為在任務綁定后的celery
任務中,update_state
方法用於更新狀態
當用戶在前端進行任務狀態查看的時候,就先將mysql
中的每一個tid
取出來,到redis
里面去查看任務狀態,並更新到mysql
中,再將新的任務狀態顯示在頁面上即可
前端頁面的顯示可以使用不同的顏色來表示不同的狀態
{% if task.state=='PENDING'%}
<tr class="">
{% elif task.state=='SUCCESS'%}
<tr class="success">
{% elif task.state=='FAILURE'%}
<tr class="danger">
{% elif task.state=='PROGRESS'%}
<tr class="info">
{% endif %}
</tr>
使用css
以不同的顏色以區分,更能夠快速查看出錯和失效的任務
任務刪除
任務刪除比任務狀態查看更簡單,我們開始任務后先到redis
里面看看任務執行之后是什么樣子的
127.0.0.1:6379> flushall
OK
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> keys *
1) "celery-task-meta-ac453fe5-e1c2-43b8-a923-dd4e8df9702a"
2) "_kombu.binding.celery"
127.0.0.1:6379>
而后面的ac453fe5-e1c2-43b8-a923-dd4e8df9702a
正是任務task_id
所以我們只需要進行簡單的字符串拼接,可以獲取到任務在redis
里面的全稱了,接着在python
里面使用
taskname="celery-task-meta-"+tid
r.delete(taskname)
進行刪除,r
是redis
的一個鏈接實例
mysql
里面的對應任務已經產生的數據也用類似的方法,通過tid
來進行查詢刪除即可
達到刪除celery
任務的效果
參考鏈接
- https://www.celerycn.io/yong-hu-zhi-nan/ren-wu-tasks/zhuang-tai-states
- https://www.cnblogs.com/wdliu/p/9517535.html
- https://www.pyfdtic.com/2018/03/16/python-celery-%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97/
END
建了一個微信的安全交流群,歡迎添加我微信備注進群
,一起來聊天吹水哇,以及一個會發布安全相關內容的公眾號,歡迎關注 😃

