TensorFlow 1.4 High-Level API Usage (estimator, data, keras, layers)


An Introduction to TensorFlow's High-Level APIs (estimator, keras, data, experiment)

TensorFlow 1.4 officially promoted keras and data into the core codebase (graduating them from contrib). Together with the existing estimator API, you can now build and train networks in TensorFlow almost as conveniently as in Keras. The data API feeds data from many kinds of sources into the network (and with feature columns (tf.feature_column), structured data such as CSV files can be read and processed easily; the specific steps are covered in this document). When building a network, keras largely avoids the class of problems caused by reusing Sessions and Graphs (although tf.keras still differs from the original keras in quite a few places, so some unexpected issues do come up). The estimator API offers a training workflow similar to sklearn's, which makes it very easy to get started, ships with predefined implementations of several common tasks, and keeps the training process clear when you write a custom model_fn. It also provides export_savedmodel, which makes it easy to convert a trained ckpt checkpoint into a pb file and, combined with docker and tensorflow serving, to deploy and update models flexibly and reliably.
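As an illustration of that sklearn-like workflow, here is a minimal sketch of a canned estimator on structured data; the feature name "x", the array shapes and the DNNClassifier settings below are made up for the example and are not part of the original article:

import numpy as np
import tensorflow as tf

# Hypothetical structured data: 100 examples with 4 numeric features each.
x_train = {"x": np.random.rand(100, 4).astype(np.float32)}
y_train = np.random.randint(0, 3, size=100)

feature_columns = [tf.feature_column.numeric_column("x", shape=[4])]
classifier = tf.estimator.DNNClassifier(feature_columns=feature_columns,
                                        hidden_units=[32, 16],
                                        n_classes=3)

train_input_fn = tf.estimator.inputs.numpy_input_fn(
    x=x_train, y=y_train, batch_size=16, num_epochs=None, shuffle=True)
classifier.train(input_fn=train_input_fn, steps=200)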

1. tf.data for input pipelines (TFRecords)

Keras provides the keras.preprocessing.image.ImageDataGenerator() class, whose .flow_from_directory() method makes it very easy to read data stored under folders; the .flow() method can also take data directly from an np.array and feed it into the network for training (see the official documentation for details). For training tasks where images are grouped into folders named after their classes this is simple and effective, but the data format officially recommended by TensorFlow is TFRecords, and Keras does not officially support reading data directly from tfrecords files (neither does tf.keras, although this issue contains some PRs that make it possible; the Keras author does not really recommend it). This is where the data API comes in for handling data from TFRecords (the previously common tf.train.batch() and tf.train.shuffle_batch() can also be used to process the training data).
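For reference, a rough sketch of the flow_from_directory approach mentioned above; the directory layout "data/train" and the generator parameters are hypothetical:

from keras.preprocessing.image import ImageDataGenerator

# Each subfolder of data/train is treated as one class.
datagen = ImageDataGenerator(rescale=1. / 255, horizontal_flip=True)
train_gen = datagen.flow_from_directory("data/train",
                                        target_size=(299, 299),
                                        batch_size=32,
                                        class_mode="categorical")
# model.fit_generator(train_gen, steps_per_epoch=100, epochs=10)  # with an already compiled keras model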

TensorFlow's official documentation describes the mechanics and usage of the data API in detail (see here). For TFRecords data, tf.data.Iterator() is the main way to pull examples out of the pipeline; here is a quick look at how to extract data from TFRecords:

The following code is from the official documentation:

def dataset_input_fn():
  filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
  dataset = tf.data.TFRecordDataset(filenames)  # the tfrecords files to read from

  # Use `tf.parse_single_example()` to extract data from a `tf.Example`
  # protocol buffer, and perform any additional per-record preprocessing.
  def parser(record):   # parses a single record from the tfrecords file
    keys_to_features = {
        "image_data": tf.FixedLenFeature((), tf.string, default_value=""),
        "date_time": tf.FixedLenFeature((), tf.int64, default_value=""),
        "label": tf.FixedLenFeature((), tf.int64,
                                    default_value=tf.zeros([], dtype=tf.int64)),
    }
    parsed = tf.parse_single_example(record, keys_to_features)

    # Perform additional preprocessing on the parsed data.
    image = tf.image.decode_jpeg(parsed["image_data"])
    image = tf.reshape(image, [299, 299, 1])
    label = tf.cast(parsed["label"], tf.int32)

    return {"image_data": image, "date_time": parsed["date_time"]}, label

  # Use `Dataset.map()` to build a pair of a feature dictionary and a label
  # tensor for each example.
  dataset = dataset.map(parser)  # map() usually applies the image preprocessing; it can also live inside the parser function above
  dataset = dataset.shuffle(buffer_size=10000)  # shuffle the input order during training to improve generalization
  dataset = dataset.batch(32)  # batch size per read
  dataset = dataset.repeat(num_epochs)  # number of epochs to repeat the dataset; None means repeat indefinitely
  iterator = dataset.make_one_shot_iterator()  

  # `features` is a dictionary in which each value is a batch of values for
  # that feature; `labels` is a batch of labels.
  features, labels = iterator.get_next()  
  return features, labels  
  # return {"input": features, labels}  # 對於estimator的輸入前者為dict類型,后者為tensor

Note: dataset.make_one_shot_iterator() is the simplest Iterator; it needs no explicit initialization and is currently the only iterator that can easily be used with an estimator. For a Dataset that needs to be reused (for example training and test sets with the same structure), a reinitializable iterator is normally the right tool, but because of the limitation above, the usual practice with estimators is to write two separate pipelines for the training and validation sets, each driven by make_one_shot_iterator. A sketch of a reinitializable iterator is shown below.
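A minimal sketch of a reinitializable iterator shared between a training and a validation Dataset, following the pattern from the official guide (train_files, val_files and the parser function are assumed to be defined as above):

# Both datasets must have the same output types and shapes.
train_dataset = tf.data.TFRecordDataset(train_files).map(parser).batch(32)
val_dataset = tf.data.TFRecordDataset(val_files).map(parser).batch(32)

iterator = tf.data.Iterator.from_structure(train_dataset.output_types,
                                           train_dataset.output_shapes)
features, labels = iterator.get_next()

train_init_op = iterator.make_initializer(train_dataset)
val_init_op = iterator.make_initializer(val_dataset)

with tf.Session() as sess:
    sess.run(train_init_op)   # point the iterator at the training set
    # ... run training steps on features/labels ...
    sess.run(val_init_op)     # re-initialize the same iterator on the validation set
    # ... run evaluation steps ...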

References

2. Dataset + Keras

With the data API handling the TFRecords stream, we can now use Keras for training.

def architecture(input_shape=(_PATCH_SIZE, _PATCH_SIZE, 3)):
    """
    Model architecture
    Args:
        input_shape: input image shape (not include batch)

    Returns: a Keras model instance
    """
    base_model = Xception(include_top=True,
                          weights=None,   # no pre-trained weights used
                          pooling="max",
                          input_shape=input_shape,  # modify first layer
                          classes=_NUM_CLASSES)
    base_model.summary()
    return base_model


def train(source_dir, model_save_path):
    """
    Train patch based model
    Args:
        source_dir: a directory where the training tfrecords files are stored. All TFRecords files whose names start with train will be used!
        model_save_path: weights save path
    """
    if tf.gfile.Exists(source_dir):
        train_data_paths = tf.gfile.Glob(source_dir+"/train*tfrecord")
        val_data_paths = tf.gfile.Glob(source_dir+"/val*tfrecord")
        if not len(train_data_paths):
            raise Exception("[Train Error]: unable to find train*.tfrecord file")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    (images, labels) = dataset_input_fn(train_data_paths)  # the tf.data pipeline from section 1, adapted to take the tfrecords paths
    model_input = keras.Input(tensor=images, shape=(_PATCH_SIZE, _PATCH_SIZE, 3), dtype=tf.float32, name="input")  # the input of a keras model should be a keras.Input; passing a plain tensorflow tensor also works
    base_model = architecture()
    model_output = base_model(model_input)
    model = keras.models.Model(inputs=model_input, outputs=model_output)
    optimizer = keras.optimizers.RMSprop(lr=2e-3, decay=0.9)
    model.compile(optimizer=optimizer,
                  loss=focal_loss,
                  metrics=['accuracy'],
                  target_tensors=[labels])   # 1
    tensorboard = tf.keras.callbacks.TensorBoard(log_dir=model_save_path)
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=model_save_path+"/saved_model.h5")
    model.fit(steps_per_epoch=8000, epochs=_EPOCHS, callbacks=[tensorboard, model_checkpoint])


def evaluate(source_dir, weights_path):
    """
    Eval patch based model
    Args:
        source_dir: directory where the validation tfrecords files are stored. All TFRecords files whose names start with val will be used!
        weights_path: model weights save path
    """
    # load model
    base_model = architecture()
    base_model.load_weights(weights_path)
    # load test dataset
    if tf.gfile.Exists(source_dir):
        val_data_paths = tf.gfile.Glob(source_dir+"/val*.tfrecord")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    (images, labels) = dataset_input_fn(val_data_paths)
    probs = base_model(images)
    predictions = tf.cast(tf.argmax(probs, axis=-1), tf.int32)
    accuracy_score = tf.reduce_mean(tf.cast(tf.equal(tf.cast(labels, tf.int32), predictions), tf.float32))
    sess = tf.keras.backend.get_session()
    print("Accuracy of testing images: {}".format(sess.run(accuracy_score)))  # accuracy on one evaluation batch

Note: regarding #1, the documentation of the Keras Model compile() function says:

target_tensors: By default, Keras will create placeholders for the model's target, which will be fed with the target data during training. If instead you would like to use your own target tensors (in turn, Keras will not expect external Numpy data for these targets at training time), you can specify them via the target_tensors argument. It can be a single tensor (for a single-output model), a list of tensors, or a dict mapping output names to target tensors.

Once the target tensors are specified, there is no need to feed target data from outside when calling fit().

A model trained this way is saved as an h5 file. To convert it into a model usable by tensorflow serving, one of the following approaches is needed: a) the Keras-style approach given by fchollet:

# @1
from keras import backend as K

K.set_learning_phase(0)  # all new operations will be in test mode from now on

# serialize the model and get its weights, for quick re-building
config = previous_model.get_config()
weights = previous_model.get_weights()

# re-build a model where the learning phase is now hard-coded to 0
from keras.models import model_from_config
new_model = model_from_config(config)
new_model.set_weights(weights)
# @2
from tensorflow_serving.session_bundle import exporter

export_path = ... # where to save the exported graph
export_version = ... # version number (integer)

saver = tf.train.Saver(sharded=True)
model_exporter = exporter.Exporter(saver)
signature = exporter.classification_signature(input_tensor=model.input,
                                              scores_tensor=model.output)  # 分類
model_exporter.init(sess.graph.as_graph_def(),
                    default_graph_signature=signature)
model_exporter.export(export_path, tf.constant(export_version), sess)

Or the official TensorFlow style:

import os
import tensorflow as tf
import keras.backend as K

def save_model_for_production(model, version, path='prod_models'):
    K.set_learning_phase(0)
    if not os.path.exists(path):
        os.mkdir(path)
    export_path = os.path.join(
        tf.compat.as_bytes(path),
        tf.compat.as_bytes(str(get_new_version(path=path, current_version=int(version)))))
    builder = tf.saved_model.builder.SavedModelBuilder(export_path)

    model_input = tf.saved_model.utils.build_tensor_info(model.input)
    model_output = tf.saved_model.utils.build_tensor_info(model.output)

    prediction_signature = (
        tf.saved_model.signature_def_utils.build_signature_def(
            inputs={'inputs': model_input},
            outputs={'output': model_output},
            method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME))

    with K.get_session() as sess:
        builder.add_meta_graph_and_variables(
            sess=sess, tags=[tf.saved_model.tag_constants.SERVING],
            signature_def_map={
                'predict':
                    prediction_signature,
            })

        builder.save()

References

3. Dataset + estimator

tf.layers can be used instead of Keras to build the network, and it offers a richer set of layers.

def xception(features, classes=2, is_training=True):
    """
    The Xception architecture written in tf.layers
    Args:
        features: input image tensor
        classes: number of classes to classify images into
        is_training: is training stage or not

    Returns:
        2-D logits prediction output after pooling and activation
    """
    x = tf.layers.conv2d(features, 32, (3, 3), strides=(2, 2), use_bias=False, name='block1_conv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block1_conv1_bn')
    x = tf.nn.relu(x, name='block1_conv1_act')
    x = tf.layers.conv2d(x, 64, (3, 3), use_bias=False, name='block1_conv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block1_conv2_bn')
    x = tf.nn.relu(x, name='block1_conv2_act')

    residual = tf.layers.conv2d(x, 128, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)

    x = tf.layers.separable_conv2d(x, 128, (3, 3), padding='same', use_bias=False, name='block2_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block2_sepconv1_bn')
    x = tf.nn.relu(x, name='block2_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 128, (3, 3), padding='same', use_bias=False, name='block2_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block2_sepconv2_bn')

    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block2_pool')
    x = tf.add(x, residual, name='block2_add')

    residual = tf.layers.conv2d(x, 256, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)

    x = tf.nn.relu(x, name='block3_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 256, (3, 3), padding='same', use_bias=False, name='block3_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block3_sepconv1_bn')
    x = tf.nn.relu(x, name='block3_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 256, (3, 3), padding='same', use_bias=False, name='block3_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block3_sepconv2_bn')

    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block3_pool')
    x = tf.add(x, residual, name="block3_add")

    residual = tf.layers.conv2d(x, 728, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)

    x = tf.nn.relu(x, name='block4_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block4_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block4_sepconv1_bn')
    x = tf.nn.relu(x, name='block4_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block4_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block4_sepconv2_bn')

    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block4_pool')
    x = tf.add(x, residual, name="block4_add")

    for i in range(8):
        residual = x
        prefix = 'block' + str(i + 5)

        x = tf.nn.relu(x, name=prefix + '_sepconv1_act')
        x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv1')
        x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv1_bn')
        x = tf.nn.relu(x, name=prefix + '_sepconv2_act')
        x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv2')
        x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv2_bn')
        x = tf.nn.relu(x, name=prefix + '_sepconv3_act')
        x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv3')
        x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv3_bn')

        x = tf.add(x, residual, name=prefix+"_add")

    residual = tf.layers.conv2d(x, 1024, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)

    x = tf.nn.relu(x, name='block13_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block13_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block13_sepconv1_bn')
    x = tf.nn.relu(x, name='block13_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 1024, (3, 3), padding='same', use_bias=False, name='block13_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block13_sepconv2_bn')

    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block13_pool')
    x = tf.add(x, residual, name="block13_add")

    x = tf.layers.separable_conv2d(x, 1536, (3, 3), padding='same', use_bias=False, name='block14_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block14_sepconv1_bn')
    x = tf.nn.relu(x, name='block14_sepconv1_act')

    x = tf.layers.separable_conv2d(x, 2048, (3, 3), padding='same', use_bias=False, name='block14_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block14_sepconv2_bn')
    x = tf.nn.relu(x, name='block14_sepconv2_act')
    # replace conv layer with fc
    x = tf.layers.average_pooling2d(x, (3, 3), (2, 2), name="global_average_pooling")
    x = tf.layers.conv2d(x, 2048, [1, 1], activation=None, name="block15_conv1")
    x = tf.layers.conv2d(x, classes, [1, 1], activation=None, name="block15_conv2")
    x = tf.squeeze(x, axis=[1, 2], name="logits")
    return x


def model_fn(features, labels, mode, params):
    """
    Model_fn for estimator model
    Args:
        features (Tensor): Input features to the model.
        labels (Tensor): Labels tensor for training and evaluation.
        mode (ModeKeys): Specifies if training, evaluation or prediction.
        params (HParams): hyper-parameters for estimator model
    Returns:
        (EstimatorSpec): Model to be run by Estimator.
    """
    # check if training stage
    if mode == tf.estimator.ModeKeys.TRAIN:
        is_training = True
    else:
        is_training = False
    # is_training = False   # 1
    input_tensor = features["input"]
    logits = xception(input_tensor, classes=_NUM_CLASSES, is_training=is_training)
    probs = tf.nn.softmax(logits, name="output_score")
    predictions = tf.argmax(probs, axis=-1, name="output_label")
    onehot_labels = tf.one_hot(tf.cast(labels, tf.int32), _NUM_CLASSES)
    # provide a tf.estimator spec for PREDICT
    predictions_dict = {"score": probs,
                        "label": predictions}
    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions_output = tf.estimator.export.PredictOutput(predictions_dict)
        return tf.estimator.EstimatorSpec(mode=mode,
                                          predictions=predictions_dict,
                                          export_outputs={
                                              tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: predictions_output
                                          })
    # calculate loss
    # loss = focal_loss(onehot_labels, logits, gamma=1.5)
    gamma = 1.5
    weights = tf.reduce_sum(tf.multiply(onehot_labels, tf.pow(1. - probs, gamma)), axis=-1)
    loss = tf.losses.softmax_cross_entropy(onehot_labels, logits, weights=weights)
    accuracy = tf.metrics.accuracy(labels=labels,
                                   predictions=predictions)
    if mode == tf.estimator.ModeKeys.TRAIN:
        lr = params.learning_rate
        # train optimizer
        optimizer = tf.train.RMSPropOptimizer(learning_rate=lr, decay=0.9)
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        with tf.control_dependencies(update_ops):
            train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
        tensors_to_log = {'batch_accuracy': accuracy[1],
                          'logits': logits,
                          'label': labels}
        logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=1000)  
        return tf.estimator.EstimatorSpec(mode=mode,
                                          loss=loss,
                                          train_op=train_op,
                                          training_hooks=[logging_hook])
    else:
        eval_metric_ops = {"accuracy": accuracy}
        return tf.estimator.EstimatorSpec(mode=mode,
                                          loss=loss,
                                          eval_metric_ops=eval_metric_ops)


def get_estimator_model(config=None, params=None):
    """
    Get estimator model by definition of model_fn
    """
    est_model = tf.estimator.Estimator(model_fn=model_fn,
                                       config=config,
                                       params=params)
    return est_model


def train(source_dir, model_save_path):
    """
    Train patch based model
    Args:
        source_dir: a directory where the training tfrecords files are stored. All TFRecords files whose names start with train will be used!
        model_save_path: weights save path
    """
    if tf.gfile.Exists(source_dir):
        train_data_paths = tf.gfile.Glob(source_dir+"/train*tfrecord")
        val_data_paths = tf.gfile.Glob(source_dir+"/val*tfrecord")
        if not len(train_data_paths):
            raise Exception("[Train Error]: unable to find train*.tfrecord file")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    train_config = tf.estimator.RunConfig()
    new_config = train_config.replace(model_dir=model_save_path,
                                      save_checkpoints_steps=1000,
                                      keep_checkpoint_max=5)
    params = tf.contrib.training.HParams(
        learning_rate=0.001,
        train_steps=5000,
        min_eval_frequency=1000
    )
    est_model = get_estimator_model(config=new_config,
                                    params=params)
    # define training config
    train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(data_path=train_data_paths,
                                                                  batch_size=_BATCH_SIZE,
                                                                  is_training=True),
                                        max_steps=_MAX_STEPS)

    eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(data_path=val_data_paths,
                                                                batch_size=_BATCH_SIZE),
                                      steps=100,
                                      throttle_secs=900)
    # train and evaluate model
    tf.estimator.train_and_evaluate(estimator=est_model,
                                    train_spec=train_spec,
                                    eval_spec=eval_spec)


def evaluate(source_dir, model_save_path):
    """
    Eval patch based model
    Args:
        source_dir: directory where the validation tfrecords files are stored. All TFRecords files whose names start with val will be used!
        model_save_path: model save path
    """
    # load model
    run_config = tf.estimator.RunConfig(model_dir=model_save_path)
    est_model = get_estimator_model(run_config)
    # load test dataset
    if tf.gfile.Exists(source_dir):
        val_data_paths = tf.gfile.Glob(source_dir+"/val*.tfrecord")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    accuracy_score = est_model.evaluate(input_fn=lambda: input_fn(val_data_paths,
                                                                  batch_size=_BATCH_SIZE,
                                                                  is_training=False))
    print("Accuracy of testing images: {}".format(accuracy_score))

Notes:

  • BN layer in estimator: regarding #1: in a network that contains BN layers you normally have to state explicitly whether it is in the training stage (BN is computed differently during training and inference). However, in version 1.4.1, training with the estimator behaved normally (is_training=True) while evaluation (is_training=False) reported an accuracy that looked like an untrained model, and with repeated calls to train_and_evaluate() the eval loss kept growing. This looked like a bug (see issues 13895 and 16455), and since the BN layers in the official estimator-based resnet example do not set is_training=True during the train stage, is_training was simply switched off here, which made training behave normally. In fact, the error is caused by calling tf.layers.batch_normalization() incorrectly; according to the function's [official documentation](https://www.tensorflow.org/api_docs/python/tf/layers/batch_normalization):

Note: when training, the moving_mean and moving_variance need to be updated. By default the update ops are placed in tf.GraphKeys.UPDATE_OPS, so they need to be added as a dependency to the train_op.

The update ops have to be added explicitly as a dependency of the train op:

update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
  train_op = optimizer.minimize(loss)

In addition, the input images need to have 3 channels.

  • TensorHook for logging: when using a LoggingTensorHook to print training logs, the tf.logging verbosity level has to be set explicitly: tf.logging.set_verbosity(tf.logging.INFO)
  • When writing a custom model_fn for an estimator, a different EstimatorSpec has to be returned depending on the mode the model_fn is called in. The model_fn must follow the signature below; see the official documentation for details.
def model_fn(features, labels, mode, params):
   # Logic to do the following:
   # 1. Configure the model via TensorFlow operations
   # 2. Define the loss function for training/evaluation
   # 3. Define the training operation/optimizer
   # 4. Generate predictions
   # 5. Return predictions/loss/train_op/eval_metric_ops in EstimatorSpec object
   return EstimatorSpec(mode, predictions, loss, train_op, eval_metric_ops)

A model_fn has three modes: tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL and tf.estimator.ModeKeys.PREDICT. The first must return loss and train_op, the second loss, and the third predictions, as in the minimal sketch below.
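A minimal runnable sketch of that branching; the single dense layer and the feature key "input" are placeholders for illustration, not the Xception model above:

def minimal_model_fn(features, labels, mode):
    # Toy model: a single dense layer over a flat feature vector.
    logits = tf.layers.dense(features["input"], units=_NUM_CLASSES)
    predictions = {"label": tf.argmax(logits, axis=-1)}
    if mode == tf.estimator.ModeKeys.PREDICT:
        # PREDICT only needs predictions
        return tf.estimator.EstimatorSpec(mode, predictions=predictions)
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)
    if mode == tf.estimator.ModeKeys.TRAIN:
        # TRAIN needs loss and train_op
        train_op = tf.train.AdamOptimizer(1e-3).minimize(
            loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
    # EVAL needs loss; eval_metric_ops is optional
    accuracy = tf.metrics.accuracy(labels=labels, predictions=predictions["label"])
    return tf.estimator.EstimatorSpec(mode, loss=loss,
                                      eval_metric_ops={"accuracy": accuracy})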

  • tf.metrics.accuracy: during EVAL, model_fn usually returns eval_metric_ops, a dict whose values are the return values of tf.metrics functions (taking tf.metrics.accuracy() as an example, the return values are):

accuracy: A Tensor representing the accuracy, the value of total divided by count.

update_op: An operation that increments the total and count variables appropriately and whose value matches accuracy.

However, this function is a bit counter-intuitive when used in an estimator model:

tf.metrics.accuracy is not meant to compute the accuracy of a single batch. It returns both the accuracy and an update_op, and update_op is intended to be run every batch, which updates the accuracy.

So in the code above, what is actually computed for each batch is update_op rather than accuracy. For more discussion of tf.metrics.accuracy(), see these two issues: 15115, 9498. A minimal example of the intended streaming usage follows.
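Outside of an estimator, the intended pattern is to run update_op once per batch and only read accuracy at the end; a small stand-alone sketch (note that tf.metrics keeps its total/count counters in local variables, which must be initialized first):

labels_ph = tf.placeholder(tf.int64, [None])
preds_ph = tf.placeholder(tf.int64, [None])
accuracy, update_op = tf.metrics.accuracy(labels=labels_ph, predictions=preds_ph)

with tf.Session() as sess:
    sess.run(tf.local_variables_initializer())  # total and count are local variables
    for batch_labels, batch_preds in [([0, 1, 1], [0, 1, 0]), ([1, 0], [1, 0])]:
        sess.run(update_op, feed_dict={labels_ph: batch_labels, preds_ph: batch_preds})
    print(sess.run(accuracy))  # accuracy accumulated over all batches: 4/5 = 0.8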

Export saved model

Freeze model/export savedmodel for Tensorflow Serving
One big advantage of training with an estimator is that it scales easily to distributed deployment. Deployment expects the SavedModel directory layout (see here for details).

To freeze the trained ckpt checkpoint into a pb (protobuf) model file, estimator provides export_savedmodel, which makes the operation quick.

def serving_input_receiver_fn():
    """
    Build serving inputs
    """
    inputs = tf.placeholder(dtype=tf.string, shape=[None], name="input_image")
    feature_config = {'image/encoded': tf.FixedLenFeature(shape=[], dtype=tf.string)}
    tf_example = tf.parse_example(inputs, feature_config)
    patch_images = tf.map_fn(_preprocess_image, tf_example["image/encoded"], dtype=tf.float32)
    patch_images = tf.squeeze(patch_images, axis=[0])
    receive_tensors = {'example': inputs}
    features = {"input": patch_images}
    return tf.estimator.export.ServingInputReceiver(features, receive_tensors)


def save_serving_model():
    session_config = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True))
    config = tf.estimator.RunConfig(model_dir=MODEL_WEIGHTS_PATH, session_config=session_config)  # session_config is used for configuration of session
    model = get_estimator_model(config=config)
    model.export_savedmodel(export_dir_base=SERVING_MODEL_SAVE_PATH, serving_input_receiver_fn=serving_input_receiver_fn)

If the session parameters (ConfigProto) need to be adjusted at prediction time, configure session_config through tf.estimator.RunConfig and pass the result to the config argument of tf.estimator.Estimator(), as shown above.

predict

A model exported through export_savedmodel is usually wrapped as a service with tensorflow serving for prediction:

# start tensorflow serving first:
#   $ bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server --port=9000 --model_name=your_model_name --model_base_path=/your/model/base/path
import numpy as np
import tensorflow as tf
from grpc.beta import implementations
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2

# bytes_feature(value) is a small helper (defined elsewhere) that wraps raw bytes into a tf.train.Feature
server = "localhost:9000"

def serving_predict(face_img):
    """
    Get prediction based on given face image
    Args:
        face_img: input image file
    Returns:
        anti-spoof detection result of given face
    """
    host, port = server.split(":")
    channel = implementations.insecure_channel(host, int(port))
    stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)
    # creating request
    request = predict_pb2.PredictRequest()
    request.model_spec.name = "face_anti_spoof"
    request.model_spec.signature_name = "predict_images"
    # parse face img for serving
    with open(face_img, "rb") as f:
        serialized = f.read()
    example = tf.train.Example(features=tf.train.Features(feature={"image/encoded": bytes_feature(serialized)}))
    serialized = example.SerializeToString()
    request.inputs['inputs'].CopyFrom(tf.contrib.util.make_tensor_proto(serialized, shape=[1]))
    # predict results
    content = stub.Predict(request, timeout=5)
    labels = content.outputs["label"].int64_val
    prob = np.sum(labels) / len(labels)
    if prob > 0.5:
        label = LABELS[1]
    else:
        label = LABELS[0]
        prob = 1 - prob
    info = {"label": label, "prob": prob}
    return info

The details of using tensorflow serving will be covered in a later article.

The generated pb file can also be loaded directly for prediction, without going through tensorflow serving. Since the pb file produced by the estimator's export_savedmodel stores the model in the tf.saved_model format, it is most convenient to load it with the tf.saved_model.loader module (restoring this pb model through a plain Saver tends to raise errors):

def load_graph(trained_model):
    session_config = tf.ConfigProto()
    session_config.gpu_options.allow_growth = True
    sess = tf.Session(config=session_config)
    meta_graph_def = tf.saved_model.loader.load(sess, [tf.saved_model.tag_constants.SERVING], trained_model)
    # signature = meta_graph_def.signature_def
    # signature_key = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
    # print(signature[signature_key])
    return sess
sess = load_graph(model_path)

def predict_pb(image):
    """
    Predict input face is real or not using model stored in save_model.pb generated by estimator export_savedmodel
    Args:
        image: input image file
    """
    assert tf.gfile.Exists(image), "Input image file not found!"
    with open(image, "rb") as f:
        image_bytes = f.read()
    example = tf.train.Example(features=tf.train.Features(feature={"image/encoded": bytes_feature(image_bytes)}))
    serialized = example.SerializeToString()
    input_tensor = sess.graph.get_tensor_by_name("input_image:0")
    labels = sess.graph.get_tensor_by_name("output_label:0")
    scores = sess.graph.get_tensor_by_name("output_score:0")
    (labels, scores) = sess.run([labels, scores], feed_dict={input_tensor: [serialized]})
    return labels, scores

參考

  1. estimator tutorial
  2. estimator example
  3. discussion
  4. tf.layers
  5. BN layer
  6. export & load saved model and predict
  7. estimator gpu options setting

4. keras + estimator + dataset

Mostly the same as above, except that keras.estimator.model_to_estimator() converts a keras model directly into an estimator model without writing a model_fn yourself. This is very simple and friendly for training, but calling the estimator's export_savedmodel() afterwards raises an error because export_outputs is missing (when writing a custom model_fn there does not seem to be an explicit requirement to define export_outputs either, which makes this a rather subtle error). Converting the model with the official approach instead raises an error that the BN layer parameters are not initialized (Attempting to use uninitialized value bn_xx) (the BN layer again; as before this is probably caused by graph reuse). So for models trained this way, no method has been found yet to convert them into the format required by Tensorflow serving.

def model_fn(model_dir=".", input_shape=(_PATCH_SIZE, _PATCH_SIZE, 3)):
    """
    Convert keras model to estimator model
    Args:
        model_dir: absolute model save directory
        input_shape: input image shape (not include batch)

    Returns: an estimator model instance
    """
    with K.name_scope("model"):
        base_model = tf.keras.applications.Xception(include_top=True,
                              weights=None,   # no pre-trained weights used
                              pooling="max",
                              input_shape=input_shape,  # modify first layer
                              classes=_NUM_CLASSES)
        base_model.summary()
        model_input = keras.Input(shape=input_shape, dtype=tf.float32, name="input_1")
        model_output = base_model(model_input)
        model = keras.models.Model(inputs=model_input, outputs=model_output)
        optimizer = keras.optimizers.RMSprop(lr=2e-3, decay=0.9)
        model.compile(optimizer=optimizer,
                      loss=focal_loss,
                      metrics=['accuracy'])
        if model_dir == ".":
            model_dir = os.path.dirname(os.path.abspath(__file__))
        elif not model_dir.startswith("/"):  # relative path to abs path
            model_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), model_dir)
        if not os.path.exists(model_dir):
            os.makedirs(model_dir)
        # define training config
        run_config = tf.estimator.RunConfig(save_checkpoints_steps=2000,
                                            keep_checkpoint_max=5)
        # convert keras model to estimator model
        est_model = keras.estimator.model_to_estimator(model, model_dir=model_dir, config=run_config)
        return est_model

References

5. estimator + experiment + datasets

In 1.4, an estimator model can be trained and evaluated with train_and_evaluate(), or directly with model.train() and model.evaluate() (similar to a keras model's fit and evaluate; a sketch of the direct calls is shown at the end of this section). Alternatively, tf.contrib.learn.Experiment() can be used instead, although it requires defining an Experiment, which is a bit more cumbersome. Most of the code is the same as for the estimator model above; an experiment_fn needs to be defined as follows:

from tensorflow.contrib.learn import learn_runner

def experiment_fn(run_config, params):
    """
    Create an experiment to train and evaluate the model
    Args:
        run_config: configuration for estimator
        params: Hparams object returned by tf.contrib.training.HParams
    Returns:
        Experiment object
    """
    if tf.gfile.Exists(FLAGS.train_data_dir) and tf.gfile.Exists(FLAGS.val_data_dir):
        train_data_paths = tf.gfile.Glob(FLAGS.train_data_dir+"/train*tfrecord")
        val_data_paths = tf.gfile.Glob(FLAGS.val_data_dir+"/val*tfrecord")
        if not len(train_data_paths):
            raise Exception("[Train Error]: unable to find train*.tfrecord file")
        if not len(val_data_paths):
            raise Exception("[Eval Error]: unable to find val*.tfrecord file")
    else:
        raise Exception("[Train Error]: unable to find input directory!")
    estimator = get_estimator_model(config=run_config, params=params)

    def train_input_fn():
        return input_fn(data_path=train_data_paths, batch_size=_BATCH_SIZE, is_training=True)

    def eval_input_fn():
        return input_fn(data_path=val_data_paths, batch_size=_BATCH_SIZE, is_training=False)

    experiment = tf.contrib.learn.Experiment(
        estimator=estimator,
        train_input_fn=train_input_fn,
        eval_input_fn=eval_input_fn,
        train_steps=params.train_steps,
        min_eval_frequency=params.min_eval_frequency,
    )
    return experiment


def run_experiment():
    """
    Run the training experiment
    """
    params = tf.contrib.training.HParams(
        learning_rate=0.002,
        n_classes=_NUM_CLASSES,
        train_steps=5000,
        min_eval_frequency=100
    )
    run_config = tf.contrib.learn.RunConfig(model_dir=FLAGS.save_dir,
                                            save_checkpoints_steps=1000,
                                            keep_checkpoint_max=5)
    experiment = experiment_fn(run_config=run_config, params=params)
    experiment.train_and_evaluate()
    # learn_runner.run(
    #     experiment_fn=experiment_fn,  # first class function
    #     run_config=run_config,
    #     schedule='train_and_evaluate',
    #     hparams=params
    # )
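For comparison, the same training can be run without an Experiment by calling train() and evaluate() directly on the estimator; a minimal sketch reusing input_fn, get_estimator_model, run_config and params from the code above:

est_model = get_estimator_model(config=run_config, params=params)
est_model.train(input_fn=lambda: input_fn(data_path=train_data_paths,
                                          batch_size=_BATCH_SIZE,
                                          is_training=True),
                steps=params.train_steps)
metrics = est_model.evaluate(input_fn=lambda: input_fn(data_path=val_data_paths,
                                                       batch_size=_BATCH_SIZE,
                                                       is_training=False))
print(metrics)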

References

