Process Pools and Thread Pools in Python


0. The concurrent.futures library

Reference: https://docs.python.org/3/library/concurrent.futures.html

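Until now we have used multithreading (threading) and multiprocessing (multiprocessing) for routine concurrency needs:
    The start and join steps cannot be skipped when launching workers, and more complex cases also need one or two queues.
    As requirements grow more complex, more code means harder debugging unless this layer is well designed and abstracted.
    Is there a good way to abstract these steps away so that we can ignore the details and travel light?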

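The answer is yes.
Starting with Python 3.2, a module called concurrent.futures is part of the standard library;
in Python 2 it is the third-party futures package, which must be installed manually: pip install futures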



The concurrent.futures module provides a high-level interface for asynchronously executing callables.

The asynchronous execution can be performed by threads using ThreadPoolExecutor
or separate processes using ProcessPoolExecutor. Both implement the same interface,
which is defined by the abstract Executor class.
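Because both pool classes implement the Executor interface, code written against that interface does not need to know which kind of pool it receives. The following is a minimal sketch of that idea; `square` and `run_all` are hypothetical names introduced only for illustration.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def square(x):
    # Defined at module level so it can also be pickled for a process pool.
    return x * x


def run_all(executor: Executor, values):
    # Only Executor.map is used, so any Executor subclass works here.
    return list(executor.map(square, values))


if __name__ == '__main__':
    # The same helper runs unchanged on a thread pool and a process pool.
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        with pool_cls(max_workers=2) as executor:
            print(pool_cls.__name__, run_all(executor, [1, 2, 3]))
```

Switching between threads and processes then comes down to which class is instantiated, which is the main practical benefit of the shared interface.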

 

1. Process pools

  - Serial execution:

import math,time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
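    1099726899285419]

def is_prime(n):
    # Numbers below 2 are not prime; 2 is the only even prime.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    # Trial division by odd candidates up to sqrt(n).
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():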
    for num in PRIMES:
        print('%d is prime: %s' % (num, is_prime(num)))

if __name__ == '__main__':
    start_time = time.time()
    main()
    end_time = time.time()
    print('Run time is %s' % (end_time-start_time))

--- Output ---
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Run time is 3.9570000171661377

  - Using multiprocessing.Pool:

import math,time
from multiprocessing import Pool

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
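    1099726899285419]

def is_prime(n):
    # Numbers below 2 are not prime; 2 is the only even prime.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    # Trial division by odd candidates up to sqrt(n).
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():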
    pool = Pool()
    res_l = []
    for prime in PRIMES:
        res = pool.apply_async(func=is_prime,args=(prime,))
        res_l.append(res)
    pool.close()
    pool.join()
    for number, prime in zip(PRIMES, res_l):
        print('%d is prime: %s' % (number, prime.get()))

if __name__ == '__main__':
    start_time = time.time()
    main()
    end_time = time.time()
    print('Run time is %s' % (end_time-start_time))

--- Output ---
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Run time is 2.687000036239624

  - Using the process pool concurrent.futures.ProcessPoolExecutor:

    - Reference: http://pythonhosted.org/futures/#concurrent.futures.ProcessPoolExecutor

ProcessPoolExecutor uses the multiprocessing module, 
which allows it to side-step the Global Interpreter Lock 
but also means that only picklable objects can be executed and returned.
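The picklability requirement can be checked without starting a pool at all. The sketch below uses pickle.dumps directly as a rough approximation of what a process pool must do before it can ship work to a worker; `double` and `is_picklable` are hypothetical names used only for illustration.

```python
import pickle


def double(x):
    # A module-level function is pickled by reference (module plus name).
    return 2 * x


def is_picklable(obj):
    # Report whether obj survives pickle.dumps.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == '__main__':
    print('module-level function:', is_picklable(double))
    print('lambda:', is_picklable(lambda x: 2 * x))
```

In practice this means functions submitted to a ProcessPoolExecutor should be defined at module level, as is_prime is in the examples here, rather than as lambdas or nested functions.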


class concurrent.futures.ProcessPoolExecutor(max_workers=None)
    Executes calls asynchronously using a pool of at most max_workers processes. 
    If max_workers is None or not given then as many worker processes will be created as the machine has processors.

    - Under the hood, ProcessPoolExecutor is also built on the multiprocessing module
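The pool size can also be chosen explicitly instead of relying on the default. The following sketch assumes a hypothetical helper, `pick_worker_count`, that falls back to os.cpu_count() when no size is requested; the exact default used by ProcessPoolExecutor itself may differ slightly between Python versions.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def cube(x):
    # Module-level function so that worker processes can import it.
    return x ** 3


def pick_worker_count(requested=None):
    # Hypothetical helper: use the requested size if given, otherwise
    # the CPU count. os.cpu_count() may return None, so fall back to 1.
    if requested is not None:
        return requested
    return os.cpu_count() or 1


if __name__ == '__main__':
    workers = pick_worker_count()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        print(workers, list(executor.map(cube, range(5))))
```

For CPU-bound work such as the prime test, more workers than processors rarely helps, which is why the default is a reasonable starting point.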

import math,time
from concurrent import futures

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
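    1099726899285419]

def is_prime(n):
    # Numbers below 2 are not prime; 2 is the only even prime.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    # Trial division by odd candidates up to sqrt(n).
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():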
    with futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    start_time = time.time()
    main()
    end_time = time.time()
    print('Run time is %s' % (end_time-start_time))

--- Output ---
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Run time is 2.482999801635742
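The example above pairs PRIMES with the results through zip, which works because Executor.map yields results in the order of its inputs rather than in the order the tasks finish. The sketch below illustrates this with a thread pool to keep it short; `delayed_echo` and `map_in_order` are hypothetical names, and the sleep is only a stand-in for real work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def delayed_echo(delay):
    # Sleep for `delay` seconds, then hand the same value back.
    time.sleep(delay)
    return delay


def map_in_order(delays):
    with ThreadPoolExecutor(max_workers=len(delays)) as executor:
        # map() yields results in input order, not completion order.
        return list(executor.map(delayed_echo, delays))


if __name__ == '__main__':
    # The 0.05 s task finishes first but still appears second.
    print(map_in_order([0.2, 0.05, 0.1]))
```

When results should be handled as soon as each task finishes, submit combined with as_completed is the usual alternative.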

 

2. Thread pools

  - Reference: http://pythonhosted.org/futures/#threadpoolexecutor-objects

The ThreadPoolExecutor class is an Executor subclass that uses a pool of threads to execute calls asynchronously.

class concurrent.futures.ThreadPoolExecutor(max_workers)
    Executes calls asynchronously using a pool of at most max_workers threads.
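As a rough illustration of what "executes calls asynchronously" means, the sketch below submits a few calls and collects their results. `slow_add` and `add_pairs` are hypothetical names, and time.sleep stands in for blocking I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_add(a, b, delay=0.1):
    # Stand-in for blocking I/O such as a network request.
    time.sleep(delay)
    return a + b


def add_pairs(pairs, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit() returns a Future immediately, without waiting.
        pending = [executor.submit(slow_add, a, b) for a, b in pairs]
        # result() blocks until that particular call has finished.
        return [future.result() for future in pending]


if __name__ == '__main__':
    print(add_pairs([(1, 2), (3, 4), (5, 6)]))
```

Because the waits overlap, the three calls should take about as long as one, provided max_workers is at least the number of pending calls.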

  - Serial execution:

import urllib.request
import time

URLS = [
    'http://www.foxnews.com/',
    'https://www.stanford.edu/',
    'http://www.mit.edu/',
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'http://www.ox.ac.uk/'
]

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

start_time = time.time()
for url in URLS:
    print('%r page is %d bytes' % (url, len(load_url(url,60))))
end_time = time.time()
print("Run time is %s" % (end_time-start_time))

--- Output ---
'http://www.foxnews.com/' page is 71131 bytes
'https://www.stanford.edu/' page is 68595 bytes
'http://www.mit.edu/' page is 21405 bytes
'https://www.python.org/' page is 47701 bytes
'https://www.yahoo.com/' page is 434510 bytes
'http://www.ox.ac.uk/' page is 93411 bytes
Run time is 5.068000078201294

  - Using multiple threads:

import urllib.request
import time
from threading import Thread

URLS = [
    'http://www.foxnews.com/',
    'https://www.stanford.edu/',
    'http://www.mit.edu/',
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'http://www.ox.ac.uk/'
]
def load_url(url, timeout):
    res = urllib.request.urlopen(url, timeout=timeout).read()
    print('%r page is %d bytes' % (url, len(res)))
t_l = []
start_time = time.time()
for url in URLS:
    t = Thread(target=load_url,args=(url,60,))
    t_l.append(t)
    t.start()
for t in t_l:
    t.join()
end_time = time.time()
print("Run time is %s" % (end_time-start_time))

--- Output ---
'http://www.mit.edu/' page is 21403 bytes
'http://www.foxnews.com/' page is 71735 bytes
'https://www.python.org/' page is 47701 bytes
'https://www.stanford.edu/' page is 69130 bytes
'http://www.ox.ac.uk/' page is 93411 bytes
'https://www.yahoo.com/' page is 446715 bytes
Run time is 2.6540000438690186

  - Using the thread pool concurrent.futures.ThreadPoolExecutor:

from concurrent import futures
import urllib.request
import time

URLS = [
    'http://www.foxnews.com/',
    'https://www.stanford.edu/',
    'http://www.mit.edu/',
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'http://www.ox.ac.uk/'
]

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

start_time = time.time()
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in futures.as_completed(future_to_url):
        url = future_to_url[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (url,future.exception()))
        else:
            print('%r page is %d bytes' % (url, len(future.result())))
end_time = time.time()
print("Run time is %s" % (end_time-start_time))

--- Output ---
'http://www.mit.edu/' page is 21405 bytes
'http://www.foxnews.com/' page is 71197 bytes
'https://www.python.org/' page is 47701 bytes
'http://www.ox.ac.uk/' page is 93411 bytes
'https://www.yahoo.com/' page is 444854 bytes
'https://www.stanford.edu/' page is 68595 bytes
Run time is 2.497999906539917
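The error-handling branch in the example above is hard to exercise against real sites, so here is an offline sketch of the same submit and as_completed pattern. `fake_fetch` and `fetch_all` are hypothetical stand-ins rather than part of any library, and the failure is simulated for any URL containing 'bad'.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_fetch(url):
    # Hypothetical stand-in for a network call: fails for marked URLs.
    if 'bad' in url:
        raise ValueError('cannot fetch %s' % url)
    return b'x' * len(url)


def fetch_all(urls, max_workers=5):
    sizes, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(fake_fetch, u): u for u in urls}
        # as_completed() yields each future as soon as it finishes.
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                # result() re-raises whatever the worker raised.
                sizes[url] = len(future.result())
            except Exception as exc:
                errors[url] = exc
    return sizes, errors


if __name__ == '__main__':
    print(fetch_all(['http://a.example/', 'http://bad.example/']))
```

Wrapping future.result() in try/except is an alternative to checking future.exception() first; both approaches keep one failed task from stopping the others.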

  

 

Note: because network conditions vary, the run times above are for reference only.

 

