【推薦算法工程師技術棧系列】分布式&數據庫--tensorflow


TensorFlow 高階API

Dataset(tf.data)

源碼定義:tensorflow/python/data/ops/dataset_ops.py
用戶向導:Dataset Input Pipeline

Dataset可以用來表示輸入管道元素集合(張量的嵌套結構)和“邏輯計划“對這些元素的轉換操作。在Dataset中元素可以是向量,元組或字典等形式。
另外,Dataset需要配合另外一個類Iterator進行使用,Iterator對象是一個迭代器,可以對Dataset中的元素進行迭代提取。

api 說明
from_generator(generator,output_types,output_shapes=None) Creates a Dataset whose elements are generated by generator.
from_tensor_slices(tensors) 創建dataset,其元素是給定張量的切片的元素
from_tensors(tensors) 創建一個Dataset包含給定張量的單個元素。
interleave(map_func,cycle_length,block_length=1) 對數據集進行map轉換並合並插入結果
list_files(file_pattern,shuffle=None) A dataset of all files matching a pattern.
range(*args) Creates a Dataset of a step-separated range of values.
skip(count) Creates a Dataset that skips count elements from this dataset.
take(count) Creates a Dataset with at most count elements from this dataset.
zip(datasets) Creates a Dataset by zipping together the given datasets.
prefetch(buffer_size) 創建dataset,在請求輸入數據集之前從輸入數據集中預提取元素
apply(transformation_func) Apply a transformation function to this dataset.
cache(filename='') Caches the elements in this dataset.
concatenate(dataset) Creates a Dataset by concatenating given dataset with this dataset.
filter(predicate) Filters this dataset according to predicate.
map(map_func,num_parallel_calls=None) Maps map_func across this dataset.
flat_map(map_func) Maps map_func across this dataset and flattens the result.
batch(batch_size) 將數據集的連續元素合成批次
padded_batch(batch_size,padded_shapes,padding_values=None) 將數據集的連續元素組合到填充批次中,此轉換將輸入數據集的多個連續元素組合為單個元素。
repeat(count=None) Repeats this dataset count times.
shard(num_shards,index) 將Dataset分割成num_shards個子數據集。這個函數在分布式訓練中非常有用,它允許每個設備讀取唯一子集。
shuffle(buffer_size,seed=None,reshuffle_each_iteration=None) 隨機混洗數據集的元素。
make_initializable_iterator(shared_name=None) Creates an Iterator for enumerating the elements of this dataset.
make_one_shot_iterator() 創建Iterator用於枚舉此數據集的元素。(可自動初始化)

Iterator
源碼定義:tensorflow/python/data/ops/iterator_ops.py
用戶向導:Dataset Input Pipeline > Iterating over datasets

api 說明
get_next(name=None) In graph mode, you should typically call this method once and use its result as the input to another computation.

Estimator(tf.estimator)

源碼定義:tensorflow/python/estimator/estimator.py
用戶向導:Regression Examples

Estimator對象包裝(wraps)由model_fn指定的模型,model_fn由給定輸入和其他一些參數,返回需要進行訓練、計算,或預測的操作.
所有輸出(檢查點,事件文件等)都被寫入model_dir或其子目錄.如果model_dir未設置,則使用臨時目錄.
可以通過RunConfig對象(包含了有關執行環境的信息)傳遞config參數.它被傳遞給model_fn,如果model_fn有一個名為“config”的參數(和輸入函數以相同的方式).Estimator使得模型可配置化,並且還使用其一些字段來控制內部,特別是關於檢查點的控制.
該params參數包含hyperparameter,如果model_fn有一個名為“PARAMS”的參數,並且以相同的方式傳遞給輸入函數,則將它傳遞給model_fn. Estimator只是沿着參數傳遞,並不檢查它.因此,params的結構完全取決於開發人員.
不能在子類中重寫任何Estimator方法(其構造函數強制執行此操作).子類應使用model_fn來配置基類,並且可以添加實現專門功能的方法.

api 說明
init(model_fn, model_dir=None, config=None, params=None) Estimator的構造方法,返回EstimatorSpec實例
evaluate(input_fn,steps=None,hooks=None,checkpoint_path=None,name=None) 返回一個包含按name為鍵的model_fn中指定的計算指標的詞典。
export_savedmodel(export_dir_base, serving_input_receiver_fn) 將推理圖作為SavedModel導出到給定的目錄中.該方法通過首先調用serving_input_receiver_fn來獲取特征Tensors來構建一個新圖,然后調用這個Estimator的model_fn來基於這些特征生成模型圖.它在新的會話中將給定的檢查點恢復到該圖中.最后它會在給定的export_dir_base下面創建一個時間戳導出目錄,並在其中寫入一個SavedModel,其中包含從此會話保存的單個MetaGraphDef.
get_variable_names() 返回此模型中所有變量名稱的列表.
get_variable_value(name) 返回由名稱給出的變量的值. 返回值類型:Numpy數組
latest_checkpoint() 查找model_dir中最新保存的檢查點文件的文件名.
predict(self,input_fn,predict_keys=None) 對給定的features產生預測,返回predictions字典張量的計算值.
train(input_fn,hooks=None,steps=None,max_steps=None,saving_listeners=None) 訓練給定訓練數據input_fn的模型.

FeatureColumns(tf.feature_column)

源碼定義:tensorflow/python/feature_column/feature_column.py
用戶向導:feature_columns
feature_columns2

Feature cloumns是原始數據和Estimator模型之間的橋梁,它們被用來把各種形式的原始數據轉換為模型能夠使用的格式

api 返回值 用法
tf.feature_column.bucketized_column(source_column,boundaries) Represents discretized dense input. Bucketized column用來把numeric column的值按照提供的邊界(boundaries)離散化為多個值
tf.feature_column.categorical_column_with_hash_bucket(key,hash_bucket_size,dtype=tf.string) Represents sparse feature where ids are set by hashing. 用戶指定類別的總數,通過hash的方式來得到最終的類別ID
tf.feature_column.categorical_column_with_identity(key,num_buckets,default_value=None) A _CategoricalColumn that returns identity values. 與Bucketized column類似,Categorical identity column用單個唯一值表示bucket。
tf.feature_column.categorical_column_with_vocabulary_file(key,vocabulary_file) A _CategoricalColumn with a vocabulary file. 類別特征做one-hot編碼,字典通過文件給出
tf.feature_column.categorical_column_with_vocabulary_list(key,vocabulary_list) A _CategoricalColumn with in-memory vocabulary. 類別特征做one-hot編碼,字典通過list給出
tf.feature_column.crossed_column(keys,hash_bucket_size,hash_key=None) Returns a column for performing crosses of categorical features. 對keys的笛卡爾積執行hash操作,再把hash的結果對hash_bucket_size取模得到最終的結果
tf.feature_column.embedding_column(categorical_column,dimension,combiner='mean') _DenseColumn that converts from sparse, categorical input. 把原始特征映射為一個低維稠密的實數向量
tf.feature_column.indicator_column(categorical_column) Represents multi-hot representation of given categorical column. 把categorical column得到的稀疏tensor轉換為one-hot或者multi-hot形式的稠密tensor
tf.feature_column.input_layer(features,feature_columns) Returns a dense Tensor as input layer based on given feature_columns.
tf.feature_column.linear_model(features,feature_columns) Returns a linear prediction Tensor based on given feature_columns.
tf.feature_column.make_parse_example_spec(feature_columns) Creates parsing spec dictionary from input feature_columns.
tf.feature_column.numeric_column(key,shape=(1,),default_value=None,dtype=tf.float32,normalizer_fn=None) Represents real valued or numerical features.
tf.feature_column.shared_embedding_columns(categorical_columns,dimension,combiner='mean') List of dense columns that convert from sparse, categorical input. 把多個特征共享相同的embeding映射空間
tf.feature_column.weighted_categorical_column(categorical_column,weight_feature_key,dtype=tf.float32) 給一個類別特征賦予一定的權重,給一個類別特征賦予一定的權重

tf.nn

激活函數op,loss部分抽象

源碼定義:tensorflow/python/ops/gen_nn_ops.py
用戶向導:Neural Network

api 說明
tf.nn.relu(...) Computes rectified linear: max(features, 0).
tf.nn.sigmoid(...) Computes sigmoid of x element-wise.
tf.nn.bias_add(...)
tf.nn.tanh(...) Computes hyperbolic tangent of x element-wise.
tf.nn.dropout()
tf.nn.softmax()
tf.nn.sigmoid_cross_entropy_with_logits(...) Computes sigmoid cross entropy given logits.
tf.nn.top_k(...) Finds values and indices of the k largest entries for the last dimension.
tf.nn.embedding_lookup(params,ids,partition_strategy='mod') 按照ids查找params里面的vector然后輸出
tf.nn.embedding_lookup_sparse(params,sp_ids,sp_weights) Looks up ids in a list of embedding tensors,Computes embeddings for the given ids and weights.
tf.nn.zero_fraction(value,name=None) Returns the fraction of zeros in value.
tf.nn.nce_loss() 負采樣的NCE loss實現

tf.layers

網絡模塊抽象

api 說明
average_pooling1d(...) Average Pooling layer for 1D inputs.
average_pooling2d(...) Average pooling layer for 2D inputs (e.g. images).
average_pooling3d(...) Average pooling layer for 3D inputs (e.g. volumes).
batch_normalization(...) Functional interface for the batch normalization layer.
conv1d(...) Functional interface for 1D convolution layer (e.g. temporal convolution).
conv2d(...) Functional interface for the 2D convolution layer.
conv2d_transpose(...) Functional interface for transposed 2D convolution layer.
conv3d(...) Functional interface for the 3D convolution layer.
conv3d_transpose(...) Functional interface for transposed 3D convolution layer.
dense(...) Functional interface for the densely-connected layer.
dropout(...) Applies Dropout to the input.
flatten(...) Flattens an input tensor while preserving the batch axis (axis 0).
max_pooling1d(...) Max Pooling layer for 1D inputs.
max_pooling2d(...) Max pooling layer for 2D inputs (e.g. images).
max_pooling3d(...) Max pooling layer for 3D inputs (e.g. volumes).
separable_conv1d(...) Functional interface for the depthwise separable 1D convolution layer.
separable_conv2d(...) Functional interface for the depthwise separable 2D convolution layer.

tf.train

tf.train provides a set of classes and functions that help train models.

用戶向導:Training

api 說明
tf.train.Optimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.GradientDescentOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdadeltaOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdagradOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.AdamOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.SyncReplicasOptimizer The Optimizer base class provides methods to compute gradients for a loss and apply gradients to variables.
tf.train.SessionRunHook Training Hooks,Hook to extend calls to MonitoredSession.run().
tf.train.SessionRunArgs Training Hooks,Represents arguments to be added to a Session.run() call.
tf.train.global_step sess每執行完一個batch,就給global_step加1,用來統計目前執行的batch數
tf.train.basic_train_loop
tf.train.get_global_step() 返回global_step作為name的tensor
tf.train.get_or_create_global_step() Returns and create (if necessary) the global step tensor.
tf.train.write_graph
tf.train.string_input_producer 輸出字符串到一個輸入管道隊列
tf.train.Saver(max_to_keep=5) 保存模型
tf.train.latest_checkpoint(checkpoint_dir,latest_filename=None) Finds the filename of latest saved checkpoint file.
tf.train.Int64List Int64List
tf.train.FloatList FloatList
tf.train.Features Features
tf.train.Example Example
tf.train.batch(tensors,batch_size) Creates batches of tensors in tensors.
tf.trainable_variables()
tf.train.Scaffold Scaffold

tf.linalg

TensorFlow 的線性代數庫。
w3cschool TensorFlow官方文檔-linalg

api 說明
adjoint(...) 轉置最后兩個維度和共軛張量matrix.
band_part(...) 復制張量設置每個最內層矩陣中心帶外的所有內容
cholesky(...) 計算一個或多個方陣的Cholesky分解.
cholesky_solve(...) A X = RHS給出的Cholesky因子分解,求解線性方程組.
det(...) 計算一個或多個方陣的行列式.
diag(...) 返回具有給定批處理對角線值的批處理對角線張量.
diag_part(...) 返回批處理張量的批處理對角線部分.
eigh(...) 計算了一批自共軛矩陣的特征分解.
eigvalsh(...) 計算一個或多個自共軛矩陣的特征值.
einsum(...) 任意維度的張量之間的廣義收縮.
expm(...) 計算一個或多個方陣的矩陣指數.
eye(...) 構造一個單位矩陣或批矩陣.
inv(...) 計算一個或多個平方可逆矩陣或它們的倒數
logdet(...) 計算hermitian正定矩陣的行列式的對數.
logm(...) 計算一個或多個方陣的矩陣對數:
lstsq(...) 解決一個或多個線性最小二乘問題.
norm(...) 計算向量,矩陣和張量的范數.(不贊成的參數)
qr(...) 計算一個或多個矩陣的QR分解.
set_diag(...) 返回具有新批處理對角線值的批處理矩陣張量.
slogdet(...) 計算行列式的絕對值的符號和日志
solve(...) 求解線性方程組.
svd(...) 計算一個或多個矩陣的奇異值分解.
tensordot(...) a和b沿指定軸的張量收縮.
trace(...) 計算張量x的軌跡.
transpose(...) 轉置張量a的最后兩個維度.
triangular_solve(...) 求解具有上三角矩陣或下三角矩陣的線性方程組.

checkpoint(模型保存與恢復)

Estimator 將一個檢查點保存到 model_dir 中后,每次調用 Estimator 的 train、eval 或 predict 方法時,都會發生下列情況:
a) Estimator 通過運行 model_fn() 構建模型圖。(要詳細了解 model_fn(),請參閱創建自定義 Estimator。)
b) Estimator 根據最近寫入的檢查點中存儲的數據來初始化新模型的權重。
換言之,一旦存在檢查點,TensorFlow 就會在您每次調用 train()、evaluate() 或 predict() 時重建模型。

Tensorflow Serving

官方例子

half_plus_two的例子

# Download the TensorFlow Serving Docker image and repo
docker pull tensorflow/serving

git clone https://github.com/tensorflow/serving
# Location of demo models
TESTDATA="$(pwd)/serving/tensorflow_serving/servables/tensorflow/testdata"

# Start TensorFlow Serving container and open the REST API port
# docker run -t 表示是否允許偽TTY
# docker run --rm表示如果實例已經存在,則先remove掉該實例再重新啟動新實例
# docker -p設置端口映射
# docker -v設置磁盤映射
# docker -e設置環境變量
docker run -t --rm -p 8501:8501 \
    -v "$TESTDATA/saved_model_half_plus_two_cpu:/models/half_plus_two" \
    -e MODEL_NAME=half_plus_two \
    tensorflow/serving &

# Query the model using the predict API
curl -d '{"instances": [1.0, 2.0, 5.0]}' \
    -X POST http://localhost:8501/v1/models/half_plus_two:predict

# Returns => { "predictions": [2.5, 3.0, 4.5] }

serving鏡像提供了兩種調用方式:gRPC和HTTP請求。gRPC默認端口是8500,HTTP請求的默認端口是8501,serving鏡像中的程序會自動加載鏡像內/models下的模型,通過MODEL_NAME指定/models下的哪個模型。

創建自定義鏡像

docker run --rm -p 8501:8501 \
    --mount type=bind,source=${model_dir},target=/models/dpp_model \
    -e MODEL_NAME=dpp_model -t tensorflow/serving &

官方文檔

架構

TensorFlow Serving可抽象為一些組件構成,每個組件實現了不同的API任務,其中最重要的是Servable, Loader, Source, 和 Manager,我們看一下組件之間是如何交互的。

Source

TF Serving發現磁盤上的模型文件,該模型服務的生命周期就開始了。Source組件負責發現模型文件,找出需要加載的新模型。實際上,該組件監控文件系統,當發現一個新的模型版本,就為該模型創建一個Loader。

Loader

Loader需要知道模型的相關信息,包括如何加載模型如何估算模型需要的資源,包括需要請求的RAM、GPU內存。Loader帶一個指針,連接到磁盤上存儲的模型,其中包含加載模型需要的相關元數據。不過記住,Loader現在還不允許加載模型。

Manager

Manager收到待加載模型版本,開始模型服務流程。此處有兩種可能性,第一種情況是模型首次推送部署,Manager先確保模型需要的資源可用,一旦獲取相應的資源,Manager賦予Loader權限去加載模型。
第二種情況是為已上線模型部署一個新版本。Manager會先查詢Version Policy插件,決定加載新模型的流程如何進行。
具體來說,當加載新模型時,可選擇保持 (1) 可用性(the Availability Preserving Policy)或 (2) 資源(the Resource Preserving Policy)。

Servable

最后,當用戶請求模型的句柄,Manager返回句柄給Servable。

Servables 是 TensorFlow Serving 中最核心的抽象,是客戶端用於執行計算 (例如:查找或推斷) 的底層對象。

部署服務

模型導出

將TensorFlow構建的模型用作服務,首先需要確保導出為正確的格式,可以采用TensorFlow提供的SavedModel類。TensorFlow Saver提供模型checkpoint磁盤文件的保存/恢復。事實上SavedModel封裝了TensorFlow Saver,對於模型服務是一種標准的導出方法。
SignatureDefs定義了一組TensorFlow支持的計算簽名,便於在計算圖中找到適合的輸入輸出張量。簡單的說,使用這些計算簽名,可以准確指定特定的輸入輸出節點。目前有3個服務API: 分類、預測和回歸。每個簽名定義關聯一個RPC API。分類SignatureDef用於分類RPC API,預測SignatureDef用於RPC API等等。對於分類SignatureDef,需要一個輸入張量(接收數據)以及可能的輸出張量: 類別或得分。回歸SignatureDef需要一個輸入張量以及另一個輸出張量。最后預測SignatureDef需要一個可變長度的輸入輸出張量。

提供tensorflow模型

tf.saved_model API

with sess.graph.as_default() as graph:
    builder = tf.saved_model.builder.SavedModelBuilder(saved_model_dir)
	# 1.8版本,1.12之后調整為tf.saved_model.predict_signature_def
    signature = tf.saved_model.signature_def_utils.predict_signature_def(inputs={'image': in_image},
                                      outputs={'prediction': graph.get_tensor_by_name('final_result:0')},)
    builder.add_meta_graph_and_variables(sess=sess,
                                         tags=["serve"],
                                         signature_def_map={'predict':signature,
		       			   tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:signature
	})
    builder.save()

tf.estimator API

def export_savedmodel(self,
                      export_dir,
                      serving_input_receiver_fn=None,
                      as_text=False,
                      checkpoint_path=None):
    if serving_input_receiver_fn is None and self.feature_cfgs is None:
        raise ValueError('Both serving_input_receiver_fn and feature_cfgs are none.')

    if serving_input_receiver_fn is None:
        feature_spec = self.feature_cfgs.to_feature_spec(exclude_targets=True)
        serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
	# 自己封裝的estimator
    savedmodel_path = self.estimator.export_savedmodel(
        export_dir_base=export_dir,
        serving_input_receiver_fn=serving_input_receiver_fn,
        as_text=as_text,
        checkpoint_path=checkpoint_path
    )
    return savedmodel_path

其中self.feature_cfgs.to_feature_spec函數得到如下結果:

{'follow_seq': VarLenFeature(dtype=tf.int64), 'uid': VarLenFeature(dtype=tf.int64)}

說明了輸入】

TensorFlow Serving ModelServer 用於發現新導出的模型,並啟動 gRPC 用於提供模型服務。

API請求(predict)

TensorFlow ModelServer通過host:port接受下面這種RESTful API請求:
POST http://host:port/ :
URI: /v1/models/ \({MODEL_NAME}[/versions/\){MODEL_VERSION}]
VERB: classify|regress|predict

其中“/versions/${MODEL_VERSION}”是可選的,如果省略,則使用最新的版本。

該API基本遵循gRPC版本的PredictionService API。

請求URL的示例:

http://host:port/v1/models/iris:classify
http://host:port/v1/models/mnist/versions/314:predict

請求格式

預測API的請求體必須是如下格式的JSON對象:

{
  // (Optional) Serving signature to use.
  // If unspecifed default serving signature is used.
  "signature_name": <string>,

  // Input Tensors in row ("instances") or columnar ("inputs") format.
  // A request can have either of them but NOT both.
  "instances": <value>|<(nested)list>|<list-of-objects>
  "inputs": <value>|<(nested)list>|<object>
}

以行的形式說明輸入的tensor

{
 "instances": [
   {
     "tag": "foo",
     "signal": [1, 2, 3, 4, 5],
     "sensor": [[1, 2], [3, 4]]
   },
   {
     "tag": "bar",
     "signal": [3, 4, 1, 2, 5]],
     "sensor": [[4, 5], [6, 8]]
   }
 ]
}

以列的形式說明輸入的tensor

{
 "inputs": {
   "tag": ["foo", "bar"],
   "signal": [[1, 2, 3, 4, 5], [3, 4, 1, 2, 5]],
   "sensor": [[[1, 2], [3, 4]], [[4, 5], [6, 8]]]
 }
}

返回格式

行形式

{
  "predictions": <value>|<(nested)list>|<list-of-objects>
}

如果模型的輸出只包含一個命名的tensor,我們省略名字和predictions key map,直接使用標量或者值的list。如果模型輸出多個命名的tensor,我們輸出對象list,和上面提到的行形式輸入類似。

列形式

{
  "outputs": <value>|<(nested)list>|<object>
}

如果模型的輸出只包含一個命名的tensor,我們省略名字和outputs key map,直接使用標量或者值的list。如果模型輸出多個命名的tensor,我們輸出對象,其每個key都和輸出的tensor名對應,和上面提到的列形式輸入類似。

API demo

$ curl -d '{"instances": [1.0,2.0,5.0]}' -X POST http://localhost:8501/v1/models/half_plus_three:predict
{
    "predictions": [3.5, 4.0, 5.5]
}

[譯]TensorFlow Serving RESTful API

源碼理解

tensorflow-serving源碼理解

開發中遇到的其他問題記錄

實現numpy中的切片賦值

# 初始化一個3*4的tensor和大小為3的list
cis = tf.zeros([3, 4)
cis_list = [tf.zeros([4]) for _ in range(3)]
# 替換list的一個元素
cis_list[k] = eis
# 重新堆疊成一個tensor,列切片賦值axis=1?
cis = tf.stack(cis_list)

附錄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM