多卡訓練模式:
進行深度學習模型訓練的時候,一般使用GPU來進行加速,當訓練樣本只有百萬級別的時候,單卡GPU通常就能滿足我們的需求,但是當訓練樣本量達到上千萬,上億級別之后,單卡訓練耗時很長,這個時候通常需要采用多機多卡加速。深度學習多卡訓練常見有兩種方式,一種是數據並行化(data parallelism),另外一種是模型並行化(model parallelism)。
深度模型訓練方法:
深度學習模型的訓練是一個迭代的過程,在每一輪迭代過程中,前向傳播算法會根據當前參數的取值,計算出在一小部分訓練數據上的預測值,然后反向傳播算法,再根據損失函數計算參數的梯度並且更新參數。
一、數據並行化
數據並行化:每個GPU上面跑一個模型,模型與模型之間結構參數相同,只是訓練的數據不一樣,每個模型通過最后的loss計算得到梯度之后,再把梯度傳到一個parameter server(PS)上進行參數平均average gradient,然后再根據average gradient更新模型的參數。
深度學習算法由於數據量非常大、算法復雜度高等特點,常常需要采用並行機制。
### 常用的並行化深度學習模型訓練方式有兩種:同步模式和異步模式。
同步模式
同步模式:等到所有的數據分片都完成了梯度計算並把梯度傳到PS之后統一的更新每個模型的參數。優點是訓練穩定,訓練出來的模型得到精度比較高;缺點是訓練的時間取決於分片中最慢的那個片,所以同步模式適用於GPU之間性能差異不大情況下。
同步模式訓練方法
在同步模式下,所有的設備同時讀取參數的取值,並且當反向傳播算法完成之后同步更新參數的取值,單個設備不會單獨對參數進行更新,而會等所有設備都完成反向傳播之后再統一更新參數。
同步模式訓練流程圖如下:
這里寫圖片描述
- 圖中在迭代每一輪時,不同設備首先統一讀取當前參數的取值,並隨機獲取一小部分數據
- 然后在不同設備上運行反向傳播過程得到在各自訓練數據上的參數的梯度
- 注意:雖然所有設備使用的參數是一致的,但是因為訓練數據不同,所以得到的參數的梯度可能不一樣
- 當所有設備完成反向傳播的計算之后,需要計算出不同設備上參數梯度的平均值
- 最后再根據平均值對參數進行更新
異步模式
異步模式訓練方法在並行化地訓練深度學習模型時,不同設備(GPU或CPU)可以再不同訓練數據上,運行整個迭代的過程,而不同並行模式的區別在於不同的參數更新方式。
異步模式訓練流程圖如下:
這里寫圖片描述
- 在每一輪迭代時,不同設備會讀取參數最新的取值
- 因為設備不同,讀取參數取值時間不一樣,所以得到的值也可能不一樣
- 根據當前參數的取值,和隨機獲取的一小部分訓練數據,不同設備各自運行反向傳播的過程,並且獨立地更新參數
- 可以認為異步模式,就是單機模式復制了多份,每一份使用不同的訓練數據進行訓練。
- 在異步模式下,不同設備之前是完全獨立的
同步/異步優劣比較
* 同步模式解決了異步模式中存在的參數更新問題,然而同步模式的效率卻低於異步模式
* 在同步模式下,每一輪迭代都需要設備統一開始、統一結束
* 如果設備的運行速度不一致,那么每一輪訓練都需要等待最慢的設備結束才能開始更新參數,於是很多時間將被花在等待上
* 雖然理論上異步模式存在缺陷,但是因為訓練深度學習模型時,使用的隨機梯度下降本身就是梯度下降的一個近似解法,而且即使是梯度下降也無法保證達到全局最優
* 所以在實際應用中,相同時間內,使用異步模式訓練的模型不一定比同步模式差
代碼示例
#將神經網絡的優化過程跑在不同的GPU上
for i in range(N_GPU): with tf.debice('/gpu:%d'%i) with tf.name_scope('GPU_%d'%i) as scope: cur_loss = get_loss(x,y_regularizer,scope) #tf.get_variable的命名空間 tf.get_variable_scope().reuse_variables() #使用當前gpu計算所有變量的梯度 grads= opt.compute_gradients(cur_loss) tower_grads.append(grads) #計算變量的平均梯度 grads = average_gradients(tower_grads) #使用平均梯度更新參數 apply_gradient_op = opt.apply_gradients(grads,global_step = global)
二、模型並行化
模型並行化:當一個模型非常復雜,非常大,達到單機的內存根本沒法容納的時候,模型並行化就是一個好的選擇。直觀說就多多個GPU訓練,每個GPU分別持有模型的一個片。它的優點很明顯,大模型訓練,缺點就是模型分片之間的通信和數據傳輸很耗時,所以不能簡單說,模型並行就一定比數據並行要快。
還有數據並行和模型並行的混合模型:
數據並行適用於數據量比較小時快速訓練模型,模型並行適用於大數據、大模型場景下。這里只是簡單介紹,想要深入了解細節可以找其他資料學習一下。下面主要基於tensorflow講一個數據並行化的實例。
1、 單機多卡訓練:給個例子,比如一台機器上裝有4個GPU卡,以cpu做為PS(master),主要保存一下參數和變量,進行梯度平均。其余4個GPU訓練模型(worker),進行一些計算量比加大的操作。
- 1) 本地對數據切分為一個一個batch;
- 2) 把數據分別放到送到4個GPU進行模型訓練,每個GPU拿到數據不一樣;
- 3) 四個GPU訓練,求loss得到梯度,並把梯度送回到CPU進行模型平均。
- 4) cpu接收4個gpu傳來的梯度,進行梯度平均。
- 5) 四個GPU跟新參數
- 6) 重復2~5知道模型收斂。
2、 分布式的多久多卡:當是在一個多台機器的集群上訓練的時候采用這種方式,在tensorflow中需要明確指定ps和worker的地址,這種方式兼容單機多卡,只是把ps和worker的地址設為local就可以了。
下面簡要介紹下tensorflow中支持多卡訓練和參數更新的幾個API,具體介紹可以參考這篇文章(Distributedtensorflow實現原理)http://blog.csdn.net/lqfarmer/article/details/70214026。
Tensorflow進行重復性訓練有In-graph replication和Between-graphreplication兩種方式,In-graph replication就是數據並行化模式,Between-graphreplication就是數據並行化模式。梯度更新有異步Asynchronous training和同步Synchronous training兩種模式。
Tensorflow官網也給了一個cifar10_multi_gpu_train.py 的例子,在單機多卡上運行,這里我給一個自己做的單機多卡訓練的簡單例子供參考,自己在搭建這個結構過程中也栽了很多坑,還在繼續探索中,僅有訓練部分。
程序主要分為五個部分:
- Main函數:定義主要運行邏輯;
- Run_epoch函數:定義主要訓練邏輯;
- Generate_feed_dic函數:產生訓練需要的batch樣本;
- Multi_gpu_model函數:定義多個tower,每個tower對應一個gpu;
- Average_gradients函數:梯度平均計算。
一下是完整代碼:

1 #critital class define 2 3 #getaverage gradient 4 5 defaverage_gradients(tower_grads): 6 7 average_grads = [] 8 9 for grad_and_vars in zip(*tower_grads): 10 11 grads = [] 12 13 for g, _ in grad_and_vars: 14 15 expanded_g = tf.expand_dims(g, 0) 16 17 grads.append(expanded_g) 18 19 grad = tf.concat(axis=0, values=grads) 20 21 grad = tf.reduce_mean(grad, 0) 22 23 v = grad_and_vars[0][1] 24 25 grad_and_var = (grad, v) 26 27 average_grads.append(grad_and_var) 28 29 return average_grads 30 31 32 33 #setupmultiple gpu tower 34 35 defmulti_gpu_model(num_gpus=4, word_embeddings = None): 36 37 grads = [] 38 39 global_step = tf.Variable(0,name="global_step", trainable=False) 40 41 optimizer = tf.train.AdamOptimizer(1e-3) 42 43 withtf.variable_scope(tf.get_variable_scope()) as initScope: 44 45 for i in range(num_gpus): 46 47 withtf.device("/gpu:%d"%i): 48 49 withtf.name_scope("tower_%d"%i): 50 51 siameseModel = SiameseLSTM( 52 53 sequence_length=FLAGS.max_document_length, 54 55 embedding_size=FLAGS.embedding_dim, 56 57 hidden_units=FLAGS.hidden_units, 58 59 l2_reg_lambda=FLAGS.l2_reg_lambda, 60 61 batch_size=FLAGS.batch_size, 62 63 word_embeddings=word_embeddings) 64 65 tf.get_variable_scope().reuse_variables() 66 67 tf.add_to_collection("train_model", siameseModel) 68 69 grad_and_var =optimizer.compute_gradients(siameseModel.loss) 70 71 grads.append(grad_and_var) 72 73 tf.add_to_collection("loss",siameseModel.loss) 74 75 tf.add_to_collection("accuracy",siameseModel.accuracy) 76 77 tf.add_to_collection("distance",siameseModel.distance) 78 79 with tf.device("cpu:0"): 80 81 averaged_gradients =average_gradients(grads) 82 83 train_op =optimizer.apply_gradients(averaged_gradients, global_step=global_step) 84 85 return train_op,global_step 86 87 #generating training data 88 89 defgenerate_feed_dic(sess, batch_generator,feed_dict,train_op): 90 91 92 93 SMS =tf.get_collection("train_model") 94 95 for siameseModel in SMS: 96 97 x1_batch, x2_batch, y_batch =batch_generator.next() 98 99 if random()>0.5: 100 101 feed_dict[siameseModel.input_x1] =x1_batch 102 103 feed_dict[siameseModel.input_x2] =x2_batch 104 105 feed_dict[siameseModel.input_y] =y_batch 106 107 feed_dict[siameseModel.dropout_keep_prob]= FLAGS.dropout_keep_prob 108 109 else: 110 111 feed_dict[siameseModel.input_x1] =x2_batch 112 113 feed_dict[siameseModel.input_x2] =x1_batch 114 115 feed_dict[siameseModel.input_y] =y_batch 116 117 feed_dict[siameseModel.dropout_keep_prob]= FLAGS.dropout_keep_prob 118 119 return feed_dict 120 121 #define main trainingprocess 122 123 def run_epoch(sess,train_x1_idsList,train_x2_idsList,train_y,scope,global_step,train_op=None,is_training=False): 124 125 if is_training: 126 127 epoches = len(train_x1_idsList) //FLAGS.batch_size 128 129 batch_generator =datatool.data_iterator(train_x1_idsList, train_x2_idsList,train_y,FLAGS.batch_size,FLAGS.max_document_length) 130 131 # siameseModels =tf.get_collection("train_model") 132 133 while epoches > 0: 134 135 feed_dict = {} 136 137 epoches -= 1 138 139 feed_dict=generate_feed_dic(sess,batch_generator,feed_dict,train_op) 140 141 i = FLAGS.num_iteration 142 143 while i > 0: 144 145 i = i - 1 146 147 losses =tf.get_collection("loss") 148 149 accuracy =tf.get_collection("accuracy") 150 151 distance =tf.get_collection("distance") 152 153 total_accuracy =tf.add_n(losses, name='total_accu') 154 155 total_distance = tf.add_n(losses,name='total_distance') 156 157 total_loss = tf.add_n(losses,name='total_loss') 158 159 160 avg_losses = total_loss / 4 161 162 avg_accu = total_accuracy / 4 163 164 avg_dist = total_distance / 4 165 166 time_str =datetime.datetime.now().isoformat() 167 168 _,step,avg_losses,avg_accu,avg_dist =sess.run([train_op,global_step,total_loss,avg_accu,avg_dist],feed_dict) 169 170 #輸出訓練精度 171 172 print("TRAIN {}: step {},avg_loss {:g}, avg_dist {:g}, avg_acc {:g}".format(time_str, step,avg_losses, avg_dist, avg_accu)) 173 174 #whole training process 175 176 defmain(argv=None): 177 178 print("\nParameters:") 179 180 for attr, value insorted(FLAGS.__flags.items()): 181 182 print("{}={}".format(attr.upper(),value)) 183 184 print("") 185 186 #加載詞向量 187 188 word2id, word_embeddings =datatool.load_word2vec("your dir for word2vec") 189 190 print("load train data") 191 192 (train_x1_idsList,train_x2_idsList,train_y),(valid_x1_idsList, valid_x2_lList,valid_y) =datatool.get_data_for_siamese(word2id, FLAGS.data_path) 193 194 195 196 print("starting graph def") 197 198 gpu_options =tf.GPUOptions(per_process_gpu_memory_fraction=0.8) 199 200 withtf.Graph().as_default():#,tf.device('/cpu:0') 201 202 session_conf = tf.ConfigProto( 203 204 allow_soft_placement=FLAGS.allow_soft_placement, 205 206 log_device_placement=FLAGS.log_device_placement, 207 208 gpu_options=gpu_options) 209 210 sess = tf.Session(config=session_conf) 211 212 213 214 print("started session") 215 216 print ("build multiplemodel") 217 218 with tf.name_scope("train")as train_scope: 219 220 print("define multiple gpumodel and init the training operation") 221 222 train_op,global_step =multi_gpu_model(FLAGS.num_gpus,word_embeddings) 223 224 print ("init allvariable") 225 226 sess.run(tf.global_variables_initializer()) 227 228 print ("run epochestage") 229 230 run_epoch(sess,train_x1_idsList,train_x2_idsList,train_y,train_scope,global_step,train_op,True) 231 232 233 234 # Checkpoint directory. Tensorflowassumes this directory already exists so we need to create it 235 236 timestamp = str(int(time.time())) 237 238 checkpoint_dir =os.path.abspath(os.path.join(out_dir, "checkpoints")) 239 240 checkpoint_prefix =os.path.join(checkpoint_dir, "model") 241 242 if not os.path.exists(checkpoint_dir): 243 244 os.makedirs(checkpoint_dir) 245 246 out_dir =os.path.abspath(os.path.join(os.path.curdir, "runs", timestamp)) 247 248 print("Writing to {}\n".format(out_dir)) 249 250 saver =tf.train.Saver(tf.global_variables(), max_to_keep=100)
注意:我用的是已經訓練好的詞向量,這里只需要加載進來就可以了。