python函數超時處理


需求背景:在執行一個函數時可能該函數會卡住導致整個程序無法執行,這時候就需要函數超時處理了;舉一個具體的例子:python在進行kafka消費數據是通常會取一批數據(例如100個)進行多線程或者多進程處理,但是kafka可能會只剩余20個數據了,這時候就會一直在等待kafka的新數據,而這20條數不會被消費,就會造成延時處理的問題。

處理思路:在kafka那里加一個超時處理機制,如果一定時間內返回不了數據,就退出該函數並把獲取到的數據返回給調用方。

一、timeout_decorator  (pip3 installtimeout_decorator)

import time
import timeout_decorator


@timeout_decorator.timeout(8)  # 這里寫限制的時間
def mytest():
    print("Start")
    list_data = []
    try:
        for i in range(10):
            list_data.append(i)
            time.sleep(1)
        return list_data
    except Exception as e:
        print(e)
        return list_data


def main():
    result = mytest()
    print(result)


if __name__ == '__main__':
    main()

二、stopit  (pip3 install stopit)

import stopit
import time


@stopit.threading_timeoutable()  # 該庫沒有報錯信息
def mytest():
    list_data = []
    try:
        for i in range(10):
            print(i)
            list_data.append(i)
            time.sleep(1)
        return list_data
    except Exception as e:
        print(e)  # 該方法不會出現報錯信息
        return list_data


def main():
    result = mytest(timeout=4)  # 這里寫限制的時間
    print(result)


if __name__ == '__main__':
    main()

  

結論:第一種會把"Timed Out"這個錯誤報出來,第二種不會報這個錯誤;如果函數有錯誤會報出來的。

參考:https://www.houyunbo.com/python%E8%B0%83%E7%94%A8%E5%87%BD%E6%95%B0%E8%B6%85%E6%97%B6%E8%AE%BE%E7%BD%AE.html,https://blog.csdn.net/weixin_32673065/article/details/112083276

 

from kafka import KafkaConsumer, KafkaProducer
import json, time
import timeout_decorator


@timeout_decorator.timeout(500)
def demo(consumer):
    list_data = []
    try:
        for i in consumer:
            list_data.append(i)
            data = i.value.decode("utf-8")
            print(data, type(json.loads(data)))
            if len(list_data) > 20:
                break
        return list_data
    except Exception as e:
        print(e)
        return list_data


def main():
    hosts = ['127.0.0.1:9092', '0.0.0.0:9092']
    topic = "Coupon_Update_Access_V1"
    group_id = "qly_test0001"
    consumer = KafkaConsumer(topic, bootstrap_servers=hosts, group_id=group_id, auto_offset_reset="earliest")
    while True:
        result = demo(consumer)
        print(len(result))
        time.sleep(5)


if __name__ == '__main__':
    main()

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM