python使用flask+Thread實現異步任務


# 定義一個WorkerController,用於執行業務代碼
class WorkerController(object):

    def __init__(self):
        pass

    def do_something(self, params):
        print("do something")
# 定義一個AsyncWorker,繼承Thread,重寫run方法,run方法中調用WorkerController的do_something方法

from threading import Thread
from worker_controller import WorkerController

WORKER_CONTROLLER = WorkerController()


class AsyncWorker(Thread):
    ASYNC_WORKER_INFO = dict()

    def __init__(self, params, ticket_id):
        Thread.__init__(self)
        self.params = params
        self.ticket_id = ticket_id
        self.daemon = True
        self.start()

    def run(self):
        AsyncWorker.ASYNC_WORKER_INFO[self.ticket_id] = {
            # 保存一些業務信息,之后輪詢的時候,可以作為輸出返回
            "some": "information",
            "status": "running"
        }
        WORKER_CONTROLLER.do_something(self.params)
# 開始任務
@app.route('/do-something', methods=['GET'])
def do_something():
    params = request.args.get('params', '').strip()
    # 生成一個任務id,用於輪詢,例如ticket id
    ticket_id = genearteMD5(str(int(round(time.time() * 1000))))
    # 實例化一個AsyncWorker
    AsyncWorker(params=params, ticket_id=ticket_id)
    # 將ticket_id返回
    return jsonify({"ticket_id": ticket_id})

# 輪詢任務
@app.route("/check-status", methods=['GET'])
def check_status():
    ticket_id = request.args.get("ticket_id")
    logs = AsyncWorker.ASYNC_WORKER_INFO[str(ticket_id)]
    return jsonify(logs)

如何在前端進行異步輪詢呢?以angularjs的interval方法為例:https://www.cnblogs.com/CheeseZH/p/12444034.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM