分布式進程指的是將Process進程分布到多台機器上,充分利用多態機器的性能完成復雜的任務
-
分布式進程在python 中依然要用到multiprocessing 模塊。multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。可以寫一個服務進程作為調度者,將任務分布到其他多
個進程中,依靠網絡通信進行管理。例子:在做爬蟲程序時,抓取某個網站的所有圖片,如果使用多進程的話,一般是一個進程負責抓取圖片的鏈接地址,將鏈接地址放到queue中,另外的進程負責從queue中取鏈接地址進行下載和存儲到本地。現在把這個過程做成分布式,一台機器上的進程負責抓取鏈接地址,其他機器上的進程負責系在存儲。那么遇到的主要問題是將queue 暴露到網絡中,讓其他機器進程都可以訪問,分布式進程就是將這個過程進行了封裝,我們可以將這個過程稱為本地隊列的網絡化 -
要實現上面例子的功能,創建分布式進程需要分為 六個步驟
- 建立隊列Queue ,用來進行進程間通信。服務進程創建任務隊列task_queue 用來作為傳遞任務給任務進程的通道;服務進程創建結果隊列result_queue ,作為任務進程完成任務后回復服務進程的通道。在分布式多進程環境下,必須由Queuemanager獲得Queue 接口來添加任務
- 把第一步中建立的隊列在網絡上注冊,暴露給其他進程(主機),注冊后獲得網絡隊列,相當於本地隊列的映像
- 建立一個對象(Queuemanager(BaseManager))實例manager,綁定端口和驗證口令
- 啟動第三步中建立的實例,即啟動管理manager,監管信息通道
- 通過管理實例的方法獲得通過網絡訪問的Queue對象,即再把網絡隊列實體化成可以使用的本地隊列
- 創建任務到 “本地”隊列中,自動上傳任務到網絡隊列中,分配給任務進程進行處理
接下來通過程序實現上面的列子(window版),首先編寫的是服務進程(taskManager.py)
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import random, time, queue from multiprocessing.managers import BaseManager # 發送任務的隊列: task_queue = queue.Queue() # 接收結果的隊列: result_queue = queue.Queue() # 自定義函數re_task_queue def re_task_queue(): global task_queue return task_queue # 自定義函數re_result_queue def re_result_queue(): global result_queue return result_queue # 從BaseManager繼承的QueueManager: class QueueManager(BaseManager): pass if __name__ == '__main__': # 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象: QueueManager.register('get_task_queue', callable=re_task_queue) QueueManager.register('get_result_queue', callable=re_result_queue) # 綁定端口5000, 設置驗證碼'abc': manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') # 啟動Queue: manager.start() # 獲得通過網絡訪問的Queue對象: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result隊列讀取結果: print('Try get results...') for i in range(10): r = result.get(True) #這樣的話會一直夯築,等待結果隊列有結果 # r = result.get(timeout=10) #而這樣等待10秒中結果隊列如果還沒有數據進來就直接報錯了 print('Result: %s' % r) # 關閉: manager.shutdown() print('master exit.')
服務進程已經編寫好,接下來任務進程(taskWorker.py)創建四步驟:
- 使用QueueManager注冊用於獲取Queue的方法名稱,任務進程只能通過名稱來網絡獲取Queue
- 連接服務器,端口和驗證口令注意保持與服務器進程中完全一致
- 從網絡獲取Queue,進行本地化
- 從task隊列獲取任務,並且把結果寫入result隊列
import time, sys, queue from multiprocessing.managers import BaseManager # 創建類似的QueueManager: class QueueManager(BaseManager): pass # 由於這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 連接到服務器,也就是運行task_master.py的機器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和驗證碼注意保持與task_master.py設置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 從網絡連接: m.connect() # 獲取Queue的對象: task = m.get_task_queue() result = m.get_result_queue() # 從task隊列取任務,並把結果寫入result隊列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except result.Empty: print('task queue is empty.') # 處理結束: print('worker exit.')