flask+celery 任務狀態查看與刪除


前言

寫這個的原因是網上關於celery+flask操作的很多博客大多停留在delay添加異步任務的階段,但是對於任務狀態的查看和卡頓任務的刪除進行講解的卻很少,即便有,很多也是關於django的,對於flask操作人員不太友好

所以在這篇博客中會以最簡的方式實現這兩個功能

環境

celeryredisflaskpython3

操作系統 windows10

注冊celery實例

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

#注冊celery實例
celery = make_celery(app)

任務狀態查看

celerybrokerredis擔任

使用state來存儲任務的狀態,celery的內置狀態有:

參數 說明
PENDING 任務等待中
STARTED 任務已開始
SUCCESS 任務執行成功
FAILURE 任務執行失敗
RETRY 任務將被重試
REVOKED 任務取消
PROGRESS 任務進行中

當然我們也可以自定義相關的狀態,將狀態以不同的樣式顯示出來

想要獲取到任務執行的狀態,我們需要先將celery的任務進行綁定

任務綁定到實例獲取到任務的上下文,我們就可以在任務運行時候獲取到任務的狀態,記錄相關日志等

如何進行任務綁定,給任務添加bind=True參數,如下

@celery.task(bind=True)
def scanIndex(self,url):

添加了bind參數之后,函數的參數發生變化, 多出了參數 self, 這這相當於把 scanIndex變成了一個已綁定的方法, 通過 self 可以獲得任務的上下文

我們先使用delay創建任務

tempResult=scanIndex.delay(url)

delay會返回給我們一個AsyncResult實例,不管什么時候,我們都可以通過返回的實例的task_idredis查找執行的結果,比如你在執行一個任務,你可以設置一個周期性輪詢,去查看這個結果是否已經被生產出來,如果生產出來便取到該值做相應的操作即可。

tid=tempResult.task_id

我們只需要及時將這個task_id存儲起來即可,這樣方便我們后面進行任務查看

這里我使用mysql進行任務信息的存儲

task表結構為:

class Task(db.Model):
    __tablename__ = 'task'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    tid= db.Column(db.String(128), nullable=False)
    state= db.Column(db.String(128), nullable=False)
    date = db.Column(db.String(30), nullable=False)

其中tid為任務task_idstate為任務的狀態,初始化任務后狀態手動設置為PENDING

temptask=Task(tid=tempResult.task_id,state='PENDING',date=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
db.session.add(temptask)
db.session.commit()

接着在我們的程序邏輯中,每到一個狀態就修改任務的狀態

比如,任務開始執行之后,進入執行函數,就將任務狀態修改為PROGRESS

@celery.task(bind=True)
def scanIndex(self,url):
    self.update_state(state="PROGRESS")

這里的self是因為在任務綁定后的celery任務中,update_state方法用於更新狀態

當用戶在前端進行任務狀態查看的時候,就先將mysql中的每一個tid取出來,到redis里面去查看任務狀態,並更新到mysql中,再將新的任務狀態顯示在頁面上即可

前端頁面的顯示可以使用不同的顏色來表示不同的狀態

{% if task.state=='PENDING'%}
	<tr class="">
{% elif task.state=='SUCCESS'%}
    <tr class="success">
{% elif task.state=='FAILURE'%}
    <tr class="danger">
{% elif task.state=='PROGRESS'%}
    <tr class="info">
{% endif %}
	</tr>

使用css以不同的顏色以區分,更能夠快速查看出錯和失效的任務

img

任務刪除

任務刪除比任務狀態查看更簡單,我們開始任務后先到redis里面看看任務執行之后是什么樣子的

127.0.0.1:6379> flushall
OK
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> keys *
1) "celery-task-meta-ac453fe5-e1c2-43b8-a923-dd4e8df9702a"
2) "_kombu.binding.celery"
127.0.0.1:6379>

而后面的ac453fe5-e1c2-43b8-a923-dd4e8df9702a正是任務task_id

所以我們只需要進行簡單的字符串拼接,可以獲取到任務在redis里面的全稱了,接着在python里面使用

taskname="celery-task-meta-"+tid
r.delete(taskname)

進行刪除,rredis的一個鏈接實例

mysql里面的對應任務已經產生的數據也用類似的方法,通過tid來進行查詢刪除即可

達到刪除celery任務的效果

參考鏈接

END

建了一個微信的安全交流群,歡迎添加我微信備注進群,一起來聊天吹水哇,以及一個會發布安全相關內容的公眾號,歡迎關注 😃

GIF GIF


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM