Distributed Machine Learning with Ray (Part 1)


Basic approach:
1. Split the data into shards and let multiple workers each process one shard; every worker exposes two interfaces, loss for computing the loss and grad for computing the gradient.
2. Define full_loss and full_grad interfaces that aggregate the loss and grad results from all workers.
3. Optimize the network parameters with the L-BFGS algorithm, passing full_loss and full_grad as its loss function and gradient function. (A minimal toy sketch of this pattern appears right after this list; the full TensorFlow implementation follows it.)
Note: in this implementation, each worker computes the loss and gradient over its entire data shard on every call, not over a mini-batch.
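Before the full implementation, here is a minimal sketch of the map-reduce structure described in steps 1-3, using a toy least-squares problem instead of a neural network. The names DataWorker, partial_loss, and partial_grad are illustrative only and do not appear in the original code.

import numpy as np
import scipy.optimize
import ray

ray.init()

@ray.remote
class DataWorker(object):
    """Holds one data shard and evaluates the loss/gradient for a given theta."""
    def __init__(self, xs, ys):
        self.xs, self.ys = xs, ys

    def partial_loss(self, theta):
        # Squared error of a linear model on this shard.
        residual = self.xs.dot(theta) - self.ys
        return float(0.5 * np.sum(residual ** 2))

    def partial_grad(self, theta):
        # Gradient of the squared error with respect to theta.
        residual = self.xs.dot(theta) - self.ys
        return self.xs.T.dot(residual)

# Shard a toy regression problem across 4 workers.
rng = np.random.RandomState(0)
X, y = rng.randn(400, 5), rng.randn(400)
workers = [DataWorker.remote(X[i::4], y[i::4]) for i in range(4)]

def full_loss(theta):
    # Broadcast theta once, then sum the per-shard losses.
    theta_id = ray.put(theta)
    return sum(ray.get([w.partial_loss.remote(theta_id) for w in workers]))

def full_grad(theta):
    # Sum the per-shard gradients; fmin_l_bfgs_b expects float64.
    theta_id = ray.put(theta)
    return sum(ray.get([w.partial_grad.remote(theta_id) for w in workers])).astype("float64")

theta_opt, f_opt, info = scipy.optimize.fmin_l_bfgs_b(
    full_loss, np.zeros(5), fprime=full_grad)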

# 0. Import dependencies
import numpy as np
import os
import scipy.optimize
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

import ray
import ray.experimental.tf_utils


# 1. Define the model
class LinearModel(object):
    def __init__(self, shape):
        """Creates a LinearModel object."""
        x = tf.placeholder(tf.float32, [None, shape[0]])
        w = tf.Variable(tf.zeros(shape))
        b = tf.Variable(tf.zeros(shape[1]))
        self.x = x
        self.w = w
        self.b = b
        y = tf.nn.softmax(tf.matmul(x, w) + b)
        y_ = tf.placeholder(tf.float32, [None, shape[1]])
        self.y_ = y_
        cross_entropy = tf.reduce_mean(
            -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
        self.cross_entropy = cross_entropy
        self.cross_entropy_grads = tf.gradients(cross_entropy, [w, b])
        self.sess = tf.Session()
        self.variables = ray.experimental.tf_utils.TensorFlowVariables(
            cross_entropy, self.sess)

    def loss(self, xs, ys):
        """Compute the loss on the given data."""
        return float(
            self.sess.run(
                self.cross_entropy, feed_dict={
                    self.x: xs,
                    self.y_: ys
                }))

    def grad(self, xs, ys):
        """Compute the gradients on the given data."""
        return self.sess.run(
            self.cross_entropy_grads, feed_dict={
                self.x: xs,
                self.y_: ys
            })


# 2. Define a remote worker (actor) that computes the model's loss and gradients
@ray.remote
class NetActor(object):
    def __init__(self, xs, ys):
        os.environ["CUDA_VISIBLE_DEVICES"] = ""
        with tf.device("/cpu:0"):
            self.net = LinearModel([784, 10])
        self.xs = xs
        self.ys = ys

    # Compute the loss on this worker's data shard
    def loss(self, theta):
        net = self.net
        net.variables.set_flat(theta)
        return net.loss(self.xs, self.ys)

    # Compute the gradient on this worker's data shard
    def grad(self, theta):
        net = self.net
        net.variables.set_flat(theta)
        gradients = net.grad(self.xs, self.ys)
        return np.concatenate([g.flatten() for g in gradients])

    def get_flat_size(self):
        return self.net.variables.get_flat_size()


# 3. Function that aggregates the loss from the remote workers
def full_loss(theta):
    theta_id = ray.put(theta)
    loss_ids = [actor.loss.remote(theta_id) for actor in actors]
    return sum(ray.get(loss_ids))


# 4. Function that aggregates the gradients from the remote workers
def full_grad(theta):
    theta_id = ray.put(theta)
    grad_ids = [actor.grad.remote(theta_id) for actor in actors]
    # fmin_l_bfgs_b requires float64 inputs
    return sum(ray.get(grad_ids)).astype("float64")


# 5. Train with L-BFGS
if __name__ == "__main__":
    ray.init()
    mnist = input_data.read_data_sets("MNIST_data", one_hot=True)

    # Shard the data so that each worker handles one shard
    num_batches = 10
    batch_size = mnist.train.num_examples // num_batches
    batches = [mnist.train.next_batch(batch_size) for _ in range(num_batches)]
    actors = [NetActor.remote(xs, ys) for (xs, ys) in batches]

    # Initialize the parameters
    dim = ray.get(actors[0].get_flat_size.remote())
    theta_init = 1e-2 * np.random.normal(size=dim)

    # Optimize
    result = scipy.optimize.fmin_l_bfgs_b(
        full_loss, theta_init, maxiter=10, fprime=full_grad, disp=True)
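scipy.optimize.fmin_l_bfgs_b returns a tuple of (optimized parameters, final objective value, info dict), so the result can be inspected after training. A small follow-up sketch, assuming the actors and full_loss defined above are still alive:

# Hypothetical continuation of the __main__ block above.
theta_opt, f_opt, info = result
print("final training loss:", f_opt)
print("function evaluations:", info["funcalls"])
# Re-evaluate the aggregated loss at the optimum through the existing actors.
print("full_loss at optimum:", full_loss(theta_opt))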

 

