本文代碼轉載廖雪峰老師的python3教程
分布式編程的難點在於:
1.服務器之間的通信,主節點如何了解從節點的執行進度,並在從節點之間進行負載均衡和任務調度;
2.如何讓多個服務器上的進程訪問同一資源的不同部分進行執行
第一部分涉及到網絡編程的底層細節
第二個問題讓我聯想到hdfs的一些功能。
首先分布式進程還是解決的是單機單進程無法處理的大數據量大計算量的問題,希望能加通過一份代碼(最多主+從兩份)來並行執行一個大任務。
這就面臨兩個問題,首先將程序分布到多台服務器,其次將輸入數據分配給多台服務器。
第一個問題相對比較簡單,畢竟程序一般不會太長,即便是超級jar包的spark程序,也不過百兆。
但數據里不同,如今企業級別的數據動輒GB、TB,如果在分布式程序執行之前首先要進行大容量數據的轉移,顯然是不可取的。
這時候我們就需要一個中央共享數據源,所有服務器都可以對這個數據源進行並行存取(塊block),這就已經非常接近hdfs的功能。
因為在hdfs中,集群中的多台服務器共享同一個hdfs,每台機器訪問hdfs就像訪問本地數據一樣(還是稍微慢一點);
計算任務執行完之后,每台服務器還可以將自己的計算結果寫回hdfs,每台服務器的結果被存儲成了結果目錄中的小文件。
# task_master.py import random, time, queue from multiprocessing.managers import BaseManager # 發送任務的隊列: task_queue = queue.Queue() # 接收結果的隊列: result_queue = queue.Queue() # 從BaseManager繼承的QueueManager: class QueueManager(BaseManager): pass # 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 綁定端口5000, 設置驗證碼'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 啟動Queue: manager.start() # 獲得通過網絡訪問的Queue對象: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result隊列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 關閉: manager.shutdown() print('master exit.')
# task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 創建類似的QueueManager: class QueueManager(BaseManager): pass # 由於這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 連接到服務器,也就是運行task_master.py的機器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和驗證碼注意保持與task_master.py設置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 從網絡連接: m.connect() # 獲取Queue的對象: task = m.get_task_queue() result = m.get_result_queue() # 從task隊列取任務,並把結果寫入result隊列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 處理結束: print('worker exit.')