Python之concurrent.futures模塊的使用


concurrent.futures的作用:
       管理並發任務池。concurrent.futures模塊提供了使用工作線程或進程池運行任務的接口。線程和進程池API都是一樣,所以應用只做最小的修改就可以在線程和進程之間地切換

1、基於線程池使用map()

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import threading
import time

def task(n):
    print('{}: 睡眠 {}'.format(threading.current_thread().name,n))
    time.sleep(n / 10)
    print('{}: 執行完成 {}'.format(threading.current_thread().name,n))
    return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: 開始運行')
results = ex.map(task, range(5, 0, -1)) #返回值是generator 生成器
print('main: 未處理的結果 {}'.format(results))
print('main: 等待真實結果')
real_results = list(results)
print('main: 最終結果: {}'.format(real_results))
futures_thread_pool_map.py

運行效果

[root@ mnt]# python3 futures_thread_pool_map.py 
main: 開始運行
ThreadPoolExecutor-0_0: 睡眠 5
ThreadPoolExecutor-0_1: 睡眠 4
main: 未處理的結果 <generator object Executor.map.<locals>.result_iterator at 0x7f1c97484678>
main: 等待真實結果
ThreadPoolExecutor-0_1: 執行完成 4
ThreadPoolExecutor-0_1: 睡眠 3
ThreadPoolExecutor-0_0: 執行完成 5
ThreadPoolExecutor-0_0: 睡眠 2
ThreadPoolExecutor-0_0: 執行完成 2
ThreadPoolExecutor-0_0: 睡眠 1
ThreadPoolExecutor-0_1: 執行完成 3
ThreadPoolExecutor-0_0: 執行完成 1
main: 最終結果: [0.5, 0.4, 0.3, 0.2, 0.1]

 2、futures執行單個任務

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import threading
import time

def task(n):
    print('{}: 睡眠 {}'.format(threading.current_thread().name, n))
    time.sleep(n / 10)
    print('{}: 執行完成 {}'.format(threading.current_thread().name, n))
    return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main :開始')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('等待運行結果')
results = f.result()
print('main: result:{}'.format(results))
print('main: future 之后的結果:{}'.format(f))
futures_thread_pool_submit.py

 運行效果

[root@ mnt]# python3 futures_thread_pool_submit.py 
main :開始
ThreadPoolExecutor-0_0: 睡眠 5
main: future: <Future at 0x7f40c0a6a400 state=running>
等待運行結果
ThreadPoolExecutor-0_0: 執行完成 5
main: result:0.5
main: future 之后的結果:<Future at 0x7f40c0a6a400 state=finished returned float>

3、futures.as_completed()按任意順序運行結果

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import time
from concurrent import futures

def task(n):
    time.sleep(random.random())
    return (n, n / 10)

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: 開始')
wait_for = [
    ex.submit(task, i) for i in range(5, 0, -1)
]
for f in futures.as_completed(wait_for):
    print('main: result:{}'.format(f.result()))
futures_as_completed.py

運行效果

[root@ mnt]# python3 futures_as_completed.py 
main: 開始
main: result:(5, 0.5)
main: result:(4, 0.4)
main: result:(3, 0.3)
main: result:(1, 0.1)
main: result:(2, 0.2)

4、Future回調之futures.add_done_callback()

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import time

def task(n):
    print('task {} : 睡眠'.format(n))
    time.sleep(0.5)
    print('task {} : 完成'.format(n))
    return n / 10

def done(fn):
    if fn.cancelled():
        print('done {}:取消'.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print('done {} : 錯誤返回 : {}'.format(fn.arg, error))
        else:
            result = fn.result()
            print('done {} : 正常返回 : {}'.format(fn.arg, result))

if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main : 開始')
    f = ex.submit(task, 5)
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()
futures_future_callback.py

 運行效果

[root@ mnt]# python3 futures_future_callback.py 
main : 開始
task 5 : 睡眠
task 5 : 完成
done 5 : 正常返回 : 0.5

 5、Future任務取消之futures.cancel()

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import time

def task(n):
    print('task {} : 睡眠'.format(n))
    time.sleep(0.5)
    print('task {} : 完成'.format(n))
    return n / 10

def done(fn):
    if fn.cancelled():
        print('done {}:取消'.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print('done {} : 錯誤返回 : {}'.format(fn.arg, error))
        else:
            result = fn.result()
            print('done {} : 正常返回 : {}'.format(fn.arg, result))

if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main : 開始')
    tasks = []

    for i in range(10, 0, -1):
        print('main: submitting {}'.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, task_obj in reversed(tasks):
        if not task_obj.cancel():
            print('main: 不能取消{}'.format(i))

    ex.shutdown()
futures_future_callback_cancel.py

 運行效果

[root@mnt]# python3 futures_future_callback_cancel.py 
main : 開始
main: submitting 10
task 10 : 睡眠
main: submitting 9
task 9 : 睡眠
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
done 1:取消
done 2:取消
done 3:取消
done 4:取消
done 5:取消
done 6:取消
done 7:取消
done 8:取消
main: 不能取消9
main: 不能取消10
task 10 : 完成
done 10 : 正常返回 : 1.0
task 9 : 完成
done 9 : 正常返回 : 0.9

 6、Future異常的處理

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures

def task(n):
    print('{} : 開始'.format(n))
    raise ValueError('這個值不太好 {}'.format(n))

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: 開始...')

f = ex.submit(task, 5)

error = f.exception()
print('main: error:{}'.format(error))

try:
    result = f.result()
except ValueError as e:
    print('訪問結果值的異常 {}'.format(e))
futures_future_exception

 運行效果

[root@mnt]# python3 futures_future_exception.py 
main: 開始...
5 : 開始
main: error:這個值不太好 5
訪問結果值的異常 這個值不太好 5

  7、Future上下文管理即利用with打開futures.ThreadPoolExecutor()

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures

def task(n):
    print(n)

with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print('main: 開始')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)
print('main: 結束')
futures_context_manager.py

運行效果

[root@ mnt]# python3 futures_context_manager.py 
main: 開始
1
2
3
4
main: 結束

 8、基於進程池使用map()

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import os

def task(n):
    return (n, os.getpid())

if __name__ == '__main__':
    ex = futures.ProcessPoolExecutor(max_workers=2)
    results = ex.map(task, range(50, 0, -1))
    for n, pid in results:
        print('task {} in 進程id {}'.format(n, pid))
futures_process_pool_map.py

 運行效果

[root@ mnt]# python3 futures_process_pool_map.py 
task 5 in 進程id 9192
task 4 in 進程id 8668
task 3 in 進程id 9192
task 2 in 進程id 8668
task 1 in 進程id 9192

9、基於進程池異常處理

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent import futures
import os
import signal

def task(n):
    return (n, os.getpid())

if __name__ == '__main__':
    with futures.ProcessPoolExecutor(max_workers=2) as ex:
        print('獲取工作進程的id')
        f1 = ex.submit(os.getpid)
        pid1 = f1.result()

        print('結束進程 {}'.format(pid1))
        os.kill(pid1, signal.SIGHUP)

        print('提交其它進程')
        f2 = ex.submit(os.getpid)
        try:
            pid2 = f2.result()
        except futures.process.BrokenProcessPool as e:
            print('不能開始新的任務:{}'.format(e))
futures_process_pool_broken.py

 

運行效果

[root@ mnt]# python3 futures_process_pool_broken.py 
獲取工作進程的id
結束進程 104623
提交其它進程
不能開始新的任務:A process in the process pool was terminated abruptly while the future was running or pending.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM