轉載:http://thinkgamer.cn/2019/05/07/%E6%9C%BA%E5%99%A8%E5%AD%A6%E4%B9%A0/Spark%E6%8E%92%E5%BA%8F%E7%AE%97%E6%B3%95%E7%B3%BB%E5%88%97%E4%B9%8B%EF%BC%88MLLib%E3%80%81ML%EF%BC%89LR%E4%BD%BF%E7%94%A8%E6%96%B9%E5%BC%8F%E4%BB%8B%E7%BB%8D/
【Spark排序算法系列】主要介紹的是目前推薦系統或者廣告點擊方面用的比較廣的幾種算法,和他們在Spark中的應用實現,本篇文章主要介紹LR算法。
本系列還包括(持續更新):
- Spark排序算法系列之GBDT(梯度提升決策樹)
- Spark排序算法系列之模型融合(GBDT+LR)
- Spark排序算法系列之XGBoost
- Spark排序算法系列之FTRL(Follow-the-regularized-Leader)
- Spark排序算法系列之FM與FFM
背景
邏輯回歸(Logistic Regression,LR)是較早應用在推薦排序上的,其屬於線性模型,模型簡單,可以引入海量離散特征,這樣的好處就是模型可以考慮更加細節或者說針對具體個體的因素。如果想要引入非線性因素需要做特征交叉,這樣很容易產生百億特征,在很早之前ctr就主要靠堆人力搞特征工程工作來持續優化效果。
雖然目前在工業界LR應用的並不多,但是對於初學者,一些中小企業或者應用場景不需要負責排序模型的時候,LR扔不失為一個不錯的選擇。
關於LR的算法原理,這里不做過多說明,可參考:
LR介紹
LR的數學表達式可以簡寫為:
L(w,x,y)=log(1+exp(−ywTx))L(w,x,y)=log(1+exp(−ywTx))
對於二分類模型,LR是一個分類算法,模型計算得到預測值后會通過以下函數進轉化。
f(z)=11+e−zxf(z)=11+e−zx
如果L(w,x,y) > 0.5 則是1 否則為0。當然在實際應用過程中,並不是一定取0.5作為界限值,而是根據實際情況進行調整。
二進制回歸可以轉化為多分類回歸問題。關於多分類介紹和基於Spark實現多分類可參考多分類實現方式介紹和在Spark上實現多分類邏輯回歸(Multinomial Logistic Regression)
在Spark.mllib包中提供了兩種LR分類模型,分別是:
- mini-batch gradient descent(LogisticRegressionWithLBFGS)
- L-BFGS(LogisticRegressionWithSGD)
但官方給出的建議是:推薦使用LBFGS,因為基於LBFGS的LR比基於SGD的能更快的收斂。其原話如下:
We implemented two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. We recommend L-BFGS over mini-batch gradient descent for faster convergence.
而且LRWithLBFGS不僅支持二分類還支持多分類,但LRWithSGD只支持二分類。所以后續只介紹下Spark mllib中的LogisticRegressionWithLBFGS相關操作。
mllib中的LRWithLBFGS
設置變量和創建spark對象
1 2 3 4 5 6 7 8 9
|
val file = "data/sample_libsvm_data.txt" val model_path = "model/lr/" val model_param = "numInterations:5,regParam:0.1,updater:SquaredL2Updater,gradient:LogisticGradient"
val spark = SparkSession.builder() .master("local[5]") .appName("LogisticRegression_Model_Train") .getOrCreate() Logger.getRootLogger.setLevel(Level.WARN)
|
拆分數據集
1 2 3
|
// 記載數據集 並拆分成訓練集和測試集 val data = MLUtils.loadLibSVMFile(spark.sparkContext,file).randomSplit(Array(0.7,0.3)) val (train, test) = (data(0), data(1))
|
LRWithLBFGS模型設置參數
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
// 定義分類的數目,默認為2,是logisticregression的參數 private var numClass: Int = 2 // 定義是否添加截距,默認值為false,是logisticregression的參數 private var isAddIntercept: Option[Boolean] = None // 定義是否在訓練模型前進行驗證,是logisticregression的參數 private var isValidateData: Option[Boolean] = None
// 定義迭代的次數,默認值是100,LBFGS的參數 private var numInterations: Option[Int] = None // 定義正則化系數值,默認值是0.0,LBFGS的參數 private var regParam: Option[Double] = None // 定義正則化參數,支持:L1Updater[L1]、SquaredL2Updater[L2]、SimpleUpdater[沒有正則項],LBFGS的參數 private var updater: Option[String] = None // 定義計算梯度的方式,支持:LogisticGradient、LeastSquaresGradient、HingeGradient ,LBFGS的參數 private var gradient: Option[String] = None // 人工定義的收斂閾值 private var threshold:Option[Double]=None // 定義模型收斂閾值,默認為 10^-6 private var convergenceTol: Double= 1.0e-6
|
創建模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
def createLRModel(model_param: String): LogisticRegressionWithLBFGS={ // 設置模型參數 val optimizer = new LROptimizer() optimizer.parseString(model_param) println(s"模型訓練參數為:${optimizer.toString}")
// 創建模型並指定相關參數 val LRModel = new LogisticRegressionWithLBFGS() // 設置分類數目 LRModel.setNumClasses(optimizer.getNumClass) // 設置是否添加截距 if(optimizer.getIsAddIntercept.nonEmpty) {LRModel.setIntercept(optimizer.getIsAddIntercept.get)} // 設置是否進行驗證模型 if(optimizer.getIsValidateData.nonEmpty){LRModel.setValidateData(optimizer.getIsValidateData.get)} // 設置迭代次數 if(optimizer.getNumInterations.nonEmpty){LRModel.optimizer.setNumIterations((optimizer.getNumInterations.get))} // 設置正則項參數 if(optimizer.getRegParam.nonEmpty) { LRModel.optimizer.setRegParam(optimizer.getRegParam.get) } // 設置正則化參數 if(optimizer.getUpdater.nonEmpty){ optimizer.getUpdater match { case Some("L1Updater") => LRModel.optimizer.setUpdater( new L1Updater()) case Some("SquaredL2Updater") => LRModel.optimizer.setUpdater(new SquaredL2Updater()) case Some("SimpleUpdater") => LRModel.optimizer.setUpdater(new SimpleUpdater()) case _ => LRModel.optimizer.setUpdater(new SquaredL2Updater()) } } // 設置梯度計算方式 if(optimizer.getGradient.nonEmpty){ optimizer.getGradient match { case Some("LogisticGradient") => LRModel.optimizer.setGradient(new LogisticGradient()) case Some("LeastSquaresGradient") => LRModel.optimizer.setGradient(new LeastSquaresGradient()) case Some("HingeGradient") => LRModel.optimizer.setGradient(new HingeGradient()) case _ => LRModel.optimizer.setGradient(new LogisticGradient()) } } // 設置收斂閾值 if(optimizer.getThreshold.nonEmpty){ LRModel.optimizer.setConvergenceTol(optimizer.getThreshold.get)} else {LRModel.optimizer.setConvergenceTol(optimizer.getConvergenceTol)}
LRModel }
|
模型效果評估
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
def evaluteResult(result: RDD[(Double,Double,Double)]) :Unit = { // MSE val testMSE = result.map{ case(real, pre, _) => math.pow((real - pre), 2)}.mean() println(s"Test Mean Squared Error = $testMSE") // AUC val metrics = new BinaryClassificationMetrics(result.map(x => (x._2,x._1)).sortByKey(ascending = true),numBins = 2) println(s"0-1 label AUC is = ${metrics.areaUnderROC}") val metrics1 = new BinaryClassificationMetrics(result.map(x => (x._3,x._1)).sortByKey(ascending = true),numBins = 2) println(s"score-label AUC is = ${metrics1.areaUnderROC}") // 錯誤率 val error = result.filter(x => x._1!=x._2).count().toDouble / result.count() println(s"error is = $error") // 准確率 val accuracy = result.filter(x => x._1==x._2).count().toDouble / result.count() println(s"accuracy is = $accuracy") }
|
保存模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
def saveModel(model: LogisticRegressionModel, model_path: String): Unit = { // 保存模型文件 obj val out_obj = new ObjectOutputStream(new FileOutputStream(model_path+"model.obj")) out_obj.writeObject(model)
// 保存模型信息 val model_info=new BufferedWriter(new FileWriter(model_path+"model_info.txt")) model_info.write(model.toString()) model_info.flush() model_info.close()
// 保存模型權重 val model_weights=new BufferedWriter(new FileWriter(model_path+"model_weights.txt")) model_weights.write(model.weights.toString) model_weights.flush() model_weights.close()
println(s"模型信息寫入文件完成,路徑為:$model_path") }
|
加載模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
def loadModel(model_path: String): Option[LogisticRegressionModel] = { try{ val in = new ObjectInputStream( new FileInputStream(model_path) ) val model = Option( in.readObject().asInstanceOf[LogisticRegressionModel] ) in.close() println("Model Load Success") model } catch { case ex: ClassNotFoundException => { println(ex.printStackTrace()) None } case ex: IOException => { println(ex.printStackTrace()) println(ex) None } case _: Throwable => throw new Exception } }
|
使用加載的模型進行分值計算
1 2 3 4 5 6 7 8 9 10 11
|
// 加載obj文件進行預測 val model_new = loadModel(s"$model_path/model.obj") // 使用加載的模型進行樣例預測 val result_new = test.map(line =>{ val pre_label = model_new.get.predict(line.features) // blas.ddot(x.length, x,1,y,1) (向量x的長度,向量x,向量x的索引遞增間隔,向量y,向量y的索引遞增間隔) val pre_score = blas.ddot(model.numFeatures, line.features.toArray, 1, model.weights.toArray, 1) val score = Math.pow(1+Math.pow(Math.E, -2 * pre_score), -1) (line.label, pre_label,score) } ) result_new.take(2).foreach(println)
|
ml中的二分類LR
ml包中的LR既可以用來做二分類,也可以用來做多分類。
- 二分類對應:Binomial logistic regression
- 多分類對應:multinomial logistic regression
其中二分類可以通過Binomial logistic regression 和 multinomial logistic regression實現。
基於Binomial logistic regression的LR實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
def BinaryModel(train: Dataset[Row], model_path: String, spark: SparkSession) = { // 創建模型 val LRModel = new LogisticRegression() .setMaxIter(20) .setRegParam(0.3) .setElasticNetParam(0.8) // 訓練評估模型 val model = LRModel.fit(train) evalute(model, train, spark) }
def evalute(model: LogisticRegressionModel, train: Dataset[Row], spark: SparkSession):Unit = { // 打印模型參數 println(s"模型參數信息如下:\n ${model.parent.explainParams()} \n") println(s"Coefficients(系數): ${model.coefficients}") println(s"Intercept(截距): ${model.intercept}") // 查看訓練集的預測結果 rawPrediction:row 計算的分值,probability:經過sigmoid轉換后的概率 val result = model.evaluate(train) result.predictions.show(10) // 將 label,0 值概率,predict label提取出來 result.predictions.select("label","probability","prediction").rdd .map(row => (row.getDouble(0),row.get(1).asInstanceOf[DenseVector].toArray(0),row.getDouble(2))) .take(10).foreach(println) // 模型評估 val trainSummary = model.summary val objectiveHistory = trainSummary.objectiveHistory println("objectiveHistoryLoss:") objectiveHistory.foreach(loss => println(loss))
val binarySummary = trainSummary.asInstanceOf[BinaryLogisticRegressionSummary]
val roc = binarySummary.roc roc.show() println(s"areaUnderROC: ${binarySummary.areaUnderROC}")
// Set the model threshold to maximize F-Measure val fMeasure = binarySummary.fMeasureByThreshold fMeasure.show(10) val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0) import spark.implicits ._ val bestThreshold = fMeasure.where($"F-Measure"===maxFMeasure).select("threshold").head().getDouble(0) model.setThreshold(bestThreshold) }
|
基於Multimial logistic regression的LR實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
def BinaryModelWithMulti(train: Dataset[Row], model_path: String, spark: SparkSession) = { // 創建模型 val LRModel = new LogisticRegression() .setMaxIter(10) .setRegParam(0.3) .setElasticNetParam(0.8) .setFamily("multinomial") // 訓練模型 val model = LRModel.fit(train) // 打印模型參數 println(s"模型參數信息如下:\n ${model.parent.explainParams()} \n") println(s"Coefficients(系數): ${model.coefficientMatrix}") println(s"Intercept(截距): ${model.interceptVector}") }
|
ml中的多分類LR
某條樣本屬於類別k的概率計算為:
P(Y=k|X,βk,β0k)=eβk⋅X+β0k∑K−1kj=0eβkj⋅X+β0