ThreadPoolExecutor多線程異步執行


https://www.cnblogs.com/pdev/p/10685093.html

1. 以下為第一種,函數級的異步執行:

import time
from concurrent.futures import ThreadPoolExecutor


def task1_fn(url):
    time.sleep(10)
    return (url + " FINISHED")


def task2_fn(url):
    time.sleep(3)
    return (url + " FINISHED")


def RunPool():
    pool = ThreadPoolExecutor(1)  # 啟動一個線程池
    task1 = pool.submit(task1_fn, "CPU1")  # 在另外的線程中運行RunBenchmark() 
    task2 = pool.submit(task2_fn, "CPU2")  # 在另外的線程中運行RunBenchmark() 
    cnt = 0
    while True:
        if cnt == 10:
            task1.cancel()

        print("task1:%s" % task1.done())
        print("task2:%s" % task2.done())
        if task1.done():
            print('task1 is over')

        if task1.done() and task2.done():
            print('all is over.')
            break
        time.sleep(1)
        cnt += 2

    print('task1 result:%s' % task1.result())
    print('task2 result:%s' % task2.result())

    print("bye")


if __name__ == '__main__':
    RunPool()

 

 

2. 類級函數的的異步執行,添加了線程強制中斷 pool.shutdown

import time
from concurrent.futures import ThreadPoolExecutor

class A():
    def __init__(self):
        pass

    def test_ret(self):
        return 'hello world'

    def task1_fn(self, url):
        time.sleep(10)
        return (url + " FINISHED" + self.test_ret())

    def task2_fn(self, url):
        time.sleep(3)
        return (url + " FINISHED" + self.test_ret())


def RunPool():
    a = A()
    pool = ThreadPoolExecutor(2)    # 啟動一個線程池
    task1 = pool.submit(a.task1_fn, "CPU1")  # 在另外的線程中運行RunBenchmark() 
    task2 = pool.submit(a.task2_fn, "CPU2")  # 在另外的線程中運行RunBenchmark() 

    cnt = 0
    while True:
        if cnt == 10:
            task1.cancel()

        print("task1:%s" % task1.done())
        print("task2:%s" % task2.done())
        if task1.done():
            print('task1 is over')
        else:
            task1.cancel()
            pool.shutdown(wait=False)
            print('task1 result AAAAAAAA %s' % task1.result())
            break

        # if task1.done() and task2.done():
        #     print('all is over.')
        #     break
        time.sleep(1)
        cnt += 2

    print('task1 result:%s' % task1.result())
    print('task2 result:%s' % task2.result())

    print("bye")

if __name__ == '__main__':
      RunPool()

 

3. 第一個任務一旦完成,則強制終止線程

應用場景:如果某個任務一直處於執行中,無法退出,此時就需要強制退出,而強制退出一般需要重新線程run方法,但 模塊 from concurrent.futures import ThreadPoolExecutor 重寫run方法較麻煩,故采用另一種方式

wait(f_lst, return_when='FIRST_COMPLETED')
f_lst中存放的是提交到線程池中的線程對象列表
import time
import inspect
import ctypes
from concurrent.futures import ThreadPoolExecutor, wait

class A():
    def __init__(self):
        pass

    def test_ret(self):
        return 'hello world'

    def task1_fn(self, url):
        time.sleep(200)
        return (url + " FINISHED " + self.test_ret() + ' ' + str(var))

    def task2_fn(self, url):
        time.sleep(5)
        return (url + " FINISHED " + self.test_ret())


def RunPool():
    a = A()
    pool = ThreadPoolExecutor(2)  # 啟動一個線程池
    task1 = pool.submit(a.task1_fn, "CPU1")  # 在另外的線程中運行RunBenchmark() 
    task2 = pool.submit(a.task2_fn, "CPU2")  # 在另外的線程中運行RunBenchmark() 

    f_lst = [task1, task2]
    wait(f_lst, return_when='FIRST_COMPLETED')

    try:
        print('task status, task1: %s, task2: %s' % (task1.done(), task2.done()))
        # print('task1 result:%s' % task1.result())
        print('task2 result:%s' % task2.result())
    except Exception as e:
        print('error: %s' % e)
    print("bye")


if __name__ == '__main__':
    var = 123
    RunPool()

注意:只有先完成的任務才可以使用 task.result() 方法,否則會一直卡着,等待 結果返回,而此時線程已被殺死,無法返回結果

 

4. 關鍵點說明:

1) task.cancel()當線程未運行之前可以取消,但是線程在線程池中啟動后,無法再通過此方式取消
2) pool.shutdown()函數入參 wait=True表示中斷線程前,等待直到線程執行完成才會中斷, wait=False 表示 線程未執行完成,強制中斷

3)傳參方式和位置參數一樣,不用使用元組

5. 適用業務:

多進程下的多線程,某個任務可能需要等待很長時間,此時就需要中斷此任務,或者去執行其他的任務,如果要求線性處理,中斷超時任務,就需要用此方法
針對上述情況下的單個進程,可以使用異步進程池 multiprocessing的apply_async方式,主進程和子進程同時執行,互不影響。

https://www.cnblogs.com/pdev/p/10685093.html
**1. 以下為第一種,函數級的異步執行:**`import timefrom concurrent.futures import ThreadPoolExecutor

import timefrom concurrent.futures import ThreadPoolExecutor
def task1_fn(url):
    time.sleep(10)
    return (url + " FINISHED")

def task2_fn(url):
    time.sleep(3)
    return (url + " FINISHED")

def RunPool():
    pool = ThreadPoolExecutor(1)  # 啟動一個線程池
    task1 = pool.submit(task1_fn, "CPU1")  # 在另外的線程中運行RunBenchmark() 
    task2 = pool.submit(task2_fn, "CPU2")  # 在另外的線程中運行RunBenchmark() 
    cnt = 0
    while True:
        if cnt == 10:
            task1.cancel()
    print("task1:%s" % task1.done())
    print("task2:%s" % task2.done())
    if task1.done():
        print('task1 is over')
    if task1.done() and task2.done():
        print('all is over.')
        break
        time.sleep(1)
        cnt += 2
    print('task1 result:%s' % task1.result())
    print('task2 result:%s' % task2.result())
    print("bye")

if __name__ == '__main__':      
    RunPool()

 

 


**2. 類級函數的的異步執行,添加了線程強制中斷 pool.shutdown**

import timefrom concurrent.futures import ThreadPoolExecutor
class A():
    def __init__(self):
        pass
    def test_ret(self):
        return 'hello world'
    def task1_fn(self, url):
        time.sleep(10)
        return (url + " FINISHED" + self.test_ret())
    def task2_fn(self, url):
        time.sleep(3)
        return (url + " FINISHED" + self.test_ret())

def RunPool():
    a = A()
    pool = ThreadPoolExecutor(2)    # 啟動一個線程池
    task1 = pool.submit(a.task1_fn, "CPU1")  # 在另外的線程中運行RunBenchmark() 
    task2 = pool.submit(a.task2_fn, "CPU2")  # 在另外的線程中運行RunBenchmark() 
    cnt = 0
    while True:
        if cnt == 10:
            task1.cancel()
    print("task1:%s" % task1.done())
    print("task2:%s" % task2.done())
    if task1.done():
        print('task1 is over')
    else:
        task1.cancel()
        pool.shutdown(wait=False)
        print('task1 result AAAAAAAA %s' % task1.result())
        break
    # if task1.done() and task2.done():
    #     print('all is over.')
    #     break
    time.sleep(1)
    cnt += 2
    print('task1 result:%s' % task1.result())
    print('task2 result:%s' % task2.result())
    print("bye")

if __name__ == '__main__':      
    RunPool()

 

**> 關鍵點說明:**1. task.cancel()當線程未運行之前可以取消,但是線程在線程池中啟動后,無法再通過此方式取消2. pool.shutdown()函數入參 wait=True表示中斷線程前,等待直到線程執行完成才會中斷, wait=False 表示 線程未執行完成,強制中斷
**> 適用業務:**      多進程下的多線程,某個任務可能需要等待很長時間,此時就需要中斷此任務,或者去執行其他的任務,如果要求線性處理,中斷超時任務,就需要用此方法      針對上述情況下的單個進程,可以使用異步進程池 multiprocessing的apply_async方式,主進程和子進程同時執行,互不影響。

 

總結:

#shutdown(wait=True) 相當於進程池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源后才繼續 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait參數為何值,整個程序都會等到所有任務執行完畢 submit和map必須在shutdown之前

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM