【推薦系統篇】--推薦系統之訓練模型


一、前述

經過之前的訓練數據的構建可以得到所有特征值為1的模型文件,本文將繼續構建訓練數據特征並構建模型。

二、詳細流程

將處理完成后的訓練數據導出用做線下訓練的源數據(可以用Spark_Sql對數據進行處理)
insert overwrite local directory '/opt/data/traindata' row format delimited fields terminated by '\t' select * from dw_rcm_hitop_prepare2train_dm;
注:這里是將數據導出到本地,方便后面再本地模式跑數據,導出模型數據。這里是方便演示真正的生產環境是直接用腳本提交spark任務,從hdfs取數據結果仍然在hdfs,再用ETL工具將訓練的模型結果文件輸出到web項目的文件目錄下,用來做新的模型,web項目設置了定時更新模型文件,每天按時讀取新模型文件

三、代碼詳解

package com.bjsxt.data

import java.io.PrintWriter

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.mllib.classification.{ LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD }
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{ SparkContext, SparkConf }

import scala.collection.Map

/**
 * Created by root on 2016/5/12 0012.
 */
class Recommonder {

}

object Recommonder {
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("recom").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //加載數據,用\t分隔開
    val data: RDD[Array[String]] = sc.textFile("d:/result").map(_.split("\t"))

    println("data.getNumPartitions:" + data.getNumPartitions) //如果文件在本地的話,默認是32M的分片

//    -1	Item.id,hitop_id85:1,Item.screen,screen2:1 一行數據格式
    //得到第一列的值,也就是label
    val label: RDD[String] = data.map(_(0))
    println(label)
    //sample這個RDD中保存的是每一條記錄的特征名
    val sample: RDD[Array[String]] = data.map(_(1)).map(x => {
      val arr: Array[String] = x.split(";").map(_.split(":")(0))
      arr
    })
    println(sample)
//    //將所有元素壓平,得到的是所有分特征,然后去重,最后索引化,也就是加上下標,最后轉成map是為了后面查詢用
    val dict: Map[String, Long] = sample.flatMap(x =>x).distinct().zipWithIndex().collectAsMap()
    //得到稀疏向量
    val sam: RDD[SparseVector] = sample.map(sampleFeatures => {
      //index中保存的是,未來在構建訓練集時,下面填1的索引號集合
      val index: Array[Int] = sampleFeatures.map(feature => {
        //get出來的元素程序認定可能為空,做一個類型匹配
        val rs: Long = dict.get(feature) match {
          case Some(x) => x
        }
        //非零元素下標,轉int符合SparseVector的構造函數
        rs.toInt
      })
      //SparseVector創建一個向量
      new SparseVector(dict.size, index, Array.fill(index.length)(1.0)) //通過這行代碼,將哪些地方填1,哪些地方填0
    })
    //mllib中的邏輯回歸只認1.0和0.0,這里進行一個匹配轉換
    val la: RDD[LabeledPoint] = label.map(x => {
      x match {
        case "-1" => 0.0
        case "1"  => 1.0
      }
      //標簽組合向量得到labelPoint
    }).zip(sam).map(x => new LabeledPoint(x._1, x._2))

//    val splited = la.randomSplit(Array(0.1, 0.9), 10)
//
//    la.sample(true, 0.002).saveAsTextFile("trainSet")
//    la.sample(true, 0.001).saveAsTextFile("testSet")
//    println("done")


    //邏輯回歸訓練,兩個參數,迭代次數和步長,生產常用調整參數
     val lr = new LogisticRegressionWithSGD()
    // 設置W0截距
    lr.setIntercept(true)
//    // 設置正則化
//    lr.optimizer.setUpdater(new SquaredL2Updater)
//    // 看中W模型推廣能力的權重
//    lr.optimizer.setRegParam(0.4)
    // 最大迭代次數
    lr.optimizer.setNumIterations(10)
    // 設置梯度下降的步長,學習率
    lr.optimizer.setStepSize(0.1)

    val model: LogisticRegressionModel = lr.run(la)

    //模型結果權重
    val weights: Array[Double] = model.weights.toArray
    //將map反轉,weights相應下標的權重對應map里面相應下標的特征名
    val map: Map[Long, String] = dict.map(_.swap)
    //模型保存
    //    LogisticRegressionModel.load()
    //    model.save()
    //輸出
    val pw = new PrintWriter("model");
    //遍歷
    for(i<- 0 until weights.length){
      //通過map得到每個下標相應的特征名
      val featureName = map.get(i)match {
        case Some(x) => x
        case None => ""
      }
      //特征名對應相應的權重
      val str = featureName+"\t" + weights(i)
      pw.write(str)
      pw.println()
    }
    pw.flush()
    pw.close()
  }
}

 model文件截圖如下:

各個特征下面對應的權重:

將模型文件和用戶歷史數據,和商品表數據加載到redis中去。

 代碼如下:

# -*- coding=utf-8 -*-
import redis

pool = redis.ConnectionPool(host='node05', port='6379',db=2)
r = redis.Redis(connection_pool=pool)
f1 = open('../data/ModelFile.txt')
f2 = open('../data/UserItemsHistory.txt')
f3 = open('../data/ItemList.txt')
for i in list:
    lines = i.readlines(100)
    if not lines:
        break
    for line in lines:
        kv = line.split('\t')
        if i==f1:
          r.hset("rcmd_features_score", kv[0], kv[1])
        if i == f2:
          r.hset('rcmd_user_history', kv[0], kv[1])
        if i==f3:
          r.hset('rcmd_item_list', kv[0], line[:-2])
f1.close()

 最終redis文件中截圖如下:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM