python並發——多進程中的異常捕獲


在正常的python多進程中,父進程只負責將任務分發給子進程,子進程成功與否,父進程並不關心.

但是在生產環境中,這種顯然是不恰當的.通過研究我發現通過回調方法可以獲取子進程的狀態,然后通過一個queue將失敗的子進程標記出來,就可以實現失敗進程的重試,代碼如下:

import queue
import random
import time
from multiprocessing import Pool

q = queue.Queue()
success_count = 0

#實際運行的task方法
def long_time_task(table_name):
    rd = random.randint(1, 10)
  #任務失敗拋出異常,拋出的就是任務名稱,在回調方法中可以捕獲
if rd % 2 == 0: raise Exception(table_name) global q print('ok=%s' % table_name) time.sleep(1) #成功任務計數 def success(suc): global success_count success_count = success_count + 1 #失敗任務捕獲 def err(error): q.put(error) if __name__ == '__main__':
  #初始化一個4個槽位線程池 p
= Pool(4) lists = ['table_' + str(i) for i in range(1, 21)] lists_num=len(lists)
  #將任務放入隊列
for i in lists: q.put(i)
  #隊列為空不一定就是所有任務完成了,可能還有在運行中的任務,所以需要兩個條件滿足才能退出循環
while not q.empty() or success_count!=lists_num: if q.empty(): time.sleep(1) else: p.apply_async(long_time_task, args=(q.get(),), callback=success, error_callback=err) p.close() p.join() print('q.size=%d,success_count=%d' % (q.qsize(), success_count))

 上面的例子有時候是能運行的,但有時候不能,特別是子程序執行很快的時候while經常陷入死循環,原因是

多進程中q.empty()不可信,異步過程中在分發任務時success_count無法獲取最終結果,會導致死循環,調整后的程序如下
import os
import time
from multiprocessing import Pool
from multiprocessing import Queue
import random

q = Queue()
success_count = 0


# 通過回調捕獲異常
def err(error):
    q.put(error)


def suc(success):
    global success_count
    success_count = success_count + 1


def run_data_x(sub_table_name):
    result = os.system("echo {sub_table_name} 2".format(sub_table_name=sub_table_name))
    time.sleep(0.1)
    if result != 0 or random.randint(1, 100) == 9:
        print('failed!!!,tb=%s' % sub_table_name)
        raise Exception(sub_table_name)


if __name__ == '__main__':

    p = Pool(3)

    table_list = ['test_0', 'test_1', 'test_2', 'test_3',
                  'test_4', 'test_5', 'test_6', 'test_7',
                  'test_8', 'test_9', 'test_10', 'test_11',
                  'test_12', 'test_13', 'test_14', 'test_15']

    table_list_num = len(table_list)
    for tb in table_list:
        q.put(tb)

    print('*********************************')

    # 多進程中q.empty()不可信,異步過程中success_count在分發任務時尚無法獲取最終結果,會導致死循環
    # 調整為同步獲取結果即可
    while success_count != table_list_num:
        # p.apply_async(run_data_x, args=(q.get(),), callback=suc, error_callback=err)
        try:
            p.apply(run_data_x, args=(q.get(True),))
            suc(1)
        except Exception as e:
            err(e)

    p.close()
    p.join()
    print('success_count=%d' % success_count)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM