並發編程之 multiprocessing 和 concurrent.futures(二)


1. multiprocessing

Python 實現多進程的模塊最常用的是multiprocessing,此外還有multiprocess、pathos、concurrent.futures、pp、parallel、pprocess等模塊。

1.1 multiprocessing.Process

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

參數

  • group: 為預留參數
  • target:子進程要執行的目標函數
  • name:線程名稱
  • args、kwargs:參數,args 必須是元組
  • deamonbool 值:表示是否為守護進程

實例

# coding=utf-8
import multiprocessing
import time


def run(a):
    time.sleep(5)
    print(a)
    return a * a


if __name__ == '__main__':
    p = multiprocessing.Process(target=run, args=(123456,))
    p.start()	# 運行進程實例
    p.join()    # 阻塞主進程,當子進程結束后,才會繼續執行主進程
    print(123)

1.2 multiprocessing.Pool

創建多個子進程最好是采用進程池 multiprocessing.Pool

multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)

參數

  • processes:進程數量,如果 processesNone那么使用 os.cpu_count()返回的數量
  • initializer: 如果 initializer不是 None,那么每一個工作進程在開始的時候會調用initializer(*initargs)
  • maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild 默認是None,意味着只要Pool存在工作進程就會一直存活
  • context: 用在制定工作進程啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context

創建子進程的幾種方式

  • apply():同步阻塞執行,上一個子進程結束后才能進行下一個子進程(不推薦)
  • apply_async():異步非阻塞執行,每個子進程都是異步執行的(並行)(推薦)
  • map():同步阻塞
  • map_async():異步非阻塞
  • imap():內存不夠用可以采用此種方式,速度慢於 map()
  • imap_unorderedimap() 的無序版本(不會按照調用順序返回,而是按照結束順序返回),返回迭代器實例

1.2.1 apply

同步阻塞執行,上一個子進程結束后才能進行下一個子進程

apply(func, args=(), kwds={}, callback=None, error_callback=None) 

1.2.2 apply_async

異步非阻塞執行,每個子進程都是異步執行的(並行),異步執行指的是一批子進程並行執行,且子進程完成一個,就新開始一個,而不必等待同一批其他進程完成

# callback 回調,error_back 錯誤回調
apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

示例

# coding=utf-8

import multiprocessing


def callback(result):
    """回調函數"""
    with open("result.txt", "a+", encoding="utf-8") as f:
        f.write(str(result) + "\n")


def run(num):
    return num * num


if __name__ == '__main__':
    pool = multiprocessing.Pool(6)
    for i in range(1000):
        pool.apply_async(run, args=(i,), callback=callback)
        
        # # 如有多個參數,可傳一個 iterable
        # pool.apply_async(run, args=([i, 123, 456]), callback=callback)

    pool.close()
    pool.join()

1.2.3 map

若子進程有返回值,且需集中處理,建議采用此種方式(但是它是同步阻塞的):

# iterable 可迭代類型,將 iterable 中每個元素作為參數應用到 func 函數中,返回 list
map(func, iterable, chunksize=None)

1.2.4 map_async

map 的異步非阻塞版本,返回 MapResult 實例,使用 get() 方法,獲取結果(list 方法):

map_async(func, iterable, chunksize=None, callback=None, error_callback=None)

apply_async 與 map_async 對比

# coding=utf-8

import multiprocessing
import time


def run(a):
    return a * a


data = []


def my_callback(result):
    data.append(result)


if __name__ == '__main__':
    st = time.time()
    pool = multiprocessing.Pool(6)

    # 總耗時:0.4497215747833252
    future = pool.map_async(run, range(20000))
    print(future.get())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

    # # 總耗時:3.019148111343384
    # for i in range(20000):
    #     pool.apply_async(run, args=(i,), callback=my_callback)
    # 
    # print(data)

    pool.close()
    pool.join()
    print(f"總耗時:{time.time() - st}")

結論

  • map_asyncapply_async 速度快
  • 若想統一處理結果,map_asyncapply_async 更方便

1.2.5 imap 和 imap_unordered

內存不夠可以采用 imap 方式,map 的迭代器版本,返回迭代器實例,速度遠慢於 map,但是堆內存需求小。

imap_unorderedimap 的無序版本

imap(func, iterable, chunksize=1)
imap_unordered(func, iterable, chunksize=1)

實例:

# coding=utf-8

import multiprocessing
import time


def run(a):
    return a * a


data = []


def my_callback(result):
    data.append(result)


if __name__ == '__main__':
    st = time.time()
    pool = multiprocessing.Pool(6)

    # # 總耗時:0.4497215747833252
    # future = pool.map_async(run, range(20000))
    # print(future.get())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

    # # 總耗時:3.019148111343384
    # for i in range(20000):
    #     pool.apply_async(run, args=(i,), callback=my_callback)
    #
    # print(data)

    future = pool.imap(run, range(20000))   # 總耗時:4.171960115432739
    print(future)
    for i in future:
        print(i)

    pool.close()
    pool.join()
    print(f"總耗時:{time.time() - st}")  # 總耗時:0.4497215747833252

1.2.6 starmap 和 starmap_async

starmap 可以使子進程活動接收多個參數,而 map 只能接收一個參數:

# 子進程活動 func允許包含多個參數,也即iterable的每個元素也是iterable(其每個元素作為func的參數),返回結果組成的 list
starmap(func, iterable, chunksize=None)

# 異步並行版本,返回 MapResult 實例,get() 方法可以獲取結果組成的 list
starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)

# 使用方式
pool.starmap_async(f, ((a0, b0), (a1, b1), ...)).get()

1.3 進程間通信(數據共享)

每個進程是相互獨立的,內存無法共享,實現進程間數據共享的方式有:

  • multiprocessing.Value(typecode_or_type, *args, lock=True):共享單個數據,共享內存
  • multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True):共享數組,共享內存
  • multiprocessing.Manager() :共享進程,支持多種數據結構的數據共享

Manager 支持的類型有:list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,ValueArray不僅可以在本地進程間共享,甚至可以在多客戶端實現網絡共享,不過 Manager占用資源較大。

1、共享 dict

# coding=utf-8

# 多個進程將數據添加到字典 dd 中

import multiprocessing


def run(d, k, v):
    d[k] = v


if __name__ == '__main__':
    pool = multiprocessing.Pool(6)
    manager = multiprocessing.Manager()

    dd = manager.dict()

    for i in range(20):
        future = pool.apply_async(run, args=(dd, i, i * i))

    pool.close()
    pool.join()

    print(dict(dd))
    
# 運行結果
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 8: 64, 7: 49, 9: 81, 10: 100, 11: 121, 12: 144, 13: 169, 14: 196, 15: 225, 16: 256, 17: 289, 18: 324, 19: 361}

2、管理隊列,並讓不同的進程可以訪問它:

import multiprocessing


def worker(name, que):
    que.put("%d is done" % name)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()

    for i in range(20):
        pool.apply_async(worker, (i, q))

    pool.close()
    pool.join()
# coding=utf-8

import multiprocessing


def write(name, que):
    que.put("%d is done" % name)
    print(f'{name} write done!')


def read(que):
    while not que.empty():
        val = que.get(True)
        print('read===>: ', val)

        # while True:
        #     if not que.empty():
        #         val = que.get(True)
        #         print('read===>: ', val)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()

    for i in range(20):
        pool.apply_async(write, (i, q))

    p1 = multiprocessing.Process(target=read, args=(q,))
    p1.start()
    p1.join()

    pool.close()
    pool.join()

注意:在操作共享對象元素時,除了賦值操作,其他的方法都作用在共享對象的拷貝上,並不會對共享對象生效。比如:dic['k'] = []; dic['k'].append(x),將不會修改 dic 的內容

1.4 進程間通信(數據傳遞)

  • 隊列
    • multiprocessing.Queue(maxsize=0) :建立共享的隊列實例
    • multiprocessing.JoinableQueue(maxsize=0):建立可阻塞的隊列實例
  • 管道
    • multiprocessing.Pipe(duplex=True):建立一對管道對象,用於在兩個進程之間傳遞數據

參考文章:python並行計算(上):multiprocessing、multiprocess模塊

2. concurrent.futures 模塊

concurrent.futures3.2 中引入的新模塊,它為異步執行可調用對象提供了高層接口,分為兩類:

  • ThreadPoolExecutor:多線程編程
  • ProcessPoolExecutor:多進程編程

兩者實現了同樣的接口,這些接口由抽象類 Executor 定義;這個模塊提供了兩大類型:

  • Executor:執行器,用於管理工作池
  • Future:管理工作計算出的結果

2.1 concurrent.futures.Executor 類

提供了一系列方法,可以用於異步執行調用,定義的方法有:

# 調用對象執行,fn(*args, **kwargs),返回 Future 對象,可用 future.result() 獲取執行結果
submit(fn, *args, **kwargs)

# 異步執行 func,並支持多次並發調用,返回一個迭代器
# timeout 秒數可以是浮點數或者整數,如果設置為 None 或者不指定,則不限制等待時間
# ProcessPoolExecutor 這個方法將 iterables 划分為多塊,作為獨立的任務提交到進程池(不是 1)可顯著提升性能,ThreadPoolExecutor,chunksize 不起作用
map(func, *iterables, timeout=None, chunksize=1)

# 告訴當執行器 executor 在當前所有等待的 future 對象運行完畢后,應該釋放執行器用到的所有資源
# True 會等待所有 future 執行完畢,且 executor 的資源都釋放完會才會返回,False 會立即返回,executor 的資源會在 future 執行完后釋放
shutdown(wait=True)

2.2 ThreadPoolExecutor

ThreadPoolExecutor + requests 並發執行

# coding=utf-8

import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(req_url):
    r = requests.get(req_url)
    return r.json()['args']['a']


if __name__ == '__main__':
    start = time.time()
    numbers = range(12)
    url = 'http://httpbin.org/get?a={}'

    # submit() 方式
    with ThreadPoolExecutor(max_workers=3) as executor:
        # task_list = [executor.submit(fetch(url.format(n))) for n in range(12)]
        task_list = [executor.submit(fetch, url.format(n)) for n in range(12)]

        for future in as_completed(task_list):
            print(future.result())
            # data = future.result()        # 總耗時:2.903249740600586
            # print(data)

    ## map() 方式
    # with ThreadPoolExecutor(max_workers=3) as executor:
    #     future = executor.map(fetch, (url.format(n) for n in range(12)))
    #
    # for result in future:
    #     print(result)

    print(f'總耗時:{time.time() - start}')  # 總耗時:2.630300760269165

實測 submit 未按順序返回結果

2.3 ProcessPoolExecutor

ProcessPoolExecutor 使用進程池來異步執行調用,適合計算密集型任務,方法參數:

concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

示例:

# coding=utf-8


from concurrent.futures import ProcessPoolExecutor, as_completed


def fib(n):
    if n <= 2:
        return 1

    return fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
    numbers = range(20)
    with ProcessPoolExecutor(max_workers=3) as executor:
        # # map 方式
        # for num, result in zip(numbers, executor.map(fib, numbers)):
        #     print(f"{num}====>{result}")

        # submit 方式
        work_dict = {executor.submit(fib, i): i for i in numbers}
        for future in as_completed(work_dict):
            num = work_dict[future]
            try:
                data = future.result()
            except Exception as e:
                print(e)
            else:
                print(f"fib({num} = {data})")

2.4 Future 類

Future 類封裝了可調用對象的異步執行,由 Executor.submit() 產生,有如下方法:

  • cancel() :嘗試取消調用,如果該調用正在執行中,無法取消,本方法返回 False,其他情況下調用會被取消,並返回 True;只有當任務提交了還沒執行才可以通過這種方式取消
  • cancelled(): 如果調用已經被成功取消,返回 True
  • running() :如果調用正在執行,無法被取消,則返回 True
  • done() :如果調用成功被取消或者已經執行完畢,返回 True
  • result(timeout=None): 返回調用的返回值。如果調用還沒有完成,則最多等待 timeout 秒。如果 timeout 秒之后還沒有完成,拋出 concurrent.futures.TimeoutError``。timeout 可以為整數或者浮點數。如果不指定或者為 None,則不限制等待時間。如果 future 在完成之前被取消了,會拋出 CancelledError 異常,如果調用拋出異常,這個方法會拋出同樣的異常。同時它也會阻塞直到任務完成,獲取被取消
  • exception(timeout=None) :返回被調用拋出的異常,如果調用還沒有執行完畢,則最多等待 timeout 秒。如果 timeout 秒之后還沒有完成,拋出 concurrent.futures.TimeoutErrortimeout 可以為整數或者浮點數。如果不指定或者為 None,則不限制等待時間。
    如果 future 在完成之前被取消了,會拋出 CancelledError 異常,如果調用完成並且沒有拋出異常,返回 None
  • add_done_callback(fn):為 future 附加可調用對象 fn,當 future 運行完畢或者被取消時,它會被用作 fn 的唯一參數,並調用 fn。可調用對象按照添加順序依次調用,並且總是在添加時所處進程的一個線程內調用它。如果該可調用對象拋出了屬於 Exception 子類的異常,它會被記錄並忽略。如果它拋出了屬於 BaseException 子類的異常,該行為未定義。
    如果 future 已經完成或者已經取消,fn 會被立即調用

通過 add_done_callack() 獲取返回值和捕獲異常

concurrent.futuresthread.ProcessPoolExecutor 線程池中的 worker 引發異常的時候,並不會直接向上拋起異常,而是需要主線程通過調用 concurrent.futures.Future.exception(timeout=None) 方法主動獲取 worker 的異常:

# coding=utf-8


from concurrent.futures import ProcessPoolExecutor, as_completed


def fib(n):
    if n <= 2:
        return 1

    return fib(n - 1) + fib(n - 2)


def call_back(future):
    """
    回調(可獲取多進程返回值、錯誤)
    :param future: future 對象
    :return:
    """
    # 獲取錯誤信息
    worker_exception = future.exception()
    if worker_exception:
        print(worker_exception)
    
    # 獲取返回值
    print(future.result())


def test(n):
    if n % 2 == 0:
        n / 0	# 發生異常

    return n * 2


if __name__ == '__main__':
    numbers = range(20)
    with ProcessPoolExecutor(max_workers=3) as executor:
        # # map 方式
        # for num, result in zip(numbers, executor.map(fib, numbers)):
        #     print(f"{num}====>{result}")

        # submit 方式
        # work_dict = {executor.submit(fib, i): i for i in numbers}
        # for future in as_completed(work_dict):
        #     num = work_dict[future]
        #     try:
        #         data = future.result()
        #     except Exception as e:
        #         print(e)
        #     else:
        #         print(f"fib({num} = {data})")

        # 其他方法
        for i in numbers:
            executor.submit(test, i).add_done_callback(call_back)

實測 map() 方式提交的會觸發異常,submit() 方式需要通過 add_done_callback() 主動捕獲異常!

參考文章

  • https://blog.csdn.net/jpch89/article/details/87643972
  • https://blog.csdn.net/makingLJ/article/details/98084973

3. 實例

3.1 多進程(池)向同一文件寫入數據

# coding=utf-8

"""
Function:回調函數解決多進程向同一文件寫入數據
"""
import multiprocessing


def callback(result):
    """回調函數"""
    with open("result.txt", "a+", encoding="utf-8") as f:
        f.write(str(result) + "\n")


def run(num):
    return num * num


if __name__ == '__main__':
    pool = multiprocessing.Pool(6)
    for i in range(1000):
        pool.apply_async(run, args=(i,), callback=callback)

    pool.close()
    pool.join()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM