Multi-GPU training with Keras and Horovod
Horovod supports Keras in much the same way as it supports regular TensorFlow. To use Horovod, add the following to your program:
- Run `hvd.init()`.
- Pin a server GPU to this process with `config.gpu_options.visible_device_list`. With the typical setup of one GPU per process, this can be set to the local rank; the first process on the server is then allocated the first GPU, the second process the second GPU, and so on. (The sketch after this list illustrates how rank, local rank, and size relate.)
- Scale the learning rate by the number of workers. The effective batch size in synchronous distributed training grows with the number of workers, and increasing the learning rate compensates for the larger batch size.
- Wrap the optimizer in `hvd.DistributedOptimizer`. The distributed optimizer delegates gradient computation to the original optimizer, averages the gradients using allreduce or allgather, and then applies those averaged gradients.
- Add `hvd.callbacks.BroadcastGlobalVariablesCallback(0)` to broadcast the initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training starts from random weights or resumes from a checkpoint.
- Modify your code to save checkpoints only on worker 0 so that other workers cannot corrupt them. Do this by guarding the model checkpointing code so that it is skipped whenever `hvd.rank() != 0`.
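As a point of reference for the GPU-pinning and learning-rate steps above, here is a minimal sketch of what `hvd.rank()`, `hvd.local_rank()`, and `hvd.size()` report on each process. The 2-server, 2-GPU-per-server layout in the comments is a hypothetical example, not part of the code below.

```python
import horovod.keras as hvd

hvd.init()

# Hypothetical layout: a job launched with 4 processes across 2 servers
# (2 GPUs per server). With a standard launch, ranks are typically
# assigned in host-slot order:
#
#   server A, slot 0 -> rank 0, local_rank 0, size 4
#   server A, slot 1 -> rank 1, local_rank 1, size 4
#   server B, slot 0 -> rank 2, local_rank 0, size 4
#   server B, slot 1 -> rank 3, local_rank 1, size 4
print('rank %d of %d, local rank %d'
      % (hvd.rank(), hvd.size(), hvd.local_rank()))
```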
Example code
```python
from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import math
import tensorflow as tf
import horovod.keras as hvd

# Horovod: initialize Horovod.
hvd.init()

# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))

batch_size = 128
num_classes = 10

# Horovod: adjust number of epochs based on number of GPUs.
epochs = int(math.ceil(12.0 / hvd.size()))

# Input image dimensions
img_rows, img_cols = 28, 28

# The data, shuffled and split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.Adadelta(1.0 * hvd.size())

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)

model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=opt,
              metrics=['accuracy'])

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]

# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

model.fit(x_train, y_train,
          batch_size=batch_size,
          callbacks=callbacks,
          epochs=epochs,
          verbose=1,
          validation_data=(x_test, y_test))

score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])
```
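To start training, launch one process per GPU. For example, on a single machine with 4 GPUs, and assuming the script above is saved as train.py (a hypothetical filename), `horovodrun -np 4 python train.py` starts four workers; the epoch count and learning rate are then scaled automatically through `hvd.size()` as shown in the code.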