Django異步任務線程池


當數據庫數據量很大時(百萬級),許多批量數據修改請求的響應會非常慢,一些不需要即時響應的任務可以放到后台的異步線程中完成,發起異步任務的請求就可以立即響應

選擇用線程池的原因是:線程比進程更為可控。不像子進程,子線程會在所屬進程結束時立即結束。線程可共享內存。

請求任務異步處理的原理

使用python manage.py runserver模式啟動的Django應用只有一個進程,對於每個請求,主線程會開啟一個子線程來處理請求。請求子線程向主線程申請一個新線程,然后把耗時的任務交給新線程,自身立即響應,這就是請求任務異步處理的原理。

可視化線程池

如果想要管理這批異步線程,知道他們是否在運行中,可以使用線程池(ThreadPoolExecutor)。

線程池會先啟動若干數量的線程,並讓這些線程都處於睡眠狀態,當向線程池submit一個任務后,會喚醒線程池中的某一個睡眠線程,讓它來處理這個任務,當處理完這個任務,線程又處於睡眠狀態。

submit任務后會返回一個期程(future),這個對象可以查看線程池中執行此任務的線程是否仍在處理中

因此可以構建一個全局可視化線程池:

from concurrent.futures.thread import ThreadPoolExecutor


class ThreadPool(object):
    def __init__(self):
        # 線程池
        self.executor = ThreadPoolExecutor(20)
        # 用於存儲每個項目批量任務的期程
        self.future_dict = {}

    # 檢查某個項目是否有正在運行的批量任務
    def is_project_thread_running(self, project_id):
        future = self.future_dict.get(project_id, None)
        if future and future.running():
            # 存在正在運行的批量任務
            return True
        return False

    # 展示所有的異步任務
    def check_future(self):
        data = {}
        for project_id, future in self.future_dict.items():
            data[project_id] = future.running()
        return data

    def __del__(self):
        self.executor.shutdown()

# 主線程中的全局線程池
# global_thread_pool的生命周期是Django主線程運行的生命周期
global_thread_pool = ThreadPool()

使用:

# 檢查異步任務
if global_thread_pool.is_project_thread_running(project_id):
    raise exceptions.ValidationError(detail='存在正在處理的批量任務,請稍后重試')

# 提交一個異步任務
future = global_thread_pool.executor.submit(self.batch_thread, project_id)
global_thread_pool.future_dict[project_id] = future

# 查看所有異步任務
@login_required
def check_future(request):
    data = global_thread_pool.check_future()
    return HttpResponse(status=status.HTTP_200_OK, content=json.dumps(data))

串行執行

使用線程鎖

在全局線程池中初始化線程鎖

class ThreadPool(object):
    def __init__(self):
        self.executor = ThreadPoolExecutor(20)
        self.future_dict = {}
        self.lock = threading.Lock()

然后執行線程前需要獲取鎖並再執行結束后釋放鎖

def batch_thread(self):
    global_thread_pool.lock.acquire()
    try:
        ...
        global_thread_pool.lock.release()
    except Exception:
        trace_log = traceback.format_exc()
        logger.error('異步任務執行失敗:\n %s' % trace_log)
        global_thread_pool.lock.release()

需要捕捉異常預防子線程出錯而無法釋放鎖的情況

異步線程任務執行前先檢查數據庫連接是否可用,然后關掉不可用連接

由於django的數據庫連接是保存到線程本地變量中的,通過ThreadPoolExecutor創建的線程會保存各自的數據庫連接。

當連接被保存的時間超過mysql連接的最大超時時間,連接失效,但不會被線程釋放。

之后再調起線程執行涉及到數據庫操作的異步任務時,會用到失效的數據庫連接,導致報錯“MySQL server has gone away”。

解決方案是在線程池的所有異步任務執行前先檢查數據庫連接是否可用,然后關掉不可用連接

def batch_thread(self):
    for conn in connections.all():
        conn.close_if_unusable_or_obsolete()
    ...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM