本篇文章介紹在spark中調用訓練好的tensorflow模型進行預測的方法。
本文內容的學習需要一定的spark和scala基礎。
如果使用pyspark的話會比較簡單,只需要在每個excutor上用Python加載模型分別預測就可以了。
但工程上為了性能考慮,通常使用的是scala版本的spark。
本篇文章我們通過TensorFlow for Java 在spark中調用訓練好的tensorflow模型。
利用spark的分布式計算能力,從而可以讓訓練好的tensorflow模型在成百上千的機器上分布式並行執行模型推斷。
〇,spark-scala調用tensorflow模型概述
在spark(scala)中調用tensorflow模型進行預測需要完成以下幾個步驟。
(1)准備protobuf模型文件
(2)創建spark(scala)項目,在項目中添加java版本的tensorflow對應的jar包依賴
(3)在spark(scala)項目中driver端加載tensorflow模型調試成功
(4)在spark(scala)項目中通過RDD在excutor上加載tensorflow模型調試成功
(5) 在spark(scala)項目中通過DataFrame在excutor上加載tensorflow模型調試成功
一,准備protobuf模型文件
我們使用tf.keras 訓練一個簡單的線性回歸模型,並保存成protobuf文件。
import tensorflow as tf from tensorflow.keras import models,layers,optimizers ## 樣本數量 n = 800 ## 生成測試用數據集 X = tf.random.uniform([n,2],minval=-10,maxval=10) w0 = tf.constant([[2.0],[-1.0]]) b0 = tf.constant(3.0) Y = X@w0 + b0 + tf.random.normal([n,1],mean = 0.0,stddev= 2.0) # @表示矩陣乘法,增加正態擾動 ## 建立模型 tf.keras.backend.clear_session() inputs = layers.Input(shape = (2,),name ="inputs") #設置輸入名字為inputs outputs = layers.Dense(1, name = "outputs")(inputs) #設置輸出名字為outputs linear = models.Model(inputs = inputs,outputs = outputs) linear.summary() ## 使用fit方法進行訓練 linear.compile(optimizer="rmsprop",loss="mse",metrics=["mae"]) linear.fit(X,Y,batch_size = 8,epochs = 100) tf.print("w = ",linear.layers[1].kernel) tf.print("b = ",linear.layers[1].bias) ## 將模型保存成pb格式文件 export_path = "./data/linear_model/" version = "1" #后續可以通過版本號進行模型版本迭代與管理 linear.save(export_path+version, save_format="tf")
!ls {export_path+version} # 查看模型文件相關信息 !saved_model_cli show --dir {export_path+str(version)} --all
模型文件信息中這些標紅的部分都是后面有可能會用到的。
二,創建spark(scala)項目,在項目中添加java版本的tensorflow對應的jar包依賴
如果使用maven管理項目,需要添加如下 jar包依賴
<!-- https://mvnrepository.com/artifact/org.tensorflow/tensorflow --> <dependency> <groupId>org.tensorflow</groupId> <artifactId>tensorflow</artifactId> <version>1.15.0</version> </dependency>
也可以從下面網址中直接下載 org.tensorflow.tensorflow的jar包
以及其依賴的org.tensorflow.libtensorflow 和 org.tensorflowlibtensorflow_jni的jar包 放到項目中。
https://mvnrepository.com/artifact/org.tensorflow/tensorflow/1.15.0
三, 在spark(scala)項目中driver端加載tensorflow模型調試成功
我們的示范代碼在jupyter notebook中進行演示,需要安裝toree以支持spark(scala)。
import scala.collection.mutable.WrappedArray import org.{tensorflow=>tf} //注:load函數的第二個參數一般都是“serve”,可以從模型文件相關信息中找到 val bundle = tf.SavedModelBundle .load("/Users/liangyun/CodeFiles/eat_tensorflow2_in_30_days/data/linear_model/1","serve") //注:在java版本的tensorflow中還是類似tensorflow1.0中靜態計算圖的模式,需要建立Session, 指定feed的數據和fetch的結果, 然后 run. //注:如果有多個數據需要喂入,可以連續用用多個feed方法 //注:輸入必須是float類型 val sess = bundle.session() val x = tf.Tensor.create(Array(Array(1.0f,2.0f),Array(2.0f,3.0f))) val y = sess.runner().feed("serving_default_inputs:0", x) .fetch("StatefulPartitionedCall:0").run().get(0) val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt) y.copyTo(result) if(x != null) x.close() if(y != null) y.close() if(sess != null) sess.close() if(bundle != null) bundle.close() result
輸出如下:
Array(Array(3.019596), Array(3.9878292))
四,在spark(scala)項目中通過RDD在excutor上加載tensorflow模型調試成功
下面我們通過廣播機制將Driver端加載的TensorFlow模型傳遞到各個excutor上,並在excutor上分布式地調用模型進行推斷。
import org.apache.spark.sql.SparkSession import scala.collection.mutable.WrappedArray import org.{tensorflow=>tf} val spark = SparkSession .builder() .appName("TfRDD") .enableHiveSupport() .getOrCreate() val sc = spark.sparkContext //在Driver端加載模型 val bundle = tf.SavedModelBundle .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve") //利用廣播將模型發送到excutor上 val broads = sc.broadcast(bundle) //構造數據集 val rdd_data = sc.makeRDD(List(Array(1.0f,2.0f),Array(3.0f,5.0f),Array(6.0f,7.0f),Array(8.0f,3.0f))) //通過mapPartitions調用模型進行批量推斷 val rdd_result = rdd_data.mapPartitions(iter => { val arr = iter.toArray val model = broads.value val sess = model.session() val x = tf.Tensor.create(arr) val y = sess.runner().feed("serving_default_inputs:0", x) .fetch("StatefulPartitionedCall:0").run().get(0) //將預測結果拷貝到相同shape的Float類型的Array中 val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt) y.copyTo(result) result.iterator }) rdd_result.take(5) bundle.close
輸出如下:
Array(Array(3.019596), Array(3.9264367), Array(7.8607616), Array(15.974984)
五, 在spark(scala)項目中通過DataFrame在excutor上加載tensorflow模型調試成功
除了可以在Spark的RDD數據上調用tensorflow模型進行分布式推斷,
我們也可以在DataFrame數據上調用tensorflow模型進行分布式推斷。
主要思路是將推斷方法注冊成為一個sparkSQL函數。
import org.apache.spark.sql.SparkSession import scala.collection.mutable.WrappedArray import org.{tensorflow=>tf} object TfDataFrame extends Serializable{ def main(args:Array[String]):Unit = { val spark = SparkSession .builder() .appName("TfDataFrame") .enableHiveSupport() .getOrCreate() val sc = spark.sparkContext import spark.implicits._ val bundle = tf.SavedModelBundle .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve") val broads = sc.broadcast(bundle) //構造預測函數,並將其注冊成sparkSQL的udf val tfpredict = (features:WrappedArray[Float]) => { val bund = broads.value val sess = bund.session() val x = tf.Tensor.create(Array(features.toArray)) val y = sess.runner().feed("serving_default_inputs:0", x) .fetch("StatefulPartitionedCall:0").run().get(0) val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt) y.copyTo(result) val y_pred = result(0)(0) y_pred } spark.udf.register("tfpredict",tfpredict) //構造DataFrame數據集,將features放到一列中 val dfdata = sc.parallelize(List(Array(1.0f,2.0f),Array(3.0f,5.0f),Array(7.0f,8.0f))).toDF("features") dfdata.show //調用sparkSQL預測函數,增加一個新的列作為y_preds val dfresult = dfdata.selectExpr("features","tfpredict(features) as y_preds") dfresult.show bundle.close } }
TfDataFrame.main(Array())
+----------+
| features|
+----------+
|[1.0, 2.0]|
|[3.0, 5.0]|
|[7.0, 8.0]|
+----------+
+----------+---------+
| features| y_preds|
+----------+---------+
|[1.0, 2.0]| 3.019596|
|[3.0, 5.0]|3.9264367|
|[7.0, 8.0]| 8.828995|
+----------+---------+
以上我們分別在spark 的RDD數據結構和DataFrame數據結構上實現了調用一個tf.keras實現的線性回歸模型進行分布式模型推斷。
在本例基礎上稍作修改則可以用spark調用訓練好的各種復雜的神經網絡模型進行分布式模型推斷。
但實際上tensorflow並不僅僅適合實現神經網絡,其底層的計算圖語言可以表達各種數值計算過程。
利用其豐富的低階API,我們可以在tensorflow2.0上實現任意機器學習模型,
結合tf.Module提供的便捷的封裝功能,我們可以將訓練好的任意機器學習模型導出成模型文件並在spark上分布式調用執行。
這無疑為我們的工程應用提供了巨大的想象空間。
參考:
開源電子書地址:https://lyhue1991.github.io/eat_tensorflow2_in_30_days/
GitHub 項目地址:https://github.com/lyhue1991/eat_tensorflow2_in_30_days