分布式進程指的是將Process進程分布到多台機器上,充分利用多態機器的性能完成復雜的任務
-
分布式進程在python 中依然要用到multiprocessing 模塊。multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。可以寫一個服務進程作為調度者,將任務分布到其他多
個進程中,依靠網絡通信進行管理。例子:在做爬蟲程序時,抓取某個網站的所有圖片,如果使用多進程的話,一般是一個進程負責抓取圖片的鏈接地址,將鏈接地址放到queue中,另外的進程負責從queue中取鏈接地址進行下載和存儲到本地。現在把這個過程做成分布式,一台機器上的進程負責抓取鏈接地址,其他機器上的進程負責系在存儲。那么遇到的主要問題是將queue 暴露到網絡中,讓其他機器進程都可以訪問,分布式進程就是將這個過程進行了封裝,我們可以將這個過程稱為本地隊列的網絡化 -
要實現上面例子的功能,創建分布式進程需要分為 六個步驟
- 建立隊列Queue ,用來進行進程間通信。服務進程創建任務隊列task_queue 用來作為傳遞任務給任務進程的通道;服務進程創建結果隊列result_queue ,作為任務進程完成任務后回復服務進程的通道。在分布式多進程環境下,必須由Queuemanager獲得Queue 接口來添加任務
- 把第一步中建立的隊列在網絡上注冊,暴露給其他進程(主機),注冊后獲得網絡隊列,相當於本地隊列的映像
- 建立一個對象(Queuemanager(BaseManager))實例manager,綁定端口和驗證口令
- 啟動第三步中建立的實例,即啟動管理manager,監管信息通道
- 通過管理實例的方法獲得通過網絡訪問的Queue對象,即再把網絡隊列實體化成可以使用的本地隊列
- 創建任務到 “本地”隊列中,自動上傳任務到網絡隊列中,分配給任務進程進行處理
接下來通過程序實現上面的列子(window版),首先編寫的是服務進程(taskManager.py)
#!coding:utf-8
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support, Queue
# 任務個數
task_number = 10
# 收發隊列
task_quue = Queue(task_number)
result_queue = Queue(task_number)
def get_task():
return task_quue
def get_result():
return result_queue
# 創建類似的queueManager
class QueueManager(BaseManager):
pass
def win_run():
# 注冊在網絡上,callable 關聯了Queue 對象
# 將Queue對象在網絡中暴露
#window下綁定調用接口不能直接使用lambda,所以只能先定義函數再綁定
QueueManager.register('get_task_queue', callable=get_task)
QueueManager.register('get_result_queue', callable=get_result)
# 綁定端口和設置驗證口令
manager = QueueManager(address=('127.0.0.1', 8001), authkey='qiye'.encode())
# 啟動管理,監聽信息通道
manager.start()
try:
# 通過網絡獲取任務隊列和結果隊列
task = manager.get_task_queue()
result = manager.get_result_queue()
# 添加任務
for url in ["ImageUrl_" + str(i) for i in range(10)]:
print('url is %s' % url)
task.put(url)
print('try get result')
for i in range(10):
print('result is %s' % result.get(timeout=10))
except:
print 'Manager error'
finally:
manager.shutdown()
if __name__ == '__main__':
# window下多進程可能有問題,添加這句話緩解
freeze_support()
win_run()
服務進程已經編寫好,接下來任務進程(taskWorker.py)創建四步驟:
- 使用QueueManager注冊用於獲取Queue的方法名稱,任務進程只能通過名稱來網絡獲取Queue
- 連接服務器,端口和驗證口令注意保持與服務器進程中完全一致
- 從網絡獲取Queue,進行本地化
- 從task隊列獲取任務,並且把結果寫入result隊列
#coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 創建類似的QueueManager:
class QueueManager(BaseManager):
pass
# 實現第一步:使用QueueManager注冊獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 實現第二步:連接到服務器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證口令注意保持與服務進程設置的完全一致:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網絡連接:
m.connect()
# 實現第三步:獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 實現第四步:從task隊列取任務,並把結果寫入result隊列:
while(not task.empty()):
image_url = task.get(True,timeout=5)
print('run task download %s...' % image_url)
time.sleep(1)
result.put('%s--->success'%image_url)
# 處理結束:
print('worker exit.')
運行結果:
- taskManager.py
C:\Python27\python.exe F:/python_scrapy/python_study/taskManager.py
url is ImageUrl_0
url is ImageUrl_1
url is ImageUrl_2
url is ImageUrl_3
url is ImageUrl_4
url is ImageUrl_5
url is ImageUrl_6
url is ImageUrl_7
url is ImageUrl_8
url is ImageUrl_9
try get result
result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success
Process finished with exit code 0
- 任務進程(taskWorker.py)
C:\Python27\python.exe F:/python_scrapy/python_study/taskWorker.py
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.
Process finished with exit code 0