TensorFlow 高階API
Dataset(tf.data)
源碼定義:tensorflow/python/data/ops/dataset_ops.py
用戶向導:Dataset Input Pipeline
Dataset可以用來表示輸入管道元素集合(張量的嵌套結構)和“邏輯計划“對這些元素的轉換操作。在Dataset中元素可以是向量,元組或字典等形式。
另外,Dataset需要配合另外一個類Iterator進行使用,Iterator對象是一個迭代器,可以對Dataset中的元素進行迭代提取。
| api | 說明 |
|---|---|
| from_generator(generator,output_types,output_shapes=None) | Creates a Dataset whose elements are generated by generator. |
| from_tensor_slices(tensors) | 創建dataset,其元素是給定張量的切片的元素 |
| from_tensors(tensors) | 創建一個Dataset包含給定張量的單個元素。 |
| interleave(map_func,cycle_length,block_length=1) | 對數據集進行map轉換並合並插入結果 |
| list_files(file_pattern,shuffle=None) | A dataset of all files matching a pattern. |
| range(*args) | Creates a Dataset of a step-separated range of values. |
| skip(count) | Creates a Dataset that skips count elements from this dataset. |
| take(count) | Creates a Dataset with at most count elements from this dataset. |
| zip(datasets) | Creates a Dataset by zipping together the given datasets. |
| prefetch(buffer_size) | 創建dataset,在請求輸入數據集之前從輸入數據集中預提取元素 |
| apply(transformation_func) | Apply a transformation function to this dataset. |
| cache(filename='') | Caches the elements in this dataset. |
| concatenate(dataset) | Creates a Dataset by concatenating given dataset with this dataset. |
| filter(predicate) | Filters this dataset according to predicate. |
| map(map_func,num_parallel_calls=None) | Maps map_func across this dataset. |
| flat_map(map_func) | Maps map_func across this dataset and flattens the result. |
| batch(batch_size) | 將數據集的連續元素合成批次 |
| padded_batch(batch_size,padded_shapes,padding_values=None) | 將數據集的連續元素組合到填充批次中,此轉換將輸入數據集的多個連續元素組合為單個元素。 |
| repeat(count=None) | Repeats this dataset count times. |
| shard(num_shards,index) | 將Dataset分割成num_shards個子數據集。這個函數在分布式訓練中非常有用,它允許每個設備讀取唯一子集。 |
| shuffle(buffer_size,seed=None,reshuffle_each_iteration=None) | 隨機混洗數據集的元素。 |
| make_initializable_iterator(shared_name=None) | Creates an Iterator for enumerating the elements of this dataset. |
| make_one_shot_iterator() | 創建Iterator用於枚舉此數據集的元素。(可自動初始化) |
Iterator
源碼定義:tensorflow/python/data/ops/iterator_ops.py
用戶向導:Dataset Input Pipeline > Iterating over datasets
| api | 說明 |
|---|---|
| get_next(name=None) | In graph mode, you should typically call this method once and use its result as the input to another computation. |
Estimator(tf.estimator)
源碼定義:tensorflow/python/estimator/estimator.py
用戶向導:Regression Examples
Estimator對象包裝(wraps)由model_fn指定的模型,model_fn由給定輸入和其他一些參數,返回需要進行訓練、計算,或預測的操作.
所有輸出(檢查點,事件文件等)都被寫入model_dir或其子目錄.如果model_dir未設置,則使用臨時目錄.
可以通過RunConfig對象(包含了有關執行環境的信息)傳遞config參數.它被傳遞給model_fn,如果model_fn有一個名為“config”的參數(和輸入函數以相同的方式).Estimator使得模型可配置化,並且還使用其一些字段來控制內部,特別是關於檢查點的控制.
該params參數包含hyperparameter,如果model_fn有一個名為“PARAMS”的參數,並且以相同的方式傳遞給輸入函數,則將它傳遞給model_fn. Estimator只是沿着參數傳遞,並不檢查它.因此,params的結構完全取決於開發人員.
不能在子類中重寫任何Estimator方法(其構造函數強制執行此操作).子類應使用model_fn來配置基類,並且可以添加實現專門功能的方法.
| api | 說明 |
|---|---|
| init(model_fn, model_dir=None, config=None, params=None) | Estimator的構造方法,返回EstimatorSpec實例 |
| evaluate(input_fn,steps=None,hooks=None,checkpoint_path=None,name=None) | 返回一個包含按name為鍵的model_fn中指定的計算指標的詞典。 |
| export_savedmodel(export_dir_base, serving_input_receiver_fn) | 將推理圖作為SavedModel導出到給定的目錄中.該方法通過首先調用serving_input_receiver_fn來獲取特征Tensors來構建一個新圖,然后調用這個Estimator的model_fn來基於這些特征生成模型圖.它在新的會話中將給定的檢查點恢復到該圖中.最后它會在給定的export_dir_base下面創建一個時間戳導出目錄,並在其中寫入一個SavedModel,其中包含從此會話保存的單個MetaGraphDef. |
| get_variable_names() | 返回此模型中所有變量名稱的列表. |
| get_variable_value(name) | 返回由名稱給出的變量的值. 返回值類型:Numpy數組 |
| latest_checkpoint() | 查找model_dir中最新保存的檢查點文件的文件名. |
| predict(self,input_fn,predict_keys=None) | 對給定的features產生預測,返回predictions字典張量的計算值. |
| train(input_fn,hooks=None,steps=None,max_steps=None,saving_listeners=None) | 訓練給定訓練數據input_fn的模型. |
FeatureColumns(tf.feature_column)
源碼定義:tensorflow/python/feature_column/feature_column.py
用戶向導:feature_columns
feature_columns2
Feature cloumns是原始數據和Estimator模型之間的橋梁,它們被用來把各種形式的原始數據轉換為模型能夠使用的格式
| api | 返回值 | 用法 |
|---|---|---|
| tf.feature_column.bucketized_column(source_column,boundaries) | Represents discretized dense input. | Bucketized column用來把numeric column的值按照提供的邊界(boundaries)離散化為多個值 |
| tf.feature_column.categorical_column_with_hash_bucket(key,hash_bucket_size,dtype=tf.string) | Represents sparse feature where ids are set by hashing. | 用戶指定類別的總數,通過hash的方式來得到最終的類別ID |
| tf.feature_column.categorical_column_with_identity(key,num_buckets,default_value=None) | A _CategoricalColumn that returns identity values. | 與Bucketized column類似,Categorical identity column用單個唯一值表示bucket。 |
| tf.feature_column.categorical_column_with_vocabulary_file(key,vocabulary_file) | A _CategoricalColumn with a vocabulary file. | 類別特征做one-hot編碼,字典通過文件給出 |
| tf.feature_column.categorical_column_with_vocabulary_list(key,vocabulary_list) | A _CategoricalColumn with in-memory vocabulary. | 類別特征做one-hot編碼,字典通過list給出 |
| tf.feature_column.crossed_column(keys,hash_bucket_size,hash_key=None) | Returns a column for performing crosses of categorical features. | 對keys的笛卡爾積執行hash操作,再把hash的結果對hash_bucket_size取模得到最終的結果 |
| tf.feature_column.embedding_column(categorical_column,dimension,combiner='mean') | _DenseColumn that converts from sparse, categorical input. | 把原始特征映射為一個低維稠密的實數向量 |
| tf.feature_column.indicator_column(categorical_column) | Represents multi-hot representation of given categorical column. | 把categorical column得到的稀疏tensor轉換為one-hot或者multi-hot形式的稠密tensor |
| tf.feature_column.input_layer(features,feature_columns) | Returns a dense Tensor as input layer based on given feature_columns. | |
| tf.feature_column.linear_model(features,feature_columns) | Returns a linear prediction Tensor based on given feature_columns. | |
| tf.feature_column.make_parse_example_spec(feature_columns) | Creates parsing spec dictionary from input feature_columns. | |
| tf.feature_column.numeric_column(key,shape=(1,),default_value=None,dtype=tf.float32,normalizer_fn=None) | Represents real valued or numerical features. | |
| tf.feature_column.shared_embedding_columns(categorical_columns,dimension,combiner='mean') | List of dense columns that convert from sparse, categorical input. | 把多個特征共享相同的embeding映射空間 |
| tf.feature_column.weighted_categorical_column(categorical_column,weight_feature_key,dtype=tf.float32) | 給一個類別特征賦予一定的權重,給一個類別特征賦予一定的權重 |
tf.nn
激活函數op,loss部分抽象
源碼定義:tensorflow/python/ops/gen_nn_ops.py
用戶向導:Neural Network
| api | 說明 |
|---|---|
| tf.nn.relu(...) | Computes rectified linear: max(features, 0). |
| tf.nn.sigmoid(...) | Computes sigmoid of x element-wise. |
| tf.nn.bias_add(...) | |
| tf.nn.tanh(...) | Computes hyperbolic tangent of x element-wise. |
| tf.nn.dropout() | |
| tf.nn.softmax() | |
| tf.nn.sigmoid_cross_entropy_with_logits(...) | Computes sigmoid cross entropy given logits. |
| tf.nn.top_k(...) | Finds values and indices of the k largest entries for the last dimension. |
| tf.nn.embedding_lookup(params,ids,partition_strategy='mod') | 按照ids查找params里面的vector然后輸出 |
| tf.nn.embedding_lookup_sparse(params,sp_ids,sp_weights) | Looks up ids in a list of embedding tensors,Computes embeddings for the given ids and weights. |
| tf.nn.zero_fraction(value,name=None) | Returns the fraction of zeros in value. |
| tf.nn.nce_loss() | 負采樣的NCE loss實現 |
tf.layers
網絡模塊抽象
| api | 說明 |
|---|---|
| average_pooling1d(...) | Average Pooling layer for 1D inputs. |
| average_pooling2d(...) | Average pooling layer for 2D inputs (e.g. images). |
| average_pooling3d(...) | Average pooling layer for 3D inputs (e.g. volumes). |
| batch_normalization(...) | Functional interface for the batch normalization layer. |
| conv1d(...) | Functional interface for 1D convolution layer (e.g. temporal convolution). |
| conv2d(...) | Functional interface for the 2D convolution layer. |
| conv2d_transpose(...) | Functional interface for transposed 2D convolution layer. |
| conv3d(...) | Functional interface for the 3D convolution layer. |
| conv3d_transpose(...) | Functional interface for transposed 3D convolution layer. |
| dense(...) | Functional interface for the densely-connected layer. |
| dropout(...) | Applies Dropout to the input. |
| flatten(...) | Flattens an input tensor while preserving the batch axis (axis 0). |
| max_pooling1d(...) | Max Pooling layer for 1D inputs. |
| max_pooling2d(...) | Max pooling layer for 2D inputs (e.g. images). |
| max_pooling3d(...) | Max pooling layer for 3D inputs (e.g. volumes). |
| separable_conv1d(...) | Functional interface for the depthwise separable 1D convolution layer. |
| separable_conv2d(...) | Functional interface for the depthwise separable 2D convolution layer. |
tf.train
tf.train provides a set of classes and functions that help train models.
用戶向導:Training
| api | 說明 |
|---|---|
| tf.train.Optimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
| tf.train.GradientDescentOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
| tf.train.AdadeltaOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
| tf.train.AdagradOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
| tf.train.AdamOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
| tf.train.SyncReplicasOptimizer | The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables. |
| tf.train.SessionRunHook | Training Hooks,Hook to extend calls to MonitoredSession.run(). |
| tf.train.SessionRunArgs | Training Hooks,Represents arguments to be added to a Session.run() call. |
| tf.train.global_step | sess每執行完一個batch,就給global_step加1,用來統計目前執行的batch數 |
| tf.train.basic_train_loop | |
| tf.train.get_global_step() | 返回global_step作為name的tensor |
| tf.train.get_or_create_global_step() | Returns and create (if necessary) the global step tensor. |
| tf.train.write_graph | |
| tf.train.string_input_producer | 輸出字符串到一個輸入管道隊列 |
| tf.train.Saver(max_to_keep=5) | 保存模型 |
| tf.train.latest_checkpoint(checkpoint_dir,latest_filename=None) | Finds the filename of latest saved checkpoint file. |
| tf.train.Int64List | Int64List |
| tf.train.FloatList | FloatList |
| tf.train.Features | Features |
| tf.train.Example | Example |
| tf.train.batch(tensors,batch_size) | Creates batches of tensors in tensors. |
| tf.trainable_variables() | |
| tf.train.Scaffold | Scaffold |
tf.linalg
TensorFlow 的線性代數庫。
w3cschool TensorFlow官方文檔-linalg
| api | 說明 |
|---|---|
| adjoint(...) | 轉置最后兩個維度和共軛張量matrix. |
| band_part(...) | 復制張量設置每個最內層矩陣中心帶外的所有內容 |
| cholesky(...) | 計算一個或多個方陣的Cholesky分解. |
| cholesky_solve(...) | A X = RHS給出的Cholesky因子分解,求解線性方程組. |
| det(...) | 計算一個或多個方陣的行列式. |
| diag(...) | 返回具有給定批處理對角線值的批處理對角線張量. |
| diag_part(...) | 返回批處理張量的批處理對角線部分. |
| eigh(...) | 計算了一批自共軛矩陣的特征分解. |
| eigvalsh(...) | 計算一個或多個自共軛矩陣的特征值. |
| einsum(...) | 任意維度的張量之間的廣義收縮. |
| expm(...) | 計算一個或多個方陣的矩陣指數. |
| eye(...) | 構造一個單位矩陣或批矩陣. |
| inv(...) | 計算一個或多個平方可逆矩陣或它們的倒數 |
| logdet(...) | 計算hermitian正定矩陣的行列式的對數. |
| logm(...) | 計算一個或多個方陣的矩陣對數: |
| lstsq(...) | 解決一個或多個線性最小二乘問題. |
| norm(...) | 計算向量,矩陣和張量的范數.(不贊成的參數) |
| qr(...) | 計算一個或多個矩陣的QR分解. |
| set_diag(...) | 返回具有新批處理對角線值的批處理矩陣張量. |
| slogdet(...) | 計算行列式的絕對值的符號和日志 |
| solve(...) | 求解線性方程組. |
| svd(...) | 計算一個或多個矩陣的奇異值分解. |
| tensordot(...) | a和b沿指定軸的張量收縮. |
| trace(...) | 計算張量x的軌跡. |
| transpose(...) | 轉置張量a的最后兩個維度. |
| triangular_solve(...) | 求解具有上三角矩陣或下三角矩陣的線性方程組. |
checkpoint(模型保存與恢復)
Estimator 將一個檢查點保存到 model_dir 中后,每次調用 Estimator 的 train、eval 或 predict 方法時,都會發生下列情況:
a) Estimator 通過運行 model_fn() 構建模型圖。(要詳細了解 model_fn(),請參閱創建自定義 Estimator。)
b) Estimator 根據最近寫入的檢查點中存儲的數據來初始化新模型的權重。
換言之,一旦存在檢查點,TensorFlow 就會在您每次調用 train()、evaluate() 或 predict() 時重建模型。
Tensorflow Serving
官方例子
half_plus_two的例子
# Download the TensorFlow Serving Docker image and repo
docker pull tensorflow/serving
git clone https://github.com/tensorflow/serving
# Location of demo models
TESTDATA="$(pwd)/serving/tensorflow_serving/servables/tensorflow/testdata"
# Start TensorFlow Serving container and open the REST API port
# docker run -t 表示是否允許偽TTY
# docker run --rm表示如果實例已經存在,則先remove掉該實例再重新啟動新實例
# docker -p設置端口映射
# docker -v設置磁盤映射
# docker -e設置環境變量
docker run -t --rm -p 8501:8501 \
-v "$TESTDATA/saved_model_half_plus_two_cpu:/models/half_plus_two" \
-e MODEL_NAME=half_plus_two \
tensorflow/serving &
# Query the model using the predict API
curl -d '{"instances": [1.0, 2.0, 5.0]}' \
-X POST http://localhost:8501/v1/models/half_plus_two:predict
# Returns => { "predictions": [2.5, 3.0, 4.5] }
serving鏡像提供了兩種調用方式:gRPC和HTTP請求。gRPC默認端口是8500,HTTP請求的默認端口是8501,serving鏡像中的程序會自動加載鏡像內/models下的模型,通過MODEL_NAME指定/models下的哪個模型。
創建自定義鏡像
docker run --rm -p 8501:8501 \
--mount type=bind,source=${model_dir},target=/models/dpp_model \
-e MODEL_NAME=dpp_model -t tensorflow/serving &
架構
TensorFlow Serving可抽象為一些組件構成,每個組件實現了不同的API任務,其中最重要的是Servable, Loader, Source, 和 Manager,我們看一下組件之間是如何交互的。
Source
TF Serving發現磁盤上的模型文件,該模型服務的生命周期就開始了。Source組件負責發現模型文件,找出需要加載的新模型。實際上,該組件監控文件系統,當發現一個新的模型版本,就為該模型創建一個Loader。
Loader
Loader需要知道模型的相關信息,包括如何加載模型如何估算模型需要的資源,包括需要請求的RAM、GPU內存。Loader帶一個指針,連接到磁盤上存儲的模型,其中包含加載模型需要的相關元數據。不過記住,Loader現在還不允許加載模型。
Manager
Manager收到待加載模型版本,開始模型服務流程。此處有兩種可能性,第一種情況是模型首次推送部署,Manager先確保模型需要的資源可用,一旦獲取相應的資源,Manager賦予Loader權限去加載模型。
第二種情況是為已上線模型部署一個新版本。Manager會先查詢Version Policy插件,決定加載新模型的流程如何進行。
具體來說,當加載新模型時,可選擇保持 (1) 可用性(the Availability Preserving Policy)或 (2) 資源(the Resource Preserving Policy)。
Servable
最后,當用戶請求模型的句柄,Manager返回句柄給Servable。
Servables 是 TensorFlow Serving 中最核心的抽象,是客戶端用於執行計算 (例如:查找或推斷) 的底層對象。
部署服務
模型導出
將TensorFlow構建的模型用作服務,首先需要確保導出為正確的格式,可以采用TensorFlow提供的SavedModel類。TensorFlow Saver提供模型checkpoint磁盤文件的保存/恢復。事實上SavedModel封裝了TensorFlow Saver,對於模型服務是一種標准的導出方法。
SignatureDefs定義了一組TensorFlow支持的計算簽名,便於在計算圖中找到適合的輸入輸出張量。簡單的說,使用這些計算簽名,可以准確指定特定的輸入輸出節點。目前有3個服務API: 分類、預測和回歸。每個簽名定義關聯一個RPC API。分類SignatureDef用於分類RPC API,預測SignatureDef用於RPC API等等。對於分類SignatureDef,需要一個輸入張量(接收數據)以及可能的輸出張量: 類別或得分。回歸SignatureDef需要一個輸入張量以及另一個輸出張量。最后預測SignatureDef需要一個可變長度的輸入輸出張量。
tf.saved_model API
with sess.graph.as_default() as graph:
builder = tf.saved_model.builder.SavedModelBuilder(saved_model_dir)
# 1.8版本,1.12之后調整為tf.saved_model.predict_signature_def
signature = tf.saved_model.signature_def_utils.predict_signature_def(inputs={'image': in_image},
outputs={'prediction': graph.get_tensor_by_name('final_result:0')},)
builder.add_meta_graph_and_variables(sess=sess,
tags=["serve"],
signature_def_map={'predict':signature,
tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:signature
})
builder.save()
tf.estimator API
def export_savedmodel(self,
export_dir,
serving_input_receiver_fn=None,
as_text=False,
checkpoint_path=None):
if serving_input_receiver_fn is None and self.feature_cfgs is None:
raise ValueError('Both serving_input_receiver_fn and feature_cfgs are none.')
if serving_input_receiver_fn is None:
feature_spec = self.feature_cfgs.to_feature_spec(exclude_targets=True)
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
# 自己封裝的estimator
savedmodel_path = self.estimator.export_savedmodel(
export_dir_base=export_dir,
serving_input_receiver_fn=serving_input_receiver_fn,
as_text=as_text,
checkpoint_path=checkpoint_path
)
return savedmodel_path
其中self.feature_cfgs.to_feature_spec函數得到如下結果:
{'follow_seq': VarLenFeature(dtype=tf.int64), 'uid': VarLenFeature(dtype=tf.int64)}
說明了輸入】
TensorFlow Serving ModelServer 用於發現新導出的模型,並啟動 gRPC 用於提供模型服務。
API請求(predict)
TensorFlow ModelServer通過host:port接受下面這種RESTful API請求:
POST http://host:port/
URI: /v1/models/
\({MODEL_NAME}[/versions/\){MODEL_VERSION}]
VERB: classify|regress|predict
其中“/versions/${MODEL_VERSION}”是可選的,如果省略,則使用最新的版本。
該API基本遵循gRPC版本的PredictionService API。
請求URL的示例:
http://host:port/v1/models/iris:classify
http://host:port/v1/models/mnist/versions/314:predict
請求格式
預測API的請求體必須是如下格式的JSON對象:
{
// (Optional) Serving signature to use.
// If unspecifed default serving signature is used.
"signature_name": <string>,
// Input Tensors in row ("instances") or columnar ("inputs") format.
// A request can have either of them but NOT both.
"instances": <value>|<(nested)list>|<list-of-objects>
"inputs": <value>|<(nested)list>|<object>
}
以行的形式說明輸入的tensor
{
"instances": [
{
"tag": "foo",
"signal": [1, 2, 3, 4, 5],
"sensor": [[1, 2], [3, 4]]
},
{
"tag": "bar",
"signal": [3, 4, 1, 2, 5]],
"sensor": [[4, 5], [6, 8]]
}
]
}
以列的形式說明輸入的tensor
{
"inputs": {
"tag": ["foo", "bar"],
"signal": [[1, 2, 3, 4, 5], [3, 4, 1, 2, 5]],
"sensor": [[[1, 2], [3, 4]], [[4, 5], [6, 8]]]
}
}
返回格式
行形式
{
"predictions": <value>|<(nested)list>|<list-of-objects>
}
如果模型的輸出只包含一個命名的tensor,我們省略名字和predictions key map,直接使用標量或者值的list。如果模型輸出多個命名的tensor,我們輸出對象list,和上面提到的行形式輸入類似。
列形式
{
"outputs": <value>|<(nested)list>|<object>
}
如果模型的輸出只包含一個命名的tensor,我們省略名字和outputs key map,直接使用標量或者值的list。如果模型輸出多個命名的tensor,我們輸出對象,其每個key都和輸出的tensor名對應,和上面提到的列形式輸入類似。
API demo
$ curl -d '{"instances": [1.0,2.0,5.0]}' -X POST http://localhost:8501/v1/models/half_plus_three:predict
{
"predictions": [3.5, 4.0, 5.5]
}
[譯]TensorFlow Serving RESTful API
源碼理解
開發中遇到的其他問題記錄
實現numpy中的切片賦值
# 初始化一個3*4的tensor和大小為3的list
cis = tf.zeros([3, 4)
cis_list = [tf.zeros([4]) for _ in range(3)]
# 替換list的一個元素
cis_list[k] = eis
# 重新堆疊成一個tensor,列切片賦值axis=1?
cis = tf.stack(cis_list)
附錄
- tf.data.Dataset
- Tensorflow中的數據對象Dataset
- Dataset Input Pipeline
- 基於Tensorflow高階API構建大規模分布式深度學習模型系列之特征工程Feature Columns
- w3cschool TensorFlow官方文檔
- TensorFlow入門12 -- Checkpoints,保存和恢復Estimator創建的模型
- 官方文檔
- TensorFlow-Serving:Flexible, High-Performance ML Serving
- 如何用TF Serving部署TensorFlow模型
- TensorFlow 學習筆記
- Client API(REST)
- tensorflow-serving源碼理解
- docker部署tensorflow serving以及模型替換
