TensorFlow 高級接口使用簡介(estimator, keras, data, experiment)
TensorFlow 1.4正式添加了keras
和data
作為其核心代碼(從contrib中畢業),加上之前的estimator
API,現在已經可以利用Tensorflow像keras一樣方便的搭建網絡進行訓練。data
可以方便從多種來源的數據輸入到搭建的網絡中(利用tf.features
可以方便的對結構化的數據進行讀取和處理,比如存在csv
中的數據,具體操作可以參考這篇文檔);keras
在搭建網絡的時候,可以很大程度上避免由於Session和Graph的重用出現的一系列問題(不過現在tf.keras
與原版的keras還是有不少區別的地方,因此使用的時候也是會遇上一些意外的問題);而estimator
提供了類似於sklearn
中模型訓練的語法,非常好上手,而且默認定義了多種常用的任務,在自定義model_fn
的時候也能讓人對訓練過程更加清晰,而且estimator
提供了export_savedmodel
函數很方便將訓練好的ckpt模型文件轉成pb文件並結合 docker 和 tensorflow serving 進行靈活穩定的模型部署和更新。
1. tf.data
進行數據流操作(TFRecords)
在keras
中有keras.preprocessing.image.ImageDataGenerator()
類和.flow_from_directory()
函數可以很容易將保存在 文件夾 下面的數據進行讀取;也可以用.flow()
函數將數據直接從np.array中讀取后輸入網絡進行訓練(具體可以查看官方文檔)。在使用圖片並以文件夾名作為分類名的訓練任務時這個方案是十分簡單有效的,但是Tensorflow官方推薦的數據保存格式是 TFRecords,而keras官方不支持直接從tfrecords文件中讀取數據(tf.keras
也不行,但是這個issue中提供了一些PR是可以的,keras作者不太推薦就是了),所以這里就可以用data
類來處理從TFRecords中的數據(也可以用之前常用的tf.train.batch()
或tf.train.shuffle_batch()
來處理訓練數據)。
Tensorflow官方提供了詳細的文檔來介紹data
的機制和使用方法(看這里),而對於TFRecords類型數據,主要利用 tf.data.Iterator()
來抽取數據,這里簡單說下從TFRecords中提取數據的方法:
以下代碼為官方代碼
def dataset_input_fn(filenames=None, num_epochs=None):
    """Build a TFRecord input pipeline and return one batch of tensors.

    Adapted from the official TensorFlow ``tf.data`` guide.

    Args:
        filenames: list of TFRecord file paths to read. Defaults to the two
            example files used in the official guide.
        num_epochs: number of passes over the data set; ``None`` repeats the
            data indefinitely.

    Returns:
        A ``(features, labels)`` tuple. ``features`` is a dict with the keys
        ``"image_data"`` and ``"date_time"``, each holding a batch of values;
        ``labels`` is a batch of int32 labels.
    """
    if filenames is None:
        filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
    # Select the TFRecord files the data is read from.
    dataset = tf.data.TFRecordDataset(filenames)

    # Use `tf.parse_single_example()` to extract data from a `tf.Example`
    # protocol buffer, and perform any additional per-record preprocessing.
    def parser(record):
        """Decode one serialized `tf.Example` into (feature dict, label)."""
        keys_to_features = {
            "image_data": tf.FixedLenFeature((), tf.string, default_value=""),
            # The default must match the declared int64 dtype (was "").
            "date_time": tf.FixedLenFeature((), tf.int64, default_value=0),
            "label": tf.FixedLenFeature((), tf.int64,
                                        default_value=tf.zeros([], dtype=tf.int64)),
        }
        parsed = tf.parse_single_example(record, keys_to_features)
        # Perform additional preprocessing on the parsed data.
        image = tf.image.decode_jpeg(parsed["image_data"])
        image = tf.reshape(image, [299, 299, 1])
        label = tf.cast(parsed["label"], tf.int32)
        return {"image_data": image, "date_time": parsed["date_time"]}, label

    # Use `Dataset.map()` to build a pair of a feature dictionary and a label
    # tensor for each example. Image preprocessing normally lives in `parser`.
    dataset = dataset.map(parser)
    # Shuffle the records during training to improve generalization.
    dataset = dataset.shuffle(buffer_size=10000)
    # Number of records read per batch.
    dataset = dataset.batch(32)
    # Number of times the data set is repeated; None loops forever.
    dataset = dataset.repeat(num_epochs)
    iterator = dataset.make_one_shot_iterator()
    # `features` is a dictionary in which each value is a batch of values for
    # that feature; `labels` is a batch of labels.
    features, labels = iterator.get_next()
    # An Estimator input_fn must return a dict of features and a label tensor,
    # which is the structure returned here.
    return features, labels
注: dataset.make_one_shot_iterator()
是最簡單的Iterator類,不需要明確的initialization,並且目前這是唯一能夠在estimator中容易使用的iterator。對於需要重新使用的Dataset類(比如結構相同的訓練和測試數據集),一般是需要用 reinitializable iterator ,不過在estimator中由於上述問題,現在一般的做法是對訓練集和驗證集單獨寫兩個pipeline用make_one_shot_iterator
來處理數據流。
參考
2. Dataset + Keras
通過data處理TFRecords數據流,我們就可以使用keras來進行訓練了。
def architecture(input_shape=(_PATCH_SIZE, _PATCH_SIZE, 3)):
    """
    Build the Xception classifier without pre-trained weights.

    Args:
        input_shape: shape of a single input image (batch dimension excluded)

    Returns: a keras model instance
    """
    xception_kwargs = dict(
        include_top=True,
        weights=None,             # no pre-trained weights used
        pooling="max",
        input_shape=input_shape,  # modify first layer
        classes=_NUM_CLASSES,
    )
    network = Xception(**xception_kwargs)
    # Print the layer table so the configuration can be inspected.
    network.summary()
    return network
def train(source_dir, model_save_path):
    """
    Train patch based model with Keras, fed directly from TFRecord tensors.

    Args:
        source_dir: a directory where training tfrecords file stored. All TF records start with train will be used!
        model_save_path: directory receiving TensorBoard logs and the saved model file

    Raises:
        Exception: if `source_dir` does not exist or holds no train/val TFRecord files.
    """
    if not tf.gfile.Exists(source_dir):
        raise Exception("[Train Error]: unable to find input directory!")
    train_data_paths = tf.gfile.Glob(source_dir+"/train*tfrecord")
    val_data_paths = tf.gfile.Glob(source_dir+"/val*tfrecord")
    if not train_data_paths:
        raise Exception("[Train Error]: unable to find train*.tfrecord file")
    if not val_data_paths:
        raise Exception("[Eval Error]: unable to find val*.tfrecord file")

    images, labels = dataset_input_fn(train_data_paths)
    # A Keras model normally takes a keras.Input; wrapping the TensorFlow
    # tensor produced by the input pipeline also works.
    model_input = keras.Input(tensor=images, shape=(_PATCH_SIZE, _PATCH_SIZE, 3),
                              dtype=tf.float32, name="input")
    base_model = architecture()
    model_output = base_model(model_input)
    model = keras.models.Model(inputs=model_input, outputs=model_output)

    # In Keras the moving-average factor of RMSprop is `rho`; `decay` is the
    # per-update learning-rate decay, so the former `decay=0.9` shrank the
    # learning rate almost immediately.
    optimizer = keras.optimizers.RMSprop(lr=2e-3, rho=0.9)
    # target_tensors makes Keras read labels from the pipeline instead of
    # expecting external Numpy targets in fit() (marker #1 in the article).
    model.compile(optimizer=optimizer,
                  loss=focal_loss,
                  metrics=['accuracy'],
                  target_tensors=[labels])  # 1

    tensorboard = tf.keras.callbacks.TensorBoard(log_dir=model_save_path)
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=model_save_path+"/saved_model.h5")
    model.fit(steps_per_epoch=8000, epochs=_EPOCHS, callbacks=[tensorboard, model_checkpoint])
def evaluate(source_dir, weights_path):
    """
    Eval patch based model on one batch of the validation records.

    Args:
        source_dir: directory where val tf records file stored. All TF records start with val will be used!
        weights_path: model weights save path

    Raises:
        Exception: if `source_dir` does not exist or holds no val TFRecord files.
    """
    # load test dataset
    if not tf.gfile.Exists(source_dir):
        raise Exception("[Eval Error]: unable to find input directory!")
    val_data_paths = tf.gfile.Glob(source_dir+"/val*.tfrecord")
    if not val_data_paths:
        raise Exception("[Eval Error]: unable to find val*.tfrecord file")

    # load model
    base_model = architecture()
    base_model.load_weights(weights_path)

    # Feed the globbed record files, not the directory itself.
    (images, labels) = input_fn(val_data_paths)
    probs = base_model(images)
    predictions = tf.argmax(probs, axis=-1)
    # Compare predicted classes with the labels; the bool result must be cast
    # before averaging.
    # NOTE(review): assumes `labels` holds integer class ids, not one-hot
    # vectors -- confirm against input_fn.
    correct = tf.equal(predictions, tf.cast(labels, predictions.dtype))
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
    # Run the graph so a number is printed rather than a Tensor object.
    # NOTE(review): this evaluates a single batch, and assumes `keras` is
    # `tf.keras` so the backend session holds the loaded weights -- confirm.
    accuracy_score = tf.keras.backend.get_session().run(accuracy)
    print("Accuracy of testing images: {}".format(accuracy_score))
注: 對於#1來說,根據keras model的compile函數的文檔:
target_tensors: By default, Keras will create placeholders for the model's target, which will be fed with the target data during training. If instead you would like to use your own target tensors (in turn, Keras will not expect external Numpy data for these targets at training time), you can specify them via the target_tensors argument. It can be a single tensor (for a single-output model), a list of tensors, or a dict mapping output names to target tensors.
定義了target tensor 之后就不需要再從外部輸入數據了(fit的時候)。
而使用這種方式訓練得到的模型文件為h5,如果想要轉換成用於tensorflow serving的模型,需要用以下方式進行:a). fchollet提供的keras式寫法
# @1
# Step 1: rebuild the trained model with the learning phase fixed to
# inference, so the exported graph contains no training-only behavior.
from keras import backend as K
K.set_learning_phase(0) # all new operations will be in test mode from now on
# serialize the model and get its weights, for quick re-building
config = previous_model.get_config()
weights = previous_model.get_weights()
# re-build a model where the learning phase is now hard-coded to 0
from keras.models import model_from_config
new_model = model_from_config(config)
new_model.set_weights(weights)
# @2
# Step 2: export the graph with the session-bundle exporter.
# NOTE(review): `model` and `sess` are not defined in this snippet; presumably
# `model` is the rebuilt `new_model` and `sess` is the Keras backend session
# -- confirm before use.
from tensorflow_serving.session_bundle import exporter
export_path = ... # where to save the exported graph
export_version = ... # version number (integer)
saver = tf.train.Saver(sharded=True)
model_exporter = exporter.Exporter(saver)
signature = exporter.classification_signature(input_tensor=model.input,
scores_tensor=model.output) # classification signature
model_exporter.init(sess.graph.as_graph_def(),
default_graph_signature=signature)
model_exporter.export(export_path, tf.constant(export_version), sess)
或者tensorflow官方的寫法:
import os
import tensorflow as tf
import keras.backend as K
def save_model_for_production(model, version, path='prod_models'):
    """Export a Keras model as a SavedModel with a single 'predict' signature.

    Args:
        model: Keras model whose `input` and `output` tensors are exported.
        version: requested version; passed as an int to `get_new_version`.
        path: base directory that receives one sub-directory per version.
    """
    # Build everything from here on in inference mode.
    K.set_learning_phase(0)

    if not os.path.exists(path):
        os.mkdir(path)

    version_name = str(get_new_version(path=path, current_version=int(version)))
    export_path = os.path.join(tf.compat.as_bytes(path),
                               tf.compat.as_bytes(version_name))
    builder = tf.saved_model.builder.SavedModelBuilder(export_path)

    # Describe the model's input and output tensors for the signature.
    input_info = tf.saved_model.utils.build_tensor_info(model.input)
    output_info = tf.saved_model.utils.build_tensor_info(model.output)
    signature = tf.saved_model.signature_def_utils.build_signature_def(
        inputs={'inputs': input_info},
        outputs={'output': output_info},
        method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)

    with K.get_session() as sess:
        builder.add_meta_graph_and_variables(
            sess=sess,
            tags=[tf.saved_model.tag_constants.SERVING],
            signature_def_map={'predict': signature})
        builder.save()
參考
- fchollet的mnist example
- https://stackoverflow.com/questions/46135499/how-to-properly-combine-tensorflows-dataset-api-and-keras
- https://blog.keras.io/keras-as-a-simplified-interface-to-tensorflow-tutorial.html
3. Dataset + estimator
可以用tf.layers
函數來代替keras搭建網絡,而且可以提供更豐富的layer。
def xception(features, classes=2, is_training=True):
    """
    The Xception architecture written in tf.layers

    Layers are created in a fixed order; the unnamed shortcut layers get
    auto-generated names that depend on that order, so do not reorder the
    statements without also migrating existing checkpoints.

    Args:
        features: input image tensor
        classes: number of classes to classify images into
        is_training: is training stage or not (controls batch normalization)
    Returns:
        2-D logits tensor (no softmax applied)
    """
    # ---- block 1: plain convolutions
    x = tf.layers.conv2d(features, 32, (3, 3), strides=(2, 2), use_bias=False, name='block1_conv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block1_conv1_bn')
    x = tf.nn.relu(x, name='block1_conv1_act')
    x = tf.layers.conv2d(x, 64, (3, 3), use_bias=False, name='block1_conv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block1_conv2_bn')
    x = tf.nn.relu(x, name='block1_conv2_act')

    # ---- block 2: separable convolutions with a strided 1x1 shortcut
    residual = tf.layers.conv2d(x, 128, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)
    x = tf.layers.separable_conv2d(x, 128, (3, 3), padding='same', use_bias=False, name='block2_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block2_sepconv1_bn')
    x = tf.nn.relu(x, name='block2_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 128, (3, 3), padding='same', use_bias=False, name='block2_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block2_sepconv2_bn')
    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block2_pool')
    x = tf.add(x, residual, name='block2_add')

    # ---- block 3
    residual = tf.layers.conv2d(x, 256, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)
    x = tf.nn.relu(x, name='block3_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 256, (3, 3), padding='same', use_bias=False, name='block3_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block3_sepconv1_bn')
    x = tf.nn.relu(x, name='block3_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 256, (3, 3), padding='same', use_bias=False, name='block3_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block3_sepconv2_bn')
    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block3_pool')
    x = tf.add(x, residual, name="block3_add")

    # ---- block 4
    residual = tf.layers.conv2d(x, 728, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)
    x = tf.nn.relu(x, name='block4_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block4_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block4_sepconv1_bn')
    x = tf.nn.relu(x, name='block4_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block4_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block4_sepconv2_bn')
    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block4_pool')
    x = tf.add(x, residual, name="block4_add")

    # ---- blocks 5-12: eight identical blocks with identity shortcuts
    for i in range(8):
        residual = x
        prefix = 'block' + str(i + 5)
        x = tf.nn.relu(x, name=prefix + '_sepconv1_act')
        x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv1')
        x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv1_bn')
        x = tf.nn.relu(x, name=prefix + '_sepconv2_act')
        x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv2')
        x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv2_bn')
        x = tf.nn.relu(x, name=prefix + '_sepconv3_act')
        x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name=prefix + '_sepconv3')
        x = tf.layers.batch_normalization(x, training=is_training, name=prefix + '_sepconv3_bn')
        x = tf.add(x, residual, name=prefix+"_add")

    # ---- block 13
    residual = tf.layers.conv2d(x, 1024, (1, 1), strides=(2, 2), padding='same', use_bias=False)
    residual = tf.layers.batch_normalization(residual, training=is_training)
    x = tf.nn.relu(x, name='block13_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 728, (3, 3), padding='same', use_bias=False, name='block13_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block13_sepconv1_bn')
    x = tf.nn.relu(x, name='block13_sepconv2_act')
    x = tf.layers.separable_conv2d(x, 1024, (3, 3), padding='same', use_bias=False, name='block13_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block13_sepconv2_bn')
    x = tf.layers.max_pooling2d(x, (3, 3), strides=(2, 2), padding='same', name='block13_pool')
    x = tf.add(x, residual, name="block13_add")

    # ---- block 14
    x = tf.layers.separable_conv2d(x, 1536, (3, 3), padding='same', use_bias=False, name='block14_sepconv1')
    x = tf.layers.batch_normalization(x, training=is_training, name='block14_sepconv1_bn')
    x = tf.nn.relu(x, name='block14_sepconv1_act')
    x = tf.layers.separable_conv2d(x, 2048, (3, 3), padding='same', use_bias=False, name='block14_sepconv2')
    x = tf.layers.batch_normalization(x, training=is_training, name='block14_sepconv2_bn')
    x = tf.nn.relu(x, name='block14_sepconv2_act')

    # ---- head: the fully connected layers are replaced by 1x1 convolutions.
    # NOTE(review): despite its name this is a fixed 3x3 / stride-2 average
    # pool, not a global pool; the squeeze below only succeeds when the pooled
    # feature map is 1x1 -- confirm the supported input size.
    x = tf.layers.average_pooling2d(x, (3, 3), (2, 2), name="global_average_pooling")
    x = tf.layers.conv2d(x, 2048, [1, 1], activation=None, name="block15_conv1")
    x = tf.layers.conv2d(x, classes, [1, 1], activation=None, name="block15_conv2")
    x = tf.squeeze(x, axis=[1, 2], name="logits")
    return x


# Backward-compatible alias for the name used by the original inner definition.
tf_xception = xception
def model_fn(features, labels, mode, params):
    """
    Model_fn for estimator model

    Args:
        features (dict): Input features; the image batch is read from key "input".
        labels (Tensor): Labels tensor for training and evaluation (None when predicting).
        mode (ModeKeys): Specifies if training, evaluation or prediction.
        params (HParams): hyper-parameters for estimator model; `learning_rate` is read in TRAIN mode
    Returns:
        (EstimatorSpec): Model to be run by Estimator.
    """
    # Batch normalization behaves differently in training and inference
    # (marker #1 in the article discusses this flag).
    is_training = mode == tf.estimator.ModeKeys.TRAIN

    input_tensor = features["input"]
    logits = xception(input_tensor, classes=_NUM_CLASSES, is_training=is_training)
    probs = tf.nn.softmax(logits, name="output_score")
    predictions = tf.argmax(probs, axis=-1, name="output_label")

    # provide a tf.estimator spec for PREDICT
    predictions_dict = {"score": probs,
                        "label": predictions}
    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions_output = tf.estimator.export.PredictOutput(predictions_dict)
        return tf.estimator.EstimatorSpec(
            mode=mode,
            predictions=predictions_dict,
            export_outputs={
                tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: predictions_output
            })

    # Labels are only available outside PREDICT mode, so anything derived from
    # them must be built after the early return above.
    onehot_labels = tf.one_hot(tf.cast(labels, tf.int32), _NUM_CLASSES)

    # calculate loss: softmax cross entropy weighted per example by
    # (1 - p_true_class) ** gamma, a focal-loss style weighting.
    gamma = 1.5
    weights = tf.reduce_sum(tf.multiply(onehot_labels, tf.pow(1. - probs, gamma)), axis=-1)
    loss = tf.losses.softmax_cross_entropy(onehot_labels, logits, weights=weights)
    accuracy = tf.metrics.accuracy(labels=labels,
                                   predictions=predictions)

    if mode == tf.estimator.ModeKeys.TRAIN:
        lr = params.learning_rate
        # train optimizer
        optimizer = tf.train.RMSPropOptimizer(learning_rate=lr, decay=0.9)
        # The batch-norm moving averages are updated through UPDATE_OPS and
        # must run together with the train op.
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        with tf.control_dependencies(update_ops):
            train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
        # accuracy[1] is the metric's update_op, which is what has to be run
        # on every batch.
        tensors_to_log = {'batch_accuracy': accuracy[1],
                          'logits': logits,
                          'label': labels}
        logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=1000)
        return tf.estimator.EstimatorSpec(mode=mode,
                                          loss=loss,
                                          train_op=train_op,
                                          training_hooks=[logging_hook])

    eval_metric_ops = {"accuracy": accuracy}
    return tf.estimator.EstimatorSpec(mode=mode,
                                      loss=loss,
                                      eval_metric_ops=eval_metric_ops)
def get_estimator_model(config=None, params=None):
    """
    Wrap `model_fn` in a tf.estimator.Estimator.

    Args:
        config: run configuration forwarded to the Estimator
        params: hyper-parameters forwarded to `model_fn`
    Returns: the Estimator instance
    """
    return tf.estimator.Estimator(model_fn=model_fn, config=config, params=params)
def train(source_dir, model_save_path):
    """
    Train and periodically evaluate the estimator model.

    Args:
        source_dir: a directory where training tfrecords file stored. All TF records start with train will be used!
        model_save_path: directory used as the estimator's model_dir
    """
    if not tf.gfile.Exists(source_dir):
        raise Exception("[Train Error]: unable to find input directory!")
    train_data_paths = tf.gfile.Glob(source_dir+"/train*tfrecord")
    val_data_paths = tf.gfile.Glob(source_dir+"/val*tfrecord")
    if not len(train_data_paths):
        raise Exception("[Train Error]: unable to find train*.tfrecord file")
    if not len(val_data_paths):
        raise Exception("[Eval Error]: unable to find val*.tfrecord file")

    # Checkpoint every 1000 steps and keep the five most recent ones.
    run_config = tf.estimator.RunConfig().replace(model_dir=model_save_path,
                                                  save_checkpoints_steps=1000,
                                                  keep_checkpoint_max=5)
    hparams = tf.contrib.training.HParams(
        learning_rate=0.001,
        train_steps=5000,
        min_eval_frequency=1000
    )
    est_model = get_estimator_model(config=run_config, params=hparams)

    def train_input_fn():
        """Input pipeline over the training records."""
        return input_fn(data_path=train_data_paths,
                        batch_size=_BATCH_SIZE,
                        is_training=True)

    def eval_input_fn():
        """Input pipeline over the validation records."""
        return input_fn(data_path=val_data_paths,
                        batch_size=_BATCH_SIZE)

    # define training config
    train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn,
                                        max_steps=_MAX_STEPS)
    eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn,
                                      steps=100,
                                      throttle_secs=900)

    # train and evaluate model
    tf.estimator.train_and_evaluate(estimator=est_model,
                                    train_spec=train_spec,
                                    eval_spec=eval_spec)
def evaluate(source_dir, model_save_path):
    """
    Eval patch based model with the estimator restored from `model_save_path`.

    Args:
        source_dir: directory where val tf records file stored. All TF records start with val will be used!
        model_save_path: model save path

    Raises:
        Exception: if `source_dir` does not exist or holds no val TFRecord files.
    """
    # load test dataset
    if not tf.gfile.Exists(source_dir):
        raise Exception("[Eval Error]: unable to find input directory!")
    val_data_paths = tf.gfile.Glob(source_dir+"/val*.tfrecord")
    if not val_data_paths:
        raise Exception("[Eval Error]: unable to find val*.tfrecord file")

    # load model; use the core RunConfig like the rest of the file rather than
    # the tf.contrib.learn variant.
    run_config = tf.estimator.RunConfig(model_dir=model_save_path)
    est_model = get_estimator_model(run_config)

    # Estimator.evaluate returns a dict of metrics; report only the accuracy
    # entry that model_fn registers in eval_metric_ops.
    metrics = est_model.evaluate(input_fn=lambda: input_fn(val_data_paths,
                                                           batch_size=_BATCH_SIZE,
                                                           is_training=False))
    accuracy_score = metrics["accuracy"]
    print("Accuracy of testing images: {}".format(accuracy_score))
注:
- BN layer in estimator: :
對於問題#1,在包含BN層的網絡,正常情況下都是需要明確指定是否處於training狀態(BN在訓練和其他狀態的計算方式是不同的),但是在 1.4.1 版本下,使用estimator進行訓練的時候是正常(is_training=True),但是在evaluation的時候(is_training=False)輸出的精度是初始化狀態,並且隨着train_and_evaluate()的多次調用,eval loss會變得很大。起初以為這應該是一個bug(具體問題可查看issue 13895, 16455),而參考tensorflow官方給出的基於estimator的resnet example中的BN layer在train stage的時候並沒有設置is_training=True,所以這里曾把is_training關閉了,這樣操作訓練是正常的。但實際上這個錯誤是由於Tensorflow的tf.layers.batch_normalization()
函數調用方式不對導致的,根據這個函數的[官方文檔](https://www.tensorflow.org/api_docs/python/tf/layers/batch_normalization):
Note: when training, the moving_mean and moving_variance need to be updated. By default the update ops are placed in tf.GraphKeys.UPDATE_OPS, so they need to be added as a dependency to the train_op.
需要明確手動添加update_ops作為dependency到train_op:
# Collect the ops registered under UPDATE_OPS (the batch-norm moving-average
# updates) and make the train op depend on them, so they run on every step.
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
    train_op = optimizer.minimize(loss)
此外,輸入的圖片的channel需要為3
- TensorHook for logging: 在使用 tensorhook 打印訓練日志的時候需要明確指定
tf.logging
的verbosity level:tf.logging.set_verbosity(tf.logging.INFO)
- estimator自定義model_fn的時候需要根據model_fn的使用狀態返回不同的EstimatorSpec,model_fn的定義需要符合以下格式,具體可以查看官方文檔:
def model_fn(features, labels, mode, params):
    """Skeleton of a custom Estimator model_fn (illustrative, not runnable)."""
    # Logic to do the following:
    # 1. Configure the model via TensorFlow operations
    # 2. Define the loss function for training/evaluation
    # 3. Define the training operation/optimizer
    # 4. Generate predictions
    # 5. Return predictions/loss/train_op/eval_metric_ops in EstimatorSpec object
    return EstimatorSpec(mode, predictions, loss, train_op, eval_metric_ops)
model_fn有三種mode:tf.estimator.ModeKeys.TRAIN
,tf.estimator.ModeKeys.EVAL
, tf.estimator.ModeKeys.PREDICT
,其中第一個必須要返回的為loss
和train_op
, 第二個是loss
,第三個是prediction
。
- tf.metrics.accuracy: 一般情況下EVAL的時候model_fn會返回
eval_metric_ops
,這個是一個dict,其值為tf.metrics
的返回值(以tf.metrics.accuracy()
為例,返回值如下):
accuracy: A Tensor representing the accuracy, the value of total divided by count.
update_op: An operation that increments the total and count variables appropriately and whose value matches accuracy.
但是這個函數在estimator模型中使用的時候有點反直覺:
tf.metrics.accuracy is not meant to compute the accuracy of a single batch. It returns both the accuracy and an update_op, and update_op is intended to be run every batch, which updates the accuracy.
所以在上面代碼中對每個batch算accuracy的時候算的是update_op而非accuracy,更多關於tf.metrics.accuracy()
的討論可以看這倆個issue:15115, 9498
Export saved model
Freeze model/export savedmodel for Tensorflow Serving
使用estimator進行訓練的一大好處就是進行分布式部署的擴展很容易,而部署需要的模型結構為(具體看這里):
從訓練的ckpt模型文件freeze成pb(protobuf)模型文件,estimator提供了export_savedmodel
來幫助快速進行操作。
def serving_input_receiver_fn():
    """
    Build serving inputs

    Serialized tf.Example protos are received through the "input_image"
    placeholder, their 'image/encoded' field is run through
    `_preprocess_image`, and the result is handed to the model as "input".
    """
    serialized_examples = tf.placeholder(dtype=tf.string, name="input_image")
    example_spec = {'image/encoded': tf.FixedLenFeature(shape=[], dtype=tf.string)}
    parsed = tf.parse_example(serialized_examples, example_spec)

    decoded = tf.map_fn(_preprocess_image, parsed["image/encoded"], dtype=tf.float32)
    # Drop the leading dimension of size one produced by map_fn.
    decoded = tf.squeeze(decoded, axis=[0])

    return tf.estimator.export.ServingInputReceiver(
        {"input": decoded},
        {'example': serialized_examples})
def save_serving_model():
    """Export the trained estimator as a SavedModel for TensorFlow Serving."""
    # session_config is used for configuration of session
    gpu_options = tf.GPUOptions(allow_growth=True)
    session_config = tf.ConfigProto(gpu_options=gpu_options)
    run_config = tf.estimator.RunConfig(model_dir=MODEL_WEIGHTS_PATH,
                                        session_config=session_config)
    estimator = get_estimator_model(config=run_config)
    estimator.export_savedmodel(export_dir_base=SERVING_MODEL_SAVE_PATH,
                                serving_input_receiver_fn=serving_input_receiver_fn)
如果需要在預測的時候調整sess的參數(ConfigProto),可以通過tf.estimator.RunConfig
配置session_config
的參數然后再輸入到tf.estimator.Estimator()
的config參數(如上面所示)。
predict
一般通過export_savedmodel方式輸出的模型是用tensorflow serving封裝服務進行預測:
# start tensorflow serving
$ bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server --port=9000 --model_name=your_model_name --model_base_path=/your/model/base/path
server = "localhost:9000"


def serving_predict(face_img):
    """
    Get prediction based on given face image

    Sends the image to the TensorFlow Serving instance at `server` and
    aggregates the returned per-patch labels into one decision.

    Args:
        face_img: input image file
    Returns:
        dict with the predicted "label" and its "prob" (share of patches that
        voted for that label)
    Raises:
        ValueError: if the server response carries no labels.
    """
    host, port = server.split(":")
    channel = implementations.insecure_channel(host, int(port))
    stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)

    # creating request
    request = predict_pb2.PredictRequest()
    request.model_spec.name = "face_anti_spoof"
    # The estimator export in this file registers its outputs under the
    # default serving signature, not "predict_images".
    request.model_spec.signature_name = (
        tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY)

    # parse face img for serving
    with open(face_img, "rb") as f:
        serialized = f.read()
    example = tf.train.Example(features=tf.train.Features(feature={"image/encoded": bytes_feature(serialized)}))
    serialized = example.SerializeToString()
    # 'example' is the receiver-tensor key declared by
    # serving_input_receiver_fn; the former 'inputs' key is not exported.
    request.inputs['example'].CopyFrom(tf.contrib.util.make_tensor_proto(serialized, shape=[1]))

    # predict results
    content = stub.Predict(request, timeout=5)
    labels = content.outputs["label"].int64_val
    if not len(labels):
        raise ValueError("serving response contains no labels")
    # Force true division so the ratio is not truncated under Python 2.
    prob = float(np.sum(labels)) / len(labels)
    if prob > 0.5:
        label = LABELS[1]
    else:
        label = LABELS[0]
        # Report the probability of the label actually returned.
        prob = 1 - prob
    info = {"label": label, "prob": prob}
    return info
具體關於tensorflow serving的用法會在之后的文章里面展開。
而生成的pb文件也可以直接加載不通過tensorflow serving來進行預測,不過由於estimator的export_savedmodel
輸出的pb文件中模型的保存格式為tf.saved_model
類,所以加載模型用tf.saved_model.loader
模塊進行操作會比較方便(sess.restore()
加載這個pb模型很容易報錯= =):
def load_graph(trained_model):
    """Load a SavedModel exported for serving and return the session holding it.

    Args:
        trained_model: directory of the exported SavedModel.
    """
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    session = tf.Session(config=config)
    serving_tags = [tf.saved_model.tag_constants.SERVING]
    # The returned MetaGraphDef exposes `signature_def`; print the entry under
    # DEFAULT_SERVING_SIGNATURE_DEF_KEY to inspect the exported tensors.
    meta_graph_def = tf.saved_model.loader.load(session, serving_tags, trained_model)
    return session


sess = load_graph(model_path)
def predict_pb(image):
    """
    Run the SavedModel loaded into the module-level `sess` on one image file.

    Args:
        image: input image file
    Returns:
        (labels, scores) as evaluated from the "output_label" and
        "output_score" tensors
    """
    assert tf.gfile.Exists(image), "Input image file not found!"
    with open(image, "rb") as image_file:
        raw_bytes = image_file.read()

    # Wrap the raw bytes in a tf.Example under the key the graph parses.
    feature_map = {"image/encoded": bytes_feature(raw_bytes)}
    example = tf.train.Example(features=tf.train.Features(feature=feature_map))
    payload = example.SerializeToString()

    graph = sess.graph
    input_tensor = graph.get_tensor_by_name("input_image:0")
    label_tensor = graph.get_tensor_by_name("output_label:0")
    score_tensor = graph.get_tensor_by_name("output_score:0")
    labels, scores = sess.run([label_tensor, score_tensor],
                              feed_dict={input_tensor: [payload]})
    return labels, scores
參考
- estimator tutorial
- https://www.tensorflow.org/versions/r1.5/get_started/custom_estimators
- https://developers.googleblog.com/2017/12/creating-custom-estimators-in-tensorflow.html
- https://www.tensorflow.org/extend/estimators
- https://www.damienpontifex.com/2017/09/19/mnist-with-tensorflow-experiments-and-estimators/
- estimator example
- https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py
- https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/learn/resnet.py
- https://github.com/tensorflow/models/blob/master/samples/outreach/blogs/blog_custom_estimators.py
- https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/learn/iris_custom_model.py
- discussion
- https://github.com/tensorflow/tensorflow/issues/1122
- https://github.com/tensorflow/tensorflow/issues/15115
- https://github.com/tensorflow/tensorflow/issues/9498
- tf.layers
- BN layer
- export & load saved model and predict
- https://stackoverflow.com/questions/43667018/locally-load-saved-tensorflow-model-pb-from-google-cloud-machine-learning-engin
- https://stackoverflow.com/questions/33759623/tensorflow-how-to-save-restore-a-model
- https://stackoverflow.com/questions/46513923/tensorflow-how-and-why-to-use-savedmodel
- https://stackoverflow.com/questions/46098863/how-to-import-an-saved-tensorflow-model-train-using-tf-estimator-and-predict-on
- https://www.tensorflow.org/api_docs/python/tf/saved_model/loader
- https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/saved_model/README.md
- https://github.com/tensorflow/serving/blob/master/tensorflow_serving/example/inception_saved_model.py
- https://github.com/tensorflow/serving/issues/478
- estimator gpu options setting
- https://github.com/tensorflow/tensorflow/issues/9831
- https://github.com/tensorflow/tensorflow/issues/13458
4. keras + estimator + dataset
大部分與上面相同,只是用keras.estimator.model_to_estimator()
函數直接將keras model直接轉換成estimator model而不用自己定義model_fn,這種方式對於模型訓練十分簡單友好,但是在使用Tensorflow estimator的export_savedmodel()
函數的時候會報錯,原因是缺少export_outputs
(在自定義model_fn的時候好像也並沒有需要明確定義export_outputs變量,非常微妙的報錯),而用官方的做法轉化模型的時候又會報 BN層參數沒有初始化 的錯誤(Attempting to use uninitialized value bn_xx)(又是BN層, 跟之前一樣可能是由於graph重用引起的錯誤),因此用這種方式訓練的模型暫時還沒有找到轉化成Tensorflow serving所需的模型的方法。
def model_fn(model_dir=".", input_shape=(_PATCH_SIZE, _PATCH_SIZE, 3)):
    """
    Convert keras model to estimator model

    Args:
        model_dir: model save directory; "." means the directory of this file,
            and a relative path is resolved against the directory of this file
        input_shape: input image shape (not include batch)
    Returns: an estimator model instance
    """
    # NOTE(review): the extent of this name scope is reconstructed from a
    # listing that lost its indentation -- confirm.
    with K.name_scope("model"):
        base_model = tf.keras.applications.Xception(include_top=True,
                                                    weights=None,  # no pre-trained weights used
                                                    pooling="max",
                                                    input_shape=input_shape,  # modify first layer
                                                    classes=_NUM_CLASSES)
        base_model.summary()
    model_input = keras.Input(shape=input_shape, dtype=tf.float32, name="input_1")
    model_output = base_model(model_input)
    model = keras.models.Model(inputs=model_input, outputs=model_output)

    # In Keras the moving-average factor of RMSprop is `rho`; `decay` is the
    # per-update learning-rate decay, so the former `decay=0.9` shrank the
    # learning rate almost immediately.
    optimizer = keras.optimizers.RMSprop(lr=2e-3, rho=0.9)
    model.compile(optimizer=optimizer,
                  loss=focal_loss,
                  metrics=['accuracy'])

    here = os.path.dirname(os.path.abspath(__file__))
    if model_dir == ".":
        model_dir = here
    elif not os.path.isabs(model_dir):  # relative path to abs path
        model_dir = os.path.join(here, model_dir)
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)

    # define training config
    run_config = tf.estimator.RunConfig(save_checkpoints_steps=2000,
                                        keep_checkpoint_max=5)
    # convert keras model to estimator model
    est_model = keras.estimator.model_to_estimator(model, model_dir=model_dir, config=run_config)
    return est_model
參考
- https://www.dlology.com/blog/an-easy-guide-to-build-new-tensorflow-datasets-and-estimator-with-keras-model/
- https://cloud.google.com/blog/big-data/2017/12/new-in-tensorflow-14-converting-a-keras-model-to-a-tensorflow-estimator
- https://github.com/tensorflow/tensorflow/issues/14198
5. estimator + experiment + datasets
在1.4中estimator model有train_and_evaluate()
函數進行開始模型的train和evaluate,也可以直接用model.train()
和model.evaluate()
函數(類似keras model的fit和evaluate函數);不過可以用tf.contrib.learn.Experiment()
來代替(需要定義experiment類,比較麻煩)。大部分與estimator model相同,需要定義experiment_fn
如下:
from tensorflow.contrib.learn import learn_runner
def experiment_fn(run_config, params):
    """
    Assemble the Experiment that trains and evaluates the estimator model.

    Args:
        run_config: configuration for estimator
        params: Hparams object returned by tf.contrib.training.HParams;
            `train_steps` and `min_eval_frequency` are read here
    Returns:
        Experiment object
    """
    dirs_exist = tf.gfile.Exists(FLAGS.train_data_dir) and tf.gfile.Exists(FLAGS.val_data_dir)
    if not dirs_exist:
        raise Exception("[Train Error]: unable to find input directory!")
    train_data_paths = tf.gfile.Glob(FLAGS.train_data_dir+"/train*tfrecord")
    val_data_paths = tf.gfile.Glob(FLAGS.val_data_dir+"/val*tfrecord")
    if not len(train_data_paths):
        raise Exception("[Train Error]: unable to find train*.tfrecord file")
    if not len(val_data_paths):
        raise Exception("[Eval Error]: unable to find val*.tfrecord file")

    estimator = get_estimator_model(config=run_config, params=params)

    def train_input_fn():
        """Input pipeline over the training records."""
        return input_fn(data_path=train_data_paths, batch_size=_BATCH_SIZE, is_training=True)

    def eval_input_fn():
        """Input pipeline over the validation records."""
        return input_fn(data_path=val_data_paths, batch_size=_BATCH_SIZE, is_training=False)

    return tf.contrib.learn.Experiment(
        estimator=estimator,
        train_input_fn=train_input_fn,
        eval_input_fn=eval_input_fn,
        train_steps=params.train_steps,
        min_eval_frequency=params.min_eval_frequency,
    )
def run_experiment():
    """
    Configure hyper-parameters and run config, then train and evaluate.
    """
    hparams = tf.contrib.training.HParams(
        learning_rate=0.002,
        n_classes=_NUM_CLASSES,
        train_steps=5000,
        min_eval_frequency=100
    )
    # Checkpoint every 1000 steps and keep the five most recent ones.
    config = tf.contrib.learn.RunConfig(model_dir=FLAGS.save_dir,
                                        save_checkpoints_steps=1000,
                                        keep_checkpoint_max=5)
    experiment_fn(run_config=config, params=hparams).train_and_evaluate()
    # Alternative: let learn_runner drive the experiment instead.
    # learn_runner.run(
    #     experiment_fn=experiment_fn,  # first class function
    #     run_config=run_config,
    #     schedule='train_and_evaluate',
    #     hparams=params
    # )