一、概述思路
假設一台機器上有個GPU。給定需要訓練的模型,每個GPU將分別獨立維護一份完整的模型參數。
在模型訓練的任意一次迭代中,給定一個小批量,我們將該批量中的樣本划分成份並分給每個GPU一份。
然后,每個GPU將分別根據自己分到的訓練數據樣本和自己維護的模型參數計算模型參數的梯度。
接下來,我們把k個GPU上分別計算得到的梯度相加,從而得到當前的小批量梯度。
之后,每個GPU都使用這個小批量梯度分別更新自己維護的那一份完整的模型參數。
二、網絡以及輔助函數
使用“卷積神經網絡——從零開始”里的LeNet來作為本節的樣例:
# 初始化模型參數。 scale = 0.01 W1 = nd.random.normal(scale=scale, shape=(20, 1, 3, 3)) b1 = nd.zeros(shape=20) W2 = nd.random.normal(scale=scale, shape=(50, 20, 5, 5)) b2 = nd.zeros(shape=50) W3 = nd.random.normal(scale=scale, shape=(800, 128)) b3 = nd.zeros(shape=128) W4 = nd.random.normal(scale=scale, shape=(128, 10)) b4 = nd.zeros(shape=10) params = [W1, b1, W2, b2, W3, b3, W4, b4] # 定義模型。 def lenet(X, params): h1_conv = nd.Convolution(data=X, weight=params[0], bias=params[1], kernel=(3, 3), num_filter=20) h1_activation = nd.relu(h1_conv) h1 = nd.Pooling(data=h1_activation, pool_type="avg", kernel=(2, 2), stride=(2, 2)) h2_conv = nd.Convolution(data=h1, weight=params[2], bias=params[3], kernel=(5, 5), num_filter=50) h2_activation = nd.relu(h2_conv) h2 = nd.Pooling(data=h2_activation, pool_type="avg", kernel=(2, 2), stride=(2, 2)) h2 = nd.flatten(h2) h3_linear = nd.dot(h2, params[4]) + params[5] h3 = nd.relu(h3_linear) y_hat = nd.dot(h3, params[6]) + params[7] return y_hat # 交叉熵損失函數。 loss = gloss.SoftmaxCrossEntropyLoss()
參數列表復制到指定設備
下面函數將模型參數[參數一,參數二,……]復制到某個特定GPU,並標記梯度求解:
def get_params(params, ctx): new_params = [p.copyto(ctx) for p in params] for p in new_params: p.attach_grad() return new_params
同一參數設備間同步
以下函數可以把各個GPU上的同一參數數據加起來,然后再廣播到所有GPU上:
def allreduce(data): # 輸入為list,包含位於不同設備上的同一參數 for i in range(1, len(data)): data[0][:] += data[i].copyto(data[0].context) # 將i位復制到0位設備上,並加給0位 for i in range(1, len(data)): data[0].copyto(data[i]) # 使用累計后的0位替換i位
數據划分到設備
給定一個批量的數據樣本,以下函數可以划分它們並復制到各個GPU上:
def split_and_load(data, ctx): n, k = data.shape[0], len(ctx) m = n // k assert m * k == n, '# examples is not divided by # devices.' return [data[i * m: (i + 1) * m].as_in_context(ctx[i]) for i in range(k)]
三、訓練過程
將完整的模型參數復制到多個GPU上,並在每次迭代時對單個小批量上進行多GPU訓練:
def train(num_gpus, batch_size, lr): train_iter, test_iter = gb.load_data_fashion_mnist(batch_size) ctx = [mx.gpu(i) for i in range(num_gpus)] # 設備代號list print('running on:', ctx) # 將模型參數復制到 num_gpus 個 GPU 上。 gpu_params = [get_params(params, c) for c in ctx] # 每個元素為一個設備上的參數 for epoch in range(1, 6): start = time() for X, y in train_iter: # 對單個小批量上進行多 GPU 訓練。 train_batch(X, y, gpu_params, ctx, lr) nd.waitall() print('epoch %d, time: %.1f sec' % (epoch, time() - start)) # 在 GPU0 上驗證模型。 net = lambda x: lenet(x, gpu_params[0]) test_acc = gb.evaluate_accuracy(test_iter, net, ctx[0]) print('validation accuracy: %.4f' % test_acc)
實現單個小批量上的多GPU訓練:
def train_batch(X, y, gpu_params, ctx, lr): # 划分小批量數據樣本並復制到各個 GPU 上。 gpu_Xs = split_and_load(X, ctx) gpu_ys = split_and_load(y, ctx) # 在各個 GPU 上計算損失。 with autograd.record(): ls = [loss(lenet(gpu_X, gpu_W), gpu_y) # 不同設備上的loss對象 for gpu_X, gpu_y, gpu_W in zip(gpu_Xs, gpu_ys, gpu_params)] # 在各個 GPU 上反向傳播。 for l in ls: l.backward() # 把各個 GPU 上的梯度加起來,然后再廣播到所有 GPU 上。 for i in range(len(gpu_params[0])): # gpu_params[0]:位於設備0上的全部參數 allreduce([gpu_params[c][i].grad for c in range(len(ctx))]) # 匯總梯度並廣播 # 在各個 GPU 上更新自己維護的那一份完整的模型參數。 for param in gpu_params: # 各個設備分別更新 gb.sgd(param, lr, X.shape[0])
四、Gluon實現
模型參數初始化於各個設備
前我們介紹了如何使用initialize
函數的ctx
參數在CPU或單個GPU上初始化模型參數。事實上,ctx
可以接受一系列的CPU/GPU,從而使初始化好的模型參數復制到ctx
里所有的CPU/GPU上。
ctx = [mx.gpu(0), mx.gpu(1)] net.initialize(init=init.Normal(sigma=0.01), ctx=ctx)
此時的net對象對應一系列相同結構不同設備的實體。
數據分發到各個設備
Gluon提供了上一節中實現的split_and_load
函數。它可以划分一個小批量的數據樣本並復制到各個CPU/GPU上。之后,根據輸入數據所在的CPU/GPU,模型計算會發生在相同的CPU/GPU上。
x = nd.random.uniform(shape=(4, 1, 28, 28)) gpu_x = gutils.split_and_load(x, ctx) net(gpu_x[0]), net(gpu_x[1])
默認下weight.data()
會返回CPU上的參數值。由於我們指定了2個GPU來初始化模型參數,我們需要指定GPU訪問。我們看到,相同參數在不同的GPU上的值一樣。
weight = net[1].params.get('weight') try: weight.data() except: print('not initialized on', mx.cpu()) weight.data(ctx[0])[0], weight.data(ctx[1])[0]
not initialized on cpu(0)( [[[-0.01473444 -0.01073093 -0.01042483] [-0.01327885 -0.01474966 -0.00524142] [ 0.01266256 0.00895064 -0.00601594]]] <NDArray 1x3x3 @gpu(0)>, [[[-0.01473444 -0.01073093 -0.01042483] [-0.01327885 -0.01474966 -0.00524142] [ 0.01266256 0.00895064 -0.00601594]]] <NDArray 1x3x3 @gpu(1)>)
參數同步
當我們使用多個GPU來訓練模型時,gluon.Trainer
會自動做數據並行,例如划分小批量數據樣本並復制到各個GPU上,對各個GPU上的梯度求和再廣播到所有GPU上。這樣,我們就可以很方便地實現訓練函數了。
另外net.collect_params().reset_ctx()可以重置設備
loss = gloss.SoftmaxCrossEntropyLoss() def train(num_gpus, batch_size, lr): train_iter, test_iter = gb.load_data_fashion_mnist(batch_size) ctx = [mx.gpu(i) for i in range(num_gpus)] print('running on:', ctx) net.initialize(init=init.Normal(sigma=0.01), ctx=ctx, force_reinit=True) # 網絡初始化於各個設備 trainer = gluon.Trainer( net.collect_params(), 'sgd', {'learning_rate': lr}) # 優化器會自動識別net對象的設備列表 for epoch in range(1, 6): start = time() for X, y in train_iter: gpu_Xs = gutils.split_and_load(X, ctx) # 數據划分到各個設備 gpu_ys = gutils.split_and_load(y, ctx) with autograd.record(): ls = [loss(net(gpu_X), gpu_y) for gpu_X, gpu_y in zip( gpu_Xs, gpu_ys)] # 記錄各個設備的損失函數 for l in ls: l.backward() # 各個設備分別反向傳播 trainer.step(batch_size) # 優化時自動同步各個設備參數 nd.waitall() print('epoch %d, training time: %.1f sec'%(epoch, time() - start)) test_acc = gb.evaluate_accuracy(test_iter, net, ctx[0]) print('validation accuracy: %.4f'%(test_acc)) train(num_gpus=2, batch_size=512, lr=0.3)