Python控制函數運行時間多線程


Python控制函數運行時間

 

在某個Flask項目在做后端接口時需要設置超時響應,因為接口中使用爬蟲請求了多個網站,響應時間時長時短。

我需要設置一個最大響應時間,時間內如果接口爬蟲沒跑完,直接返回請求超時。

從網上了解到有兩種方法,廢話不多說直接上代碼。

方法1:使用線程控制

復制代碼
import requests, datetime, time
import threading


class MyThread(threading.Thread):
    def __init__(self, target, args=()):
        """
        why: 因為threading類沒有返回值,因此在此處重新定義MyThread類,使線程擁有返回值
        此方法來源 https://www.cnblogs.com/hujq1029/p/7219163.html?utm_source=itdadao&utm_medium=referral
        """
        super(MyThread, self).__init__()
        self.func = target
        self.args = args

    def run(self):
        # 接受返回值
        self.result = self.func(*self.args)

    def get_result(self):
        # 線程不結束,返回值為None
        try:
            return self.result
        except Exception:
            return None


# 為了限制真實請求時間或函數執行時間的裝飾器
def limit_decor(limit_time):
    """
    :param limit_time: 設置最大允許執行時長,單位:秒
    :return: 未超時返回被裝飾函數返回值,超時則返回 None
    """

    def functions(func):
        # 執行操作
        def run(*params):
            thre_func = MyThread(target=func, args=params)
            # 主線程結束(超出時長),則線程方法結束
            thre_func.setDaemon(True)
            thre_func.start()
            # 計算分段沉睡次數
            sleep_num = int(limit_time // 1)
            sleep_nums = round(limit_time % 1, 1)
            # 多次短暫沉睡並嘗試獲取返回值
            for i in range(sleep_num):
                time.sleep(1)
                infor = thre_func.get_result()
                if infor:
                    return infor
            time.sleep(sleep_nums)
            # 最終返回值(不論線程是否已結束)
            if thre_func.get_result():
                return thre_func.get_result()
            else:
                return"請求超時"  #超時返回  可以自定義

        return run

    return functions

#接口函數
def a1():
    print("開始請求接口")

    #這里把邏輯封裝成一個函數,使用線程調用
    a_theadiing = MyThread(target=a2)
    a_theadiing.start()
    a_theadiing.join()

    #返回結果
    a = a_theadiing.get_result()

    print("請求完成")
    return a
@limit_decor(3)   #超時設置為3s   2s邏輯未執行完畢返回接口超時
def a2():
    print("開始執行")
    time.sleep(2)
    print("執行完成")
    a=2
    return a

# 程序入口     未超時返回a的值   超時返回請求超時
if __name__ == '__main__':
    a = a1()  #調用接口(這里把函數a1看做一個接口)
    print(a)
復制代碼

超時設置3s,線程調用函數運行2s,這里返回a的值2。

 

方法2:使用信號模塊signal(只能在unix系統使用)

signal負責在Python程序內部處理信號,典型的操作包括預設信號處理函數,暫停並等待信號,以及定時發出SIGALRM等。

要注意,signal包主要是針對UNIX平台(比如Linux, MAC OS),而Windows內核中由於對信號機制的支持不充分,所以在Windows上的Python不能發揮信號系統的功能。

信號是進程之間通訊的方式,是一種軟件中斷。一個進程一旦接收到信號就會打斷原來的程序執行流程來處理信號。

復制代碼
def set_timeout(num):
    def wrap(func):
        def handle(signum, frame):  # 收到信號 SIGALRM 后的回調函數,第一個參數是信號的數字,第二個參數是the interrupted stack frame.
            raise RuntimeError

        def to_do(*args):
            try:
                signal.signal(signal.SIGALRM, handle)  # 設置信號和回調函數
                signal.alarm(num)  # 設置 num 秒的鬧鍾
                print('start alarm signal.')
                r = func(*args)
                print('close alarm signal.')
                signal.alarm(0)  # 關閉鬧鍾
                return r
            except RuntimeError as e:
                return "超時啦"
        return to_do

    return wrap


@set_timeout(2)  # 限時 2 秒超時
def connect():  # 要執行的函數
    time.sleep(3)  # 函數執行時間,寫大於2的值,可測試超時
    return "完成"


if __name__ == '__main__':
    a = connect()
復制代碼

 

 
分類:  PythonPythonWeb
 
 
 

講述了Python實現可設置持續運行時間、線程數及時間間隔的多線程異步post請求功能。分享給大家供大家參考,具體如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#coding=utf8
'''
random.randint(a, b):用於生成一個指定范圍內的整數。
其中參數a是下限,參數b是上限,生成的隨機數n: a <= n <= b
random.choice(sequence):從序列中獲取一個隨機元素
參數sequence表示一個有序類型(列表,元組,字符串)
'''
import httplib,json
import time
import threading
from random import randint,choice
#創建請求函數
def postRequest(threadNum):
   postJson = {
         }
   #定義需要進行發送的數據
   postData = json.dumps(postJson)
   #定義一些文件頭
   headerdata = {
     "content-type" : "application/json" ,
      }
   #接口
   requrl = "/v1/query"
   #請求服務,例如:www.baidu.com
   hostServer = ""
   #連接服務器
   conn = httplib.HTTPConnection(hostServer)
   #發送請求
   conn.request(method = "POST" ,url = requrl,body = postData,headers = headerdata)
   #獲取請求響應
   response = conn.getresponse()
   #打印請求狀態
   if response.status in range ( 200 , 300 ):
     print u "線程" + str (threadNum) + u "狀態碼:" + str (response.status)
   conn.close()
def run(threadNum,internTime,duration):
   #創建數組存放線程
   threads = []
   try :
     #創建線程
     for i in range ( 1 ,threadNum):
       #針對函數創建線程
       t = threading.Thread(target = postRequest,args = (i,))
       #把創建的線程加入線程組
       threads.append(t)
   except Exception,e:
     print e
   try :
     #啟動線程
     for thread in threads:
         thread.setDaemon( True )
         thread.start()
         time.sleep(internTime)
     #等待所有線程結束
     for thread in threads:
         thread.join(duration)
   except Exception,e:
       print e
if __name__ = = '__main__' :
   startime = time.strftime( "%Y%m%d%H%M%S" )
   now = time.strftime( "%Y%m%d%H%M%S" )
   duratiion = raw_input (u "輸入持續運行時間:" )
   while (startime + str (duratiion))! = now:
     run( 10 , 1 , int (duratiion))
     now = time.strftime( "%Y%m%d%H%M%S" )
 
 
 

 

 

 

 

例講述了python使用裝飾器和線程限制函數執行時間的方法。分享給大家供大家參考。具體分析如下:

 

很多時候函數內部包含了一些不可預知的事情,比如調用其它軟件,從網絡抓取信息,可能某個函數會卡在某個地方不動態,這段代碼可以用來限制函數的執行時間,只需要在函數的上方添加一個裝飾器,timelimited(2)就可以限定函數必須在2秒內執行完成,如果執行完成則返回函數正常的返回值,如果執行超時則會拋出錯誤信息。

  1. # -*- coding: utf-8 -*-
  2. from threading import Thread
  3. import time
  4. class TimeoutException(Exception):
  5. pass
  6. ThreadStop = Thread._Thread__stop#獲取私有函數
  7. def timelimited(timeout):
  8. def decorator(function):
  9. def decorator2(*args,**kwargs):
  10. class TimeLimited(Thread):
  11. def __init__(self,_error= None,):
  12. Thread.__init__(self)
  13. self._error = _error
  14. def run(self):
  15. try:
  16. self.result = function(*args,**kwargs)
  17. except Exception,e:
  18. self._error =e
  19. def _stop(self):
  20. if self.isAlive():
  21. ThreadStop(self)
  22. t = TimeLimited()
  23. t.start()
  24. t.join(timeout)
  25. if isinstance(t._error,TimeoutException):
  26. t._stop()
  27. raise TimeoutException('timeout for %s' % (repr(function)))
  28. if t.isAlive():
  29. t._stop()
  30. raise TimeoutException('timeout for %s' % (repr(function)))
  31. if t._error is None:
  32. return t.result
  33. return decorator2
  34. return decorator
  35. @timelimited(2)
  36. def fn_1(secs):
  37. time.sleep(secs)
  38. return 'Finished'
  39. if __name__ == "__main__":
  40. print fn_1(4)

 

希望本文所述對大家的Python程序設計有所幫助。

 

大家介紹了關於Python 2.x如何設置命令執行超時時間的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考借鑒,下面來一起看看吧。

前言

在Python2.x中的幾個用來執行命令行的庫或函數在執行命令是均不能設置一個命令執行的超時時間,用來在命令執行時間超時時終端這個命令的執行,這個功能在3.x(?)中解決了,但是在2.x還是只能自己實現。下面話不多說了,來一起看看詳細的介紹吧。

下面就簡單實現了一個版本:

  1. import subprocess
  2. from threading import Timer
  3.  
  4.  
  5. def call(args, timeout):
  6. = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  7.  
  8. timer = Timer(timeout, lambda process: process.kill(), [p])
  9.  
  10. try:
  11. timer.start()
  12. stdout, stderr = p.communicate()
  13. return_code = p.returncode
  14. return (stdout, stderr, return_code)
  15. finally:
  16. timer.cancel()

測試

  1. print call(['hostname'], 2)
  2. print call(['ping', 'www.baidu.com'], 2)

 

python程序運行超過時長強制退出方式,防止程序卡死;
主要兩種方式:
1、程序內部設置時長,超過退出

import datetime
import time

import datetime
starttime = datetime.datetime.now()
#long running
endtime = datetime.datetime.now()
print (endtime – starttime).seconds

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
import datetime
import time


t1 = time.time()
t1 = time.localtime(t1).tm_hour
print(t1)
while 1:
    if time.localtime(time.time()).tm_hour - t1<3:
        print("@@@@",time.localtime(time.time()).tm_hour)
    else:
        print("break")
        break
print("finsh")


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
import time
import eventlet#導入eventlet這個模塊
eventlet.monkey_patch()#必須加這條代碼
with eventlet.Timeout(2,False):#設置超時時間為2秒
   time.sleep(4)
   print('沒有跳過這條輸出')
print('跳過了輸出')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2、程序外部控制,超過強制退出

通過jenkins插件build-timeout ,安裝后重啟生效https://updates.jenkins.io/latest/build-timeout.hpi

Absolute

多少分鍾沒有結束則執行動作

參考:https://blog.csdn.net/juewuer/article/details/103469640


基於 signal模塊實現:

signal包負責在Python程序內部處理信號,典型的操作包括預設信號處理函數,暫 停並等待信號,以及定時發出SIGALRM等。要注意,signal包主要是針對UNIX平台(比如Linux, MAC OS),而Windows內核中由於對信號機制的支持不充分,所以在Windows上的Python不能發揮信號系統的功能。 

  1.  
     
  2.  
     
  3.  
    # coding:utf8
  4.  
    import time
  5.  
    import signal
  6.  
     
  7.  
     
  8.  
    # 自定義超時異常
  9.  
    class TimeoutError(Exception):
  10.  
    def __init__(self, msg):
  11.  
    super(TimeoutError, self).__init__()
  12.  
    self.msg = msg
  13.  
     
  14.  
     
  15.  
    def time_out(interval, callback):
  16.  
    def decorator(func):
  17.  
    def handler(signum, frame):
  18.  
    raise TimeoutError("run func timeout")
  19.  
     
  20.  
    def wrapper(*args, **kwargs):
  21.  
    try:
  22.  
    signal.signal(signal.SIGALRM, handler)
  23.  
    signal.alarm(interval) # interval秒后向進程發送SIGALRM信號
  24.  
    result = func(*args, **kwargs)
  25.  
    signal.alarm( 0) # 函數在規定時間執行完后關閉alarm鬧鍾
  26.  
    return result
  27.  
    except TimeoutError, e:
  28.  
    callback(e)
  29.  
    return wrapper
  30.  
    return decorator
  31.  
     
  32.  
     
  33.  
    def timeout_callback(e):
  34.  
    print(e.msg)
  35.  
     
  36.  
     
  37.  
    @time_out(2, timeout_callback)
  38.  
    def task1():
  39.  
    print( "task1 start")
  40.  
    time.sleep( 3)
  41.  
    print( "task1 end")
  42.  
     
  43.  
     
  44.  
    @time_out(2, timeout_callback)
  45.  
    def task2():
  46.  
    print( "task2 start")
  47.  
    time.sleep( 1)
  48.  
    print( "task2 end")
  49.  
     
  50.  
     
  51.  
    if __name__ == "__main__":
  52.  
    task1()
  53.  
    task2()

輸出: 

  1.  
    task1 start
  2.  
    run func timeout
  3.  
    task2 start
  4.  
    task2 end

 基於子線程阻塞實現超時:

  1.  
    # coding:utf8
  2.  
    import time
  3.  
    import threading
  4.  
     
  5.  
     
  6.  
    def callback_func():
  7.  
    print( '超時回調')
  8.  
     
  9.  
     
  10.  
    def time_out(interval, callback=None):
  11.  
    def decorator(func):
  12.  
    def wrapper(*args, **kwargs):
  13.  
    t =threading.Thread(target=func, args=args, kwargs=kwargs)
  14.  
    t.setDaemon( True) # 設置主線程技術子線程立刻結束
  15.  
    t.start()
  16.  
    t.join(interval) # 主線程阻塞等待interval秒
  17.  
    if t.is_alive() and callback:
  18.  
    return threading.Timer(0, callback).start() # 立即執行回調函數
  19.  
    else:
  20.  
    return
  21.  
    return wrapper
  22.  
    return decorator
  23.  
     
  24.  
     
  25.  
    @time_out(2, callback_func)
  26.  
    def task3(hh):
  27.  
    print( '**********task3****************')
  28.  
    for i in range(3):
  29.  
    time.sleep( 1)
  30.  
    print(i)
  31.  
    print(hh)
  32.  
     
  33.  
     
  34.  
    @time_out(2, callback_func)
  35.  
    def task4(hh):
  36.  
    print( '**********task4****************')
  37.  
    for i in range(3):
  38.  
    # time.sleep(1)
  39.  
    print(i)
  40.  
    print(hh)
  41.  
     
  42.  
     
  43.  
    if __name__ == '__main__':
  44.  
    task3( '參數')
  45.  
    task4( '參數')

輸出:

  1.  
    **********task3****************
  2.  
    0
  3.  
    參數
  4.  
    1
  5.  
    參數
  6.  
    超時回調
  7.  
    **********task4****************
  8.  
    0
  9.  
    參數
  10.  
    1
  11.  
    參數
  12.  
    2
  13.  
    參數

基於協程實現

  1.  
    def callback_func():
  2.  
    print( 'callback')
  3.  
     
  4.  
     
  5.  
    def time_out(interval, callback=None):
  6.  
    def decorator(func):
  7.  
    def wrapper(*args, **kwargs):
  8.  
    ########## 該部分必選在requests之前導入
  9.  
    import gevent
  10.  
    from gevent import monkey
  11.  
    monkey.patch_all()
  12.  
    ##########
  13.  
     
  14.  
    try:
  15.  
    gevent.with_timeout(interval, func, *args, **kwargs)
  16.  
    except gevent.timeout.Timeout as e:
  17.  
    callback() if callback else None
  18.  
     
  19.  
    return wrapper
  20.  
     
  21.  
    return decorator
  22.  
     
  23.  
     
  24.  
    @time_out(3, callback_func)
  25.  
    def func(a, b):
  26.  
    import time
  27.  
    time.sleep( 2)
  28.  
    print(a,b)
  29.  
     
  30.  
     
  31.  
    func( 1, 2)

 

 

 

中調用第三方接口時候,經常會出現請求超時的情況,或者參數的問題導致調用異代碼異常。針對超時異常,查詢了python 相關文檔,沒有並發現完善的包來根據用戶自定義
的時間來拋出超時異常的模塊。所以自己干脆自己來實現一個自定義的超時異常。目前找到了兩種方式來實現超時異常的功能(signal.alarm()、threading實現超時異常)
方法1 thread + time 
原理:將要調用的功能函數放入子線程,通過設定子線程的阻塞時間,超時則主線程並不會等待子線程的執行。主線程退出,子線程就不存在了。
核心就是在程序中添加 join()方法,用於等待線程結束。join()的作用是,在子線程完成運行之前,這個子線程的父線程將會被一直阻塞.
 1 # coding=utf-8
 2 import threading
 3 import time
 4 
 5 
 6 def myFunc():
 7     time.sleep(4)
 8     print("myFunc執行了")
 9 
10 
11 if __name__ == '__main__':
12     t = threading.Thread(target=myFunc)
13     t.setDaemon(True)
14     t.start()
15 
16     t.join(2)
17     print("it's over")

執行結果:

  it's over

可以看出,當主線程執行到2秒時候,結束退出。子線程還沒有結束,沒有執行完及被強制退出

 1 # coding=utf-8
 2 import threading
 3 import time
 4 
 5 
 6 def myFunc():
 7     time.sleep(1)
 8     print("myFunc執行了")
 9 
10 
11 if __name__ == '__main__':
12     t = threading.Thread(target=myFunc)
13     t.setDaemon(True)
14     t.start()
15 
16     t.join(2)
17     print("it's over")
顯示結果:

  myFunc執行了
  it's over

可以看出,子線程結束時,用時1秒,沒有超過主線程設定的3秒,所以主線程與子線程都被執行了

方法 2  signal.alarm() ,注意兩點:一是signal信號機制要在linux上才能運行; 二是signal信號在主線程中才會會起作用

 1 import signal
 2 import time
 3 
 4 
 5 # Define signal handler function
 6 def myHandler(signum, frame):
 7     exit("TimeoutError")
 8 
 9 
10 def test_fun():
11     # time.sleep(3)
12     int("afsdf")
13     a = 2 + 3
14 
15     return a
16 
17 
18 
19 if __name__ == '__main__':
20     try:
21         signal.signal(signal.SIGALRM, myHandler)
22         signal.alarm(2)
23         test = test_fun()
24         print(test)
25         signal.alarm(0)
26     except Exception as ret:
27         print("msg:", ret)

執行結果:
  當 time.sleep(3) 時,會拋出TimeoutError的異常
  當 test_fun 里面出現 int("afsdf")時, 會拋出 ValueError("invalid literal for int() with base 10: 'afsdf'",))
  當test_fun函數執行的時間小於2 秒時,就會返回函數對應的值

方法3  帶有返回值的超時異常,可以通過創建thread類的方式來進行捕捉

 1 import threading
 2 import sys
 3 import time
 4 
 5 
 6 class Dispacher(threading.Thread):
 7     def __init__(self, fun, args):
 8         threading.Thread.__init__(self)
 9         self.setDaemon(True)
10         self.result = None
11         self.error = None
12         self.fun = fun
13         self.args = args
14 
15         self.start()
16 
17     def run(self):
18         try:
19             self.result = self.fun(self.args)
20         except:
21             self.error = sys.exc_info()
22 
23 
24 def test_fun(i):
25     # time.sleep(4)
26     a = i*i
27     # b    
29   return a
30 def main_fun():
31     c = Dispacher(test_fun, 2)
32     c.join(2)
33 
34     if c.isAlive():
35         return "TimeOutError"
36     elif c.error:
37         return c.error[1]
38     t = c.result
39     return t
40 
41 if __name__ == '__main__':
42     fun = main_fun()
43     print(fun)
     
顯示結果:
  test_fun 執行時間大於設置的2秒時,會拋出TimeOutError
  test_fun 執行時間小於設置的2秒時,並且函數正常執行時,顯示:4
  test_fun 里面出現比如 “b” 時,會拋出 global name 'b' is not defined 的異常


 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM