如果我們已經有一個通過Queue通信的多進程程序在同一台機器上運行,現在,由於處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩台機器上。怎么用分布式進程實現?原有的Queue可以繼續使用,但是,通過managers模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。
Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。由於managers模塊封裝很好,不必了解網絡通信的細節,就可以很容易地編寫分布式多進程程序。
首先編寫個manager服務器
# encoding:utf-8
import random, time, Queue
from multiprocessing.managers import BaseManager
# 發送任務的隊列
task_queue = Queue.Queue()
# 接收結果的隊列
result_queue = Queue.Queue()
# 使用標准函數來代替lambda函數,避免python2.7中,pickle無法序列化lambda的問題
def get_task_queue():
global task_queue
return task_queue
# 使用標准函數來代替lambda函數,避免python2.7中,pickle無法序列化lambda的問題
def get_result_queue():
global task_queue
return task_queue
def startManager(host, port, authkey):
# 把兩個Queue都注冊到網絡上,callable參數關聯了Queue對象,注意回調函數不能使用括號
BaseManager.register('get_task_queue', callable=get_task_queue)
BaseManager.register('get_result_queue', callable=get_result_queue)
# 設置host,綁定端口port,設置驗證碼為authkey
manager = BaseManager(address=(host, port), authkey=authkey)
# 啟動manager服務器
manager.start()
return manager
def put_queue(manager):
# 通過網絡訪問queueu
task = manager.get_task_queue()
while 1:
n = random.randint(0, 1000)
print ('Put task %d' % n)
task.put(n)
time.sleep(0.5)
if __name__ == "__main__":
host = '127.0.0.1'
port = 5000
authkey = 'abc'
# 啟動manager服務器
manager = startManager(host, port, authkey)
# 給task隊列添加數據
put_queue(manager)
# 關閉服務器
manager.shutdown
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
然后編寫worker
# encoding:utf-8
import random, time, Queue
from multiprocessing.managers import BaseManager
def start_worker(host, port, authkey):
# 由於這個BaseManager只從網絡上獲取queue,所以注冊時只提供名字
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
print ('Connect to server %s' % host)
# 注意,端口port和驗證碼authkey必須和manager服務器設置的完全一致
worker = BaseManager(address=(host, port), authkey=authkey)
# 鏈接到manager服務器
worker.connect()
return worker
def get_queue(worker):
task = worker.get_task_queue()
result = worker.get_result_queue()
# 從task隊列取數據,並添加到result隊列中
while 1:
if task.empty():
time.sleep(1)
continue
n = task.get(timeout=1)
print ('worker get %d' % n)
result.put(n)
time.sleep(1)
if __name__ == "__main__":
host = '127.0.0.1'
port = 5000
authkey = 'abc'
# 啟動worker
worker = start_worker(host, port, authkey)
# 獲取隊列
get_queue(worker)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
最后,先啟動manager服務器,然后啟動兩個worker
manager服務器截圖
worker1截圖
worker2截圖
可以看到worker1+worker2的數據了等於manager服務器的數據,並且沒有重復的值
---------------------
作者:Imagine_Dragon
來源:CSDN
原文:https://blog.csdn.net/Imagine_Dragon/article/details/77689194
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!