Basic idea:
1. Split the data into blocks and let several workers each handle one block; every worker exposes two interfaces, loss for computing the loss and grad for computing the gradient.
2. Define full_loss and full_grad interfaces that aggregate (by summing; a short derivation of why a plain sum suffices follows this list) the loss and grad of all workers.
3. Run the L-BFGS algorithm with full_loss as its loss function and full_grad as its gradient function to optimize the network parameters.
Note: in this implementation, each worker always computes the loss and gradient over its entire data block, not over a mini-batch.
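One step worth spelling out: each worker's cross_entropy is a mean over its own block, yet full_loss simply sums the per-block values. Because the K blocks all have the same size n/K, that sum differs from the mean over the whole dataset only by a constant factor:

    sum_k (K/n) * sum_{i in block k} l_i  =  (K/n) * sum_i l_i  =  K * (mean loss over all n samples)

full_grad sums the per-block gradients and is scaled by the same factor K, so the minimizer found by L-BFGS is unchanged.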
#0. Import dependencies
import numpy as np
import os
import scipy.optimize
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import ray
import ray.experimental.tf_utils
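This listing uses TensorFlow 1.x graph-mode APIs (tf.placeholder, tf.Session), the old tensorflow.examples.tutorials.mnist helper, and ray.experimental.tf_utils, so it runs most easily on TF 1.x with an older Ray release. If only TensorFlow 2.x is available (an assumption, not part of the original setup), the v1 compatibility layer restores the graph-mode APIs, though the MNIST tutorial helper was removed in 2.x and would need a replacement:
# Assumption: running on TensorFlow 2.x; the original code targets 1.x.
# import tensorflow.compat.v1 as tf   # v1-style graph APIs (placeholder, Session, ...)
# tf.disable_v2_behavior()            # disable eager execution so tf.Session works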
#1. Define the model
class LinearModel(object):
    def __init__(self, shape):
        """Creates a LinearModel object."""
        x = tf.placeholder(tf.float32, [None, shape[0]])
        w = tf.Variable(tf.zeros(shape))
        b = tf.Variable(tf.zeros(shape[1]))
        self.x = x
        self.w = w
        self.b = b
        y = tf.nn.softmax(tf.matmul(x, w) + b)
        y_ = tf.placeholder(tf.float32, [None, shape[1]])
        self.y_ = y_
        cross_entropy = tf.reduce_mean(
            -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
        self.cross_entropy = cross_entropy
        self.cross_entropy_grads = tf.gradients(cross_entropy, [w, b])
        self.sess = tf.Session()
        # Helper that exposes all variables the loss depends on (w and b)
        # as a single flat numpy vector via set_flat / get_flat_size.
        self.variables = ray.experimental.tf_utils.TensorFlowVariables(
            cross_entropy, self.sess)

    def loss(self, xs, ys):
        """Computes the loss on inputs xs with one-hot labels ys."""
        return float(
            self.sess.run(
                self.cross_entropy, feed_dict={
                    self.x: xs,
                    self.y_: ys
                }))

    def grad(self, xs, ys):
        """Computes the gradients of the loss with respect to w and b."""
        return self.sess.run(
            self.cross_entropy_grads, feed_dict={
                self.x: xs,
                self.y_: ys
            })
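For intuition, the flat-vector interface that the actors rely on below can be exercised on a single LinearModel directly. A minimal local sketch (the inputs and labels here are dummy data invented for illustration; it assumes the TF 1.x session created inside the model):
model = LinearModel([784, 10])
dim = model.variables.get_flat_size()            # 784*10 weights + 10 biases = 7850
theta = 1e-2 * np.random.normal(size=dim)        # one flat parameter vector
model.variables.set_flat(theta)                  # write it back into w and b
xs = np.random.rand(32, 784).astype("float32")   # dummy images (illustration only)
ys = np.eye(10)[np.random.randint(0, 10, 32)].astype("float32")  # dummy one-hot labels
print(model.loss(xs, ys))                        # scalar loss on this dummy block
print([g.shape for g in model.grad(xs, ys)])     # [(784, 10), (10,)]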
#2. Define the remote worker that computes the model's loss and grads
@ray.remote
class NetActor(object):
    def __init__(self, xs, ys):
        os.environ["CUDA_VISIBLE_DEVICES"] = ""
        with tf.device("/cpu:0"):
            self.net = LinearModel([784, 10])
            self.xs = xs
            self.ys = ys

    # Compute the loss on this actor's data block.
    def loss(self, theta):
        net = self.net
        net.variables.set_flat(theta)
        return net.loss(self.xs, self.ys)

    # Compute the gradient on this actor's data block.
    def grad(self, theta):
        net = self.net
        net.variables.set_flat(theta)
        gradients = net.grad(self.xs, self.ys)
        # Flatten the per-variable gradients into one vector matching theta.
        return np.concatenate([g.flatten() for g in gradients])

    def get_flat_size(self):
        return self.net.variables.get_flat_size()
#3. Function that aggregates the losses from the remote workers
def full_loss(theta):
    # Put theta in the object store once so all actors share a single copy.
    theta_id = ray.put(theta)
    loss_ids = [actor.loss.remote(theta_id) for actor in actors]
    return sum(ray.get(loss_ids))
#4. Function that aggregates the gradients from the remote workers
def full_grad(theta):
    theta_id = ray.put(theta)
    grad_ids = [actor.grad.remote(theta_id) for actor in actors]
    # fmin_l_bfgs_b requires float64 gradients.
    return sum(ray.get(grad_ids)).astype("float64")
#5. Train with L-BFGS
if __name__ == "__main__":
    ray.init()
    mnist = input_data.read_data_sets("MNIST_data", one_hot=True)
    # Split the data into blocks; each worker handles one block.
    num_batches = 10
    batch_size = mnist.train.num_examples // num_batches
    batches = [mnist.train.next_batch(batch_size) for _ in range(num_batches)]
    actors = [NetActor.remote(xs, ys) for (xs, ys) in batches]
    # Initialize the parameters.
    dim = ray.get(actors[0].get_flat_size.remote())
    theta_init = 1e-2 * np.random.normal(size=dim)
    # Optimize.
    result = scipy.optimize.fmin_l_bfgs_b(
        full_loss, theta_init, maxiter=10, fprime=full_grad, disp=True)
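    # fmin_l_bfgs_b returns a (theta_opt, final_loss, info) tuple; unpack it to
    # inspect the optimum and push the optimized parameters back to a worker.
    theta_opt, final_loss, info = result
    print("final summed loss:", final_loss)
    print("function evaluations:", info["funcalls"], "warnflag:", info["warnflag"])
    # Sanity check: re-evaluate block 0's loss at the optimized parameters.
    print("block 0 loss:", ray.get(actors[0].loss.remote(theta_opt)))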