python 分布式進程


分布式進程指的是將Process進程分布到多台機器上,充分利用多態機器的性能完成復雜的任務

  • 分布式進程在python 中依然要用到multiprocessing 模塊。multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。可以寫一個服務進程作為調度者,將任務分布到其他多
    個進程中,依靠網絡通信進行管理。例子:在做爬蟲程序時,抓取某個網站的所有圖片,如果使用多進程的話,一般是一個進程負責抓取圖片的鏈接地址,將鏈接地址放到queue中,另外的進程負責從queue中取鏈接地址進行下載和存儲到本地。現在把這個過程做成分布式,一台機器上的進程負責抓取鏈接地址,其他機器上的進程負責系在存儲。那么遇到的主要問題是將queue 暴露到網絡中,讓其他機器進程都可以訪問,分布式進程就是將這個過程進行了封裝,我們可以將這個過程稱為本地隊列的網絡化

要實現上面例子的功能,創建分布式進程需要分為 六個步驟

  • 建立隊列Queue ,用來進行進程間通信。服務進程創建任務隊列task_queue 用來作為傳遞任務給任務進程的通道;服務進程創建結果隊列result_queue ,作為任務進程完成任務后回復服務進程的通道。在分布式多進程環境下,必須由Queuemanager獲得Queue 接口來添加任務
  • 把第一步中建立的隊列在網絡上注冊,暴露給其他進程(主機),注冊后獲得網絡隊列,相當於本地隊列的映像
  • 建立一個對象(Queuemanager(BaseManager))實例manager,綁定端口和驗證口令
  • 啟動第三步中建立的實例,即啟動管理manager,監管信息通道
  • 通過管理實例的方法獲得通過網絡訪問的Queue對象,即再把網絡隊列實體化成可以使用的本地隊列
  • 創建任務到 “本地”隊列中,自動上傳任務到網絡隊列中,分配給任務進程進行處理

接下來通過程序實現上面的列子(window版),首先編寫的是服務進程(taskManager.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random, time, queue
from multiprocessing.managers import BaseManager

# 發送任務的隊列:
task_queue = queue.Queue()
# 接收結果的隊列:
result_queue = queue.Queue()


# 自定義函數re_task_queue
def re_task_queue():
    global task_queue
    return task_queue


# 自定義函數re_result_queue
def re_result_queue():
    global result_queue
    return result_queue


# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass


if __name__ == '__main__':

    # 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:
    QueueManager.register('get_task_queue', callable=re_task_queue)
    QueueManager.register('get_result_queue', callable=re_result_queue)

    # 綁定端口5000, 設置驗證碼'abc':
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

    # 啟動Queue:
    manager.start()

    # 獲得通過網絡訪問的Queue對象:
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    # 放幾個任務進去:
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d...' % n)
        task.put(n)

    # 從result隊列讀取結果:
    print('Try get results...')
    for i in range(10):
        r = result.get(True)  #這樣的話會一直夯築,等待結果隊列有結果
        # r = result.get(timeout=10)   #而這樣等待10秒中結果隊列如果還沒有數據進來就直接報錯了
        print('Result: %s' % r)

    # 關閉:
    manager.shutdown()
    print('master exit.')

服務進程已經編寫好,接下來任務進程(taskWorker.py)創建四步驟:

  • 使用QueueManager注冊用於獲取Queue的方法名稱,任務進程只能通過名稱來網絡獲取Queue
  • 連接服務器,端口和驗證口令注意保持與服務器進程中完全一致
  • 從網絡獲取Queue,進行本地化
  • 從task隊列獲取任務,並且把結果寫入result隊列
import time, sys, queue
from multiprocessing.managers import BaseManager

# 創建類似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由於這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 連接到服務器,也就是運行task_master.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與task_master.py設置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網絡連接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except result.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM