先來個最簡單的例子:
把1-10000每個數求平方
服務器server:
用兩個隊列存儲任務、結果
定義兩個函數
要實現分布式得繼承multiprocessing.managers.BaseManager
在主函數里multiprocessing.freeze_support()開啟分布式支持
注冊兩個函數給客戶端調用
創建管理器,設置ip地址和開啟端口、鏈接密碼。
用兩個隊列加任務、收結果。用剛剛注冊的函數
把1-10000壓入隊列,
把結果壓入隊列
最后完成關閉服務器
客戶端client:
也需要繼承multiprocessing.managers.BaseManager
定義一個協程處理一個數據,同時把結果壓入結果隊列
定義一個線程處理10個數據,開啟10個協程
定義一個進程,進程驅動10個線程
主函數:同客戶端注冊兩個函數
同客戶端創建管理器,設置ip地址和開啟端口、鏈接密碼。
鏈接服務器
同客戶端調用注冊的函數,兩個隊列
套四層循環:10個進程、100個線程、1000個協程
循環進程函數
上代碼:
服務器server:
#coding:utf-8 import multiprocessing #分布式進程 import multiprocessing.managers #分布式進程管理器 import random,time #隨機數,時間 import Queue #隊列 task_queue=Queue.Queue() #任務 result_queue=Queue.Queue() #結果 def return_task(): #返回任務隊列 return task_queue def return_result(): #返回結果隊列 return result_queue class QueueManger(multiprocessing.managers.BaseManager):#繼承,進程管理共享數據 pass if __name__=="__main__": multiprocessing.freeze_support()#開啟分布式支持 QueueManger.register("get_task",callable=return_task)#注冊函數給客戶端調用 QueueManger.register("get_result", callable=return_result) manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") #創建一個管理器,設置地址與密碼 manger.start() #開啟 task,result=manger.get_task(),manger.get_result() #任務,結果 for i in range(10000): print "task add data",i task.put(i) print "waitting for------" for i in range(10000): res=result.get(timeout=100) print "get data",res manger.shutdown()#關閉服務器
客戶端client:
#coding:utf-8 import multiprocessing #分布式進程 import multiprocessing.managers # 分布式進程管理器 import random,time #隨機數,時間 import Queue #隊列 import threading import gevent import gevent.monkey class QueueManger(multiprocessing.managers.BaseManager):# 繼承,進程管理共享數據 pass def gevetygo(num ,result): #協程處理一個數據 print num*num result.put(num*num) def threadgo(datalist,result): # 線程處理10個數據,開啟10個協程 tasklist=[] for data in datalist: tasklist.append(gevent.spawn(gevetygo, data,result)) gevent.joinall(tasklist) def processgo(ddatalist,result): # [[1,2,3],[4,5,6]] 進程驅動了10個線程 threadlist=[] for datalist in ddatalist: mythread=threading.Thread(target=threadgo,args=(datalist,result)) mythread.start() threadlist.append(mythread) for mythread in threadlist: mythread.join() if __name__=="__main__": QueueManger.register("get_task") # 注冊函數調用服務器 QueueManger.register("get_result") manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") manger.connect() # 鏈接服務器 task= manger.get_task() result =manger.get_result() # 任務,結果 # 1000 # 10個進程 # 100個線程 # 1000個協程 for i in range(10): cubelist = [] # [[[1],[2]]] for j in range(10): arealist = [] for k in range(10): linelist = [] for l in range(10): data = task.get() linelist.append(data) arealist.append(linelist) cubelist.append(arealist) processlist = [] for myarealist in cubelist: process = multiprocessing.Process(target=processgo, args=(myarealist, result)) process.start() processlist.append(process) for process in processlist: process.join()
遇到的坑:一個月之前弄分布式的時候寫ip地址怎么都開啟不了,后來換了台電腦就支持了= =。
如果只是在自己電腦上弄的話,寫127.0.0.1也可以運行,如果你也遇到ip地址怎么都開啟不了的情況