Python: 多進程的分布式進程multiprocessing.managers


multiprocessing.managers

 

在Thread和Process中,應當優選Process,因為Process更穩定,而且,Process可以分布到多台機器上,而Thread最多只能分布到同一台機器的多個CPU上。

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。由於managers模塊封裝很好,不必了解網絡通信的細節,就可以很容易地編寫分布式多進程程序。

 

 

Server process

Manager()返回一個manager對象。它控制一個服務器進程,這個進程會管理Python對象並允許其他進程通過代理的方式來操作這些對象。

manager對象支持多種類型。例子見下:

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = "1"
    d["2"] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()       #產生一個代理對象d
        l = manager.list(range(10))

        p = Process(target=f, args=(d,l))
        p.start()
        p.join()

        print(d)
        print(l)

解釋:

with語句:見這篇文章

with 語句是從 Python 2.5 開始引入的一種與異常處理相關的功能(2.5 版本中要通過 from __future__ import with_statement 導入后才可以使用),從 2.6 版本開始缺省可用(參考 What's new in Python 2.6? 中 with 語句相關部分介紹)。

with 語句適用於對資源進行訪問的場合,確保不管使用過程中是否發生異常都會執行必要的“清理”操作,釋放資源,比如文件使用后自動關閉、線程中鎖的自動獲取acquire和release等。

⚠️,with語句的實現類似try..finally。

 

代理對象:

  • 指向其他共享對象的對象。
  • 共享對象也可以說是代理 指涉 的對象。
  • 多個代理對象可能指向同一個指涉對象。

代理對象代理了指涉對象的一系列方法調用(雖然並不是指涉對象的每個方法都有必要被代理)。通過這種方式,代理的使用方法可以和它的指涉對象一樣:

>>> from multiprocessing import Manager >>> manager = Manager() >>> l = manager.list([i*i for i in range(10)]) >>> print(l) [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> print(repr(l)) <ListProxy object, typeid 'list' at 0x...> >>> l[4] 16 >>> l[2:5] [4, 9, 16]

上面使用了list(), dict()方法

 

管理器的特點:

服務器進程管理器比使用共享內存對象更靈活,它們支持二進制對象類型。

同時,一個單獨的manager可以被網絡上的不同計算機的進程共享。

缺點是比使用shared memory慢。

 

使用manager對象可以創建一個共享queue。具體見下一章節:


 

 

Managers

 

Managers提供了創建一種數據的方法,這個數據可以被不同的進程共享。這種共享也包括通過網絡在不同計算機的進程上共享。

 

multiprocessing.Manager()

返回一個已啟動的SyncManager對象(BaseManager的子類的實例對象),用於在進程之間分享數據。

SyncManager對象(點擊查看方法)對應一個已經啟動的子進程,它擁有一系列方法,可以為大部分常用數據類型創建並返回 代理對象 代理,用於進程間同步。甚至包括共享列表和字典。(👆的代碼例子)

 

當管理器被垃圾回收或者父進程退出時,管理器進程會立即退出。

 

class multiprocessing.managers.BaseManager([address[, authkey]])

創建一個BaseManager對象。創建后,需要調用start()或get_server().server_forever()確保對象對於的管理器進程已經啟動。

  • address參數,管理器服務進程監聽的地址。如果值是None,則任意主機的請求都能建立連接。
  • authkey參數,byte類的字符串。認真標識(驗證碼)

 start(), 為管理器開啟子進程。

 get_server(),返回一個Server對象。

 connect(), 連接本地管理器對象到一個遠程管理器進程

 shutdown() 停止管理器的進程。配合start()。

 register(typid, callable)⚠️最重要的類方法,凡是注冊到管理器的類型/對象,就可以被網絡上的不同進程共享了。

  


 

例子

下面是一個簡單的Master/Worker模型,實現一個簡單的分布計算。如果要啟動多個worker,就可以把任務分配到多台機器上了,

比如把計算n*n的代碼替換成發送郵件,就實現了郵件隊列的異步發送。

 

通過manager模塊的支持,多進程分布到多台機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中。

 

⚠️

注意Queue的作用是用來傳遞任務和接收結果,每個任務的描述數據量要盡量小。

比如發送一個處理日志文件的任務,就不要發送幾百兆的日志文件本身,而是發送日志文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。

 

案例代碼 參考了https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600#0,但這個代碼不適合python3.8版本的了。

會報告2個錯誤。

第一個❌

#_pickle.PicklingError: Can't pickle <function <lambda> at 0x107ef8670>: attribute lookup <lambda> on __main__ failed

網上查了一下,https://github.com/scikit-learn/scikit-learn/issues/9467這篇文章指出pickle模塊不指出lambda函數。

看文檔,https://docs.python.org/zh-cn/3/library/pickle.html ,被封存對象不能是lambda函數返回的對象。只能是def定義返回的對象。

第二個❌

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

看文檔https://docs.python.org/zh-cn/3/library/multiprocessing.html, 3.8版本增加了freeze_support()函數。主要是為了支持windows可執行文件。畢竟multiprocessing可用於分布式進程。

所以必須引入freeze_support:

看代碼:

服務器上的代碼:

import random, time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# 建立2個隊列,一個發送,一個接收
task_queue = queue.Queue()
result_queue = queue.Queue()

def get_task():
    return task_queue

def get_result():
    return result_queue

class QueueManager(BaseManager): pass
# 服務器的管理器上注冊2個共享隊列
QueueManager.register('get_task', callable=get_task)
QueueManager.register('get_result', callable=get_result)
# 設置端口,地址默認為空。驗證碼authkey需要設定。
manager = QueueManager(address=('', 5000), authkey=b'abc')

def manager_run():
    manager.start()
    # 通過管理器訪問共享隊列。
    task = manager.get_task()
    result = manager.get_result()

    #對隊列進行操作, 往task隊列放進任務。
    for value in range(10):
        n = random.randint(0,100)
        print('Put task %d' % n)
        task.put(n)
    # 從result隊列取出結果
    print('Try get result...')
    try:
        for value in range(10):
            r = result.get(timeout=10)
            print('Result: %s' % r)
    except queue.Empty:
        print('result is empty')
    # 關閉管理器。
    manager.shutdown()
    print('master exit.')

if __name__ == '__main__':
    freeze_support()
    manager_run()

 

另一台機器(或本機啟動也可以):

import time, sys, queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager): pass

# 從網絡上的服務器上獲取Queue,所以注冊時只提供服務器上管理器注冊的隊列的名字:
QueueManager.register('get_task')
QueueManager.register('get_result')

server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# b'abc'相當於'abc'.encode('ascii'),類型是bytes
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 連接服務器
m.connect()
# 獲得服務器上的隊列對象
task = m.get_task()
result = m.get_result()

for value in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n , n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('task queue is empty')

print('worker exit.')

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM