转载:http://thinkgamer.cn/2019/05/07/%E6%9C%BA%E5%99%A8%E5%AD%A6%E4%B9%A0/Spark%E6%8E%92%E5%BA%8F%E7%AE%97%E6%B3%95%E7%B3%BB%E5%88%97%E4%B9%8B%EF%BC%88MLLib%E3%80%81ML%EF%BC%89LR%E4%BD%BF%E7%94%A8%E6%96%B9%E5%BC%8F%E4%BB%8B%E7%BB%8D/
【Spark排序算法系列】主要介绍的是目前推荐系统或者广告点击方面用的比较广的几种算法,和他们在Spark中的应用实现,本篇文章主要介绍LR算法。
本系列还包括(持续更新):
- Spark排序算法系列之GBDT(梯度提升决策树)
- Spark排序算法系列之模型融合(GBDT+LR)
- Spark排序算法系列之XGBoost
- Spark排序算法系列之FTRL(Follow-the-regularized-Leader)
- Spark排序算法系列之FM与FFM
背景
逻辑回归(Logistic Regression,LR)是较早应用在推荐排序上的,其属于线性模型,模型简单,可以引入海量离散特征,这样的好处就是模型可以考虑更加细节或者说针对具体个体的因素。如果想要引入非线性因素需要做特征交叉,这样很容易产生百亿特征,在很早之前ctr就主要靠堆人力搞特征工程工作来持续优化效果。
虽然目前在工业界LR应用的并不多,但是对于初学者,一些中小企业或者应用场景不需要负责排序模型的时候,LR扔不失为一个不错的选择。
关于LR的算法原理,这里不做过多说明,可参考:
LR介绍
LR的数学表达式可以简写为:
L(w,x,y)=log(1+exp(−ywTx))L(w,x,y)=log(1+exp(−ywTx))
对于二分类模型,LR是一个分类算法,模型计算得到预测值后会通过以下函数进转化。
f(z)=11+e−zxf(z)=11+e−zx
如果L(w,x,y) > 0.5 则是1 否则为0。当然在实际应用过程中,并不是一定取0.5作为界限值,而是根据实际情况进行调整。
二进制回归可以转化为多分类回归问题。关于多分类介绍和基于Spark实现多分类可参考多分类实现方式介绍和在Spark上实现多分类逻辑回归(Multinomial Logistic Regression)
在Spark.mllib包中提供了两种LR分类模型,分别是:
- mini-batch gradient descent(LogisticRegressionWithLBFGS)
- L-BFGS(LogisticRegressionWithSGD)
但官方给出的建议是:推荐使用LBFGS,因为基于LBFGS的LR比基于SGD的能更快的收敛。其原话如下:
We implemented two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. We recommend L-BFGS over mini-batch gradient descent for faster convergence.
而且LRWithLBFGS不仅支持二分类还支持多分类,但LRWithSGD只支持二分类。所以后续只介绍下Spark mllib中的LogisticRegressionWithLBFGS相关操作。
mllib中的LRWithLBFGS
设置变量和创建spark对象
1 2 3 4 5 6 7 8 9
|
val file = "data/sample_libsvm_data.txt" val model_path = "model/lr/" val model_param = "numInterations:5,regParam:0.1,updater:SquaredL2Updater,gradient:LogisticGradient"
val spark = SparkSession.builder() .master("local[5]") .appName("LogisticRegression_Model_Train") .getOrCreate() Logger.getRootLogger.setLevel(Level.WARN)
|
拆分数据集
1 2 3
|
// 记载数据集 并拆分成训练集和测试集 val data = MLUtils.loadLibSVMFile(spark.sparkContext,file).randomSplit(Array(0.7,0.3)) val (train, test) = (data(0), data(1))
|
LRWithLBFGS模型设置参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
// 定义分类的数目,默认为2,是logisticregression的参数 private var numClass: Int = 2 // 定义是否添加截距,默认值为false,是logisticregression的参数 private var isAddIntercept: Option[Boolean] = None // 定义是否在训练模型前进行验证,是logisticregression的参数 private var isValidateData: Option[Boolean] = None
// 定义迭代的次数,默认值是100,LBFGS的参数 private var numInterations: Option[Int] = None // 定义正则化系数值,默认值是0.0,LBFGS的参数 private var regParam: Option[Double] = None // 定义正则化参数,支持:L1Updater[L1]、SquaredL2Updater[L2]、SimpleUpdater[没有正则项],LBFGS的参数 private var updater: Option[String] = None // 定义计算梯度的方式,支持:LogisticGradient、LeastSquaresGradient、HingeGradient ,LBFGS的参数 private var gradient: Option[String] = None // 人工定义的收敛阈值 private var threshold:Option[Double]=None // 定义模型收敛阈值,默认为 10^-6 private var convergenceTol: Double= 1.0e-6
|
创建模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
def createLRModel(model_param: String): LogisticRegressionWithLBFGS={ // 设置模型参数 val optimizer = new LROptimizer() optimizer.parseString(model_param) println(s"模型训练参数为:${optimizer.toString}")
// 创建模型并指定相关参数 val LRModel = new LogisticRegressionWithLBFGS() // 设置分类数目 LRModel.setNumClasses(optimizer.getNumClass) // 设置是否添加截距 if(optimizer.getIsAddIntercept.nonEmpty) {LRModel.setIntercept(optimizer.getIsAddIntercept.get)} // 设置是否进行验证模型 if(optimizer.getIsValidateData.nonEmpty){LRModel.setValidateData(optimizer.getIsValidateData.get)} // 设置迭代次数 if(optimizer.getNumInterations.nonEmpty){LRModel.optimizer.setNumIterations((optimizer.getNumInterations.get))} // 设置正则项参数 if(optimizer.getRegParam.nonEmpty) { LRModel.optimizer.setRegParam(optimizer.getRegParam.get) } // 设置正则化参数 if(optimizer.getUpdater.nonEmpty){ optimizer.getUpdater match { case Some("L1Updater") => LRModel.optimizer.setUpdater( new L1Updater()) case Some("SquaredL2Updater") => LRModel.optimizer.setUpdater(new SquaredL2Updater()) case Some("SimpleUpdater") => LRModel.optimizer.setUpdater(new SimpleUpdater()) case _ => LRModel.optimizer.setUpdater(new SquaredL2Updater()) } } // 设置梯度计算方式 if(optimizer.getGradient.nonEmpty){ optimizer.getGradient match { case Some("LogisticGradient") => LRModel.optimizer.setGradient(new LogisticGradient()) case Some("LeastSquaresGradient") => LRModel.optimizer.setGradient(new LeastSquaresGradient()) case Some("HingeGradient") => LRModel.optimizer.setGradient(new HingeGradient()) case _ => LRModel.optimizer.setGradient(new LogisticGradient()) } } // 设置收敛阈值 if(optimizer.getThreshold.nonEmpty){ LRModel.optimizer.setConvergenceTol(optimizer.getThreshold.get)} else {LRModel.optimizer.setConvergenceTol(optimizer.getConvergenceTol)}
LRModel }
|
模型效果评估
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
def evaluteResult(result: RDD[(Double,Double,Double)]) :Unit = { // MSE val testMSE = result.map{ case(real, pre, _) => math.pow((real - pre), 2)}.mean() println(s"Test Mean Squared Error = $testMSE") // AUC val metrics = new BinaryClassificationMetrics(result.map(x => (x._2,x._1)).sortByKey(ascending = true),numBins = 2) println(s"0-1 label AUC is = ${metrics.areaUnderROC}") val metrics1 = new BinaryClassificationMetrics(result.map(x => (x._3,x._1)).sortByKey(ascending = true),numBins = 2) println(s"score-label AUC is = ${metrics1.areaUnderROC}") // 错误率 val error = result.filter(x => x._1!=x._2).count().toDouble / result.count() println(s"error is = $error") // 准确率 val accuracy = result.filter(x => x._1==x._2).count().toDouble / result.count() println(s"accuracy is = $accuracy") }
|
保存模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
def saveModel(model: LogisticRegressionModel, model_path: String): Unit = { // 保存模型文件 obj val out_obj = new ObjectOutputStream(new FileOutputStream(model_path+"model.obj")) out_obj.writeObject(model)
// 保存模型信息 val model_info=new BufferedWriter(new FileWriter(model_path+"model_info.txt")) model_info.write(model.toString()) model_info.flush() model_info.close()
// 保存模型权重 val model_weights=new BufferedWriter(new FileWriter(model_path+"model_weights.txt")) model_weights.write(model.weights.toString) model_weights.flush() model_weights.close()
println(s"模型信息写入文件完成,路径为:$model_path") }
|
加载模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
def loadModel(model_path: String): Option[LogisticRegressionModel] = { try{ val in = new ObjectInputStream( new FileInputStream(model_path) ) val model = Option( in.readObject().asInstanceOf[LogisticRegressionModel] ) in.close() println("Model Load Success") model } catch { case ex: ClassNotFoundException => { println(ex.printStackTrace()) None } case ex: IOException => { println(ex.printStackTrace()) println(ex) None } case _: Throwable => throw new Exception } }
|
使用加载的模型进行分值计算
1 2 3 4 5 6 7 8 9 10 11
|
// 加载obj文件进行预测 val model_new = loadModel(s"$model_path/model.obj") // 使用加载的模型进行样例预测 val result_new = test.map(line =>{ val pre_label = model_new.get.predict(line.features) // blas.ddot(x.length, x,1,y,1) (向量x的长度,向量x,向量x的索引递增间隔,向量y,向量y的索引递增间隔) val pre_score = blas.ddot(model.numFeatures, line.features.toArray, 1, model.weights.toArray, 1) val score = Math.pow(1+Math.pow(Math.E, -2 * pre_score), -1) (line.label, pre_label,score) } ) result_new.take(2).foreach(println)
|
ml中的二分类LR
ml包中的LR既可以用来做二分类,也可以用来做多分类。
- 二分类对应:Binomial logistic regression
- 多分类对应:multinomial logistic regression
其中二分类可以通过Binomial logistic regression 和 multinomial logistic regression实现。
基于Binomial logistic regression的LR实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
def BinaryModel(train: Dataset[Row], model_path: String, spark: SparkSession) = { // 创建模型 val LRModel = new LogisticRegression() .setMaxIter(20) .setRegParam(0.3) .setElasticNetParam(0.8) // 训练评估模型 val model = LRModel.fit(train) evalute(model, train, spark) }
def evalute(model: LogisticRegressionModel, train: Dataset[Row], spark: SparkSession):Unit = { // 打印模型参数 println(s"模型参数信息如下:\n ${model.parent.explainParams()} \n") println(s"Coefficients(系数): ${model.coefficients}") println(s"Intercept(截距): ${model.intercept}") // 查看训练集的预测结果 rawPrediction:row 计算的分值,probability:经过sigmoid转换后的概率 val result = model.evaluate(train) result.predictions.show(10) // 将 label,0 值概率,predict label提取出来 result.predictions.select("label","probability","prediction").rdd .map(row => (row.getDouble(0),row.get(1).asInstanceOf[DenseVector].toArray(0),row.getDouble(2))) .take(10).foreach(println) // 模型评估 val trainSummary = model.summary val objectiveHistory = trainSummary.objectiveHistory println("objectiveHistoryLoss:") objectiveHistory.foreach(loss => println(loss))
val binarySummary = trainSummary.asInstanceOf[BinaryLogisticRegressionSummary]
val roc = binarySummary.roc roc.show() println(s"areaUnderROC: ${binarySummary.areaUnderROC}")
// Set the model threshold to maximize F-Measure val fMeasure = binarySummary.fMeasureByThreshold fMeasure.show(10) val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0) import spark.implicits ._ val bestThreshold = fMeasure.where($"F-Measure"===maxFMeasure).select("threshold").head().getDouble(0) model.setThreshold(bestThreshold) }
|
基于Multimial logistic regression的LR实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
def BinaryModelWithMulti(train: Dataset[Row], model_path: String, spark: SparkSession) = { // 创建模型 val LRModel = new LogisticRegression() .setMaxIter(10) .setRegParam(0.3) .setElasticNetParam(0.8) .setFamily("multinomial") // 训练模型 val model = LRModel.fit(train) // 打印模型参数 println(s"模型参数信息如下:\n ${model.parent.explainParams()} \n") println(s"Coefficients(系数): ${model.coefficientMatrix}") println(s"Intercept(截距): ${model.interceptVector}") }
|
ml中的多分类LR
某条样本属于类别k的概率计算为:
P(Y=k|X,βk,β0k)=eβk⋅X+β0k∑K−1kj=0eβkj⋅X+β0