基於Spark的電影推薦系統


數據文件:

u.data(userid  itemid  rating  timestamp)

u.item(主要使用 movieid movietitle)

 

數據操作

把u.data導入RDD, take()  x.split(‘\t’)(1)

 

查看userid字段的統計信息

查看udata數據矩陣的 userid列上所有值的統計信息

 

使用ALS.train進行訓練

import org.apache.spark.mllib.recommendation.ALS

import org.apache.spark.mllib.recommendation.Rating

 

//讀取userData前3個字段 用戶,產品,評分來創建rawRatings

 

val userData=sc.textFile(”file:/home/hadoop/ALS/u.data”)

val rawRatings=userData.map(_.split(‘\t’).take(3))

 

//1.准備ALS訓練數據

// import org.apache.spark.mllib.recommendation.Rating

//把3個split列case到3個名稱變量中(user,movie,rating)后面便於操作

val ratingRDD= rawRatings.map{

         case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)

}

 

//2.使用ALS.train進行訓練

//使用顯示Explicit rating評分訓練.

// ALS.train(ratings:RDD[Rating],rank:Int,iterations:Int,lambda:Double):MatrixFactorizationModel

//隱式Implicit評分訓練

// ALS.trainImplicit(ratings:RDD[Rating],rank:Int,iterations:Int,lambda:Double):MatrixFactorizationModel

//Rating(userID,productID,rating),rank 當矩陣分解matrix factorization時,將原本矩陣A(mXn)分解成X(m X rank) Y(rank X n)矩陣

//Iterator   ALS算法重復計算次數,建議10~20

//lambda   建議0.01  會影響准確度|時間

//MatrixFactorizationMode訓練完成后產生的模型,訓練時會執行矩陣分解。模型對象成員 rank,

userFeatures:RDD[(Int,Array[Double])]分解后的用戶矩陣X(m*rank) 

productFeatures:RDD[(Int,Array[Double])]分解后的產品矩陣Y(m*rank) 

 

val model=ALS.train(ratingrdd,10,10,0.01)

 

 

使用模型推薦

//1.針對用戶 推薦電影  

model.recommendProducts(196,5).mkString("\n")

//傳入user=196被推薦用戶ID,num=5 推薦數, 返回Array[Rating]

 

//2.針對用戶 推薦產品的系統評分

model.predict(196,464)   //系統對用戶196推薦產品464的評分

//3.針對電影 推薦給用戶

//當要促銷某些電影時,可以找出可能對這些電影感興趣的會員 使用model.recommendUsers(product:Int,num:Int):Array[Rating] 方法推薦

product要被推薦的電影ID,num推薦的記錄數,返回系統針對產品推薦給用戶的數組

model.recommendUser(464,5).mkString(“,”)

//把464推薦給用戶471,系統評分 13.0232...

 

顯示推薦電影名稱

//1 創建電影ID-名稱的對照表

val itemrdd=sc.textFile("file:/home/hadoop/ALS/u.item")

val moviet=itemrdd.map(_.split("\\|").take(2)).map(x=>(x(0),x(1))).collectAsMap 

//map(x=>(x(0),x(1))).collect().toMap

 

 

//2顯示對照表前5條數據

moviet.take(5).foreach(println)

 

 

//查詢電影名稱

 

scala> moviet(590)

res114: String = Hellraiser: Bloodline (1996)

 

//2 顯示前5條推薦電影名稱

model.recommendProducts(195,5).map(rating=>(rating.product,moviet(rating.product),rating.rating)).foreach(println)

 

 

 

Recommend項目代碼

import java.io.File

import scala.io.Source

import org.apache.spark.{SparkContext,SparkConf}

import org.apache.spark.rdd._

import org.apache.spark.mllib.recommendation.{ALS,Rating,MatrixFactorizationModel}

 

main程序

val (ratingrdd,moviet)=PrepareData()  //產生ratingrdd評分數據 與 moviet(id,title)

val model=ALS.train(ratingrdd,5,20,0.1)  //訓練模型

recommend(model,moviet)  //推薦

 

 

//針對用戶推薦電影

def RecommendMovies(model:MatrixfactorizationModel,movieTitle:Map[Int,String],inputUserID:Int)={

val RecomendMovie=model.recommendProducts(inputUserID,10)

RecomendMovie.foreach{r=>pritnln(i.toString+moviet(r.product)+r.rating); i+=1 }

}

//針對電影推薦用戶

def RecommendUsers(model:MatrixfactorizationModel,movieTitle:Map[Int,String],inputMovieID:Int)={

val RecomendUser=model.recommendUsers(inputMovieID,10)

RecomendUser.foreach{r=>pritnln(i.toString+r.user+r.rating); i+=1 }

}

評估模型ALSEvaluation.main

調校model參數

model的rank,iterator,lambda這些參數設置會影響結果的准確度,運行時間。下面調校找出最佳參數組合

 

import java.io.File

import scala.io.Source

import org.apache.spark.{SparkContext,SparkConf}

import org.apache.spark.rdd._

import org.apache.spark.mllib.recommendation.{ALS,Rating,MatrixFactorizationModel}

import org.apache.spark.mllib.regression.LabeledPoint

 

def main(arg:Array[String]):Unit={

//1 准備數據階段

val (trainData,validationData,testData)=prepareData()

trainData.persist();validationData.persist();testData.persist()

 

//2 trainData訓練model,validationData評估模型的RMSE均方根誤差

//參數設置會影響誤差,需反復執行訓練和評估找出最佳組合,最后返回

//bestModel模型進入下一階段測試

val bestModel=trainValidation(trainData,validationData)

 

//3 使用testData在測試一次,避免出現overfitting(訓練過度).如果rmse在trainData訓練//和testData測試差異不大代表無過度

val testRmse=computeRMSE(bestModel,testData)

println(“使用testData測試bestModel,結果rmse=”+testRmse)

 

trainData.unpersist();validationData.unpersist();testData.unpersist()

}

1.准備數據-prepareData

def prepareData():(RDD[Rating],RDD[Rating],RDD[Rating])={

  val Array(trainData,validationData,testData)=ratingrdd.randomSplit(Array(0.8,0.1,0.1))

  return (trainData,validationData,testData)

}

 

2.進行訓練評估-trainvalidation

def trainvalidation(trainData:RDD[Rating],validationData:RDD[Rating]):MatrixFactorizationModel={

//評估rank

evaluateParameter(trainData,validationData,”rank”,Array(5,10,15,20,50,100),Array(10),Array(0.1)

//評估iter

evaluateParameter(trainData,validationData,”numIterators”,Array(10),Array(5,10,15,20,25),Array(0.1));

//評估lambda

evaluateParameter(trainData,validationData,”lambda”,Array(10),Array(10),Array(0.05,0.1,1,5,10.0));

//所有參數找出最好組合

val bestModel=evaluateAllParameter(trainData,validationData, Array(5,10,15,20,50,100), Array(5,10,15,20,25), Array(0.05,0.1,1,5,10.0));

 

return (bestModel)

}

 

//以上3個參數有5*5*5=125個組合,對每一種參數組合評估Rmse 最好找出具有最小誤差Rmse的參數組合就是最佳組合。

 

1.評估單個參數-evaluateParameter

那個參數具有比較低的誤差,繪制圖形

def evaluateParameter(trainData,validationData,p:String,ranks:Array[Int],its:Array[String],lambdas:Array[Double]):(Double,String)={

for(rank<-ranks;it<-its;lambda<-lambdas){

val rmse=trainModel(trainData,validationData,rank,it,lambda)

val pData=p match{

case “rank”=>”rank:”+rank;  //pData取到for中的值

case “it”=>”it:”+it;

case “lambda”=>”lambda:”+lambda;

}

}

(rmse,pData)

}

 

-----------包含畫圖

def evaluateParameter(trainData,validationData,p:String,ranks:Array[Int],its:Array[String],lambdas:Array[Double])={

var databarchart=new DefaultCategoryDataset();

var datalinechart=new DefaultCategoryDataset();

 

for(rank<-ranks;it<-its;lambda<-lambdas){

val rmse=trainModel(trainData,validationData,rank,it,lambda)

val pData=p match{

case “rank”=>rank;  //pData取到for中的值

case “it”=>it;

case “lambda”=>lambda;

}

databarchart.addValue(rmse,p,pData.toString)

datslinechart.addValue(time,”Time”,pData.toString)

}

Chart.plotBarLineChart(“ALS evaluations ”+p,p,”RMSE”,0.58,5,”Time”,databarchart,datalinechart);

}

 

def evaluateAllParameter(trainData:RDD[Rating],validationData:RDD[Rating],ranks:Array[Int],its:Array[Int],lambdas:Array[Double]):(Double,String):MatrixFactorizationModel={

val evaluations=for(rank<-ranks;it<-its;lambda<-lambdas)

yield{

val rmse=trainModel(trainData,validationData,rank,it,lambda)

(rank,it,lambda,rmse)

}

val Eval=(evaluations.sortBy(_._4))

val BestEval=Eval(0)

println(“最佳model參數 rank=”+BestEval._1+”,it=”+BestEval._2+” lambda=”+BestEval._3)

 val bestModel=ALS.train(trainData,BestEval._1,BestEval._2,BestEval._3)

(bestModel)

}

 

2.訓練模型-trainModel

def trainModel(trainData:RDD[Rating], validationData:RDD[Rating],rank:Int,it:Int,lambda:Double):Double={

val model=ALS.train(trainData,rank,it,lambda)

val rmse=computeRmse(model,validationData)

(rmse)

}

 

3.計算RMSE- computeRmse

root mean square error用來計算推薦系統對用戶喜好的預測與實際喜好的誤差平均值

誤差越小代表預測值與真實值越接近

公式

 

def computeRmse(model:MatrixFactorizationModel,validationData:RDD[Rating]):Double={

         val num= validationrdd.count

         val predictedRDD=model.predict(validationrdd.map(r=>(r.user,r.product)))

//取出user+product傳入預測方法,預測將結果存入predictRDD.

         val predictedAndRatings=predictedRDD.map(p=>((p.user,p.product),p.rating))

                  .join(validationrData.map(r=>((r.user,r.product),r.rating))).values

//predictRDD預測集join ratingrdd真實集

         math.sqrt(predictedAndRatings.map(x=>(x._1-x._2)*(x._1-x._2)).reduce(_+_)/num)

//針對每條數據進行計算,傳入x。x._1預測結果-x._2真實數據,相乘就是平方

//reduct將全部誤差求和后,在除以個數num。最后計算sqrt平方根

}

 

3.運行ALSEvaluation

評估rank參數 看到對RMSE沒有太大差別,但是越大運行時間會增加

評估it參數  對RMSE沒有太大差別,時間會增加

評估lambda參數 0.1時  RMSE最小,所需時間差異不大

所有參數交叉評估找出最好參數組合

 

運行evaluationAllParameter后顯示

最佳model的參數組合:rank20,it15,lamdba0.1  結果rmse=0.72312434463...

 

最好使用testData再次驗證bestModel結果0.7191119923578929,確認無overfitting問題

相同的validationData,在model中

沒優化得到RMSE值0.7745372...

優化后得到RMSE值0.7134127699...

最后 找出最佳參數組合,可以修改Recommend.scala為最佳組合

val model=ALS.train(ratings,20,15,0.1)

 

附錄:Chart

import org.jfree.chart._

import org.jfree.data.xy._

import org.jfree.data.category.DefaultCategoryDataset

import java.awt.Color

import java.awt.BasicStroke

object Chart{

 

def plotBarLineChart(Title:String,xLabel:String,yBarLabel:String,yBarMin:Double,yBarMax:Double,yLineLabel:String,dataBarChart:DefaultCategoryDataset,dataLineChart:DefaultCategoryDataset):Unit={

 

//畫出Bar chart

val chart=ChartFactory.createBarChart(“”,xLabel,yBarLabel,

dataBarChart,   //bar chart數據

PlotOrientation.VERTICAL,  //畫圖方向垂直

true,   //包含legend

true,   //顯示tooltips

false  //不要url generator

)

val plot=chart.getCategoryPlot();

plot.setBackgroundPaint(new Color(0xFF,0xFF,0xFF))

plot.setDomainAxisLocation(AxisLocation.BOTTOM_OR_RIGHT)

plot.setDataset(1,dataLineChart);

plot.mapDatasetToRangeAxis(1,1)

//畫柱狀圖 Y軸

val vn=plot.getRangeAxis();vn.setRange(yBarMin,yBarMax);vn.setAutoTickUnitSelection(true)

 

//畫折線圖y軸

val axis2=new NumberAxis(yLineLabel);plot.setRangeAxis(1,axis2);

val renderer2=new LineAndShapeRender();

renderer2.setToolTipGenerator(new StandardCategoryToolTipGenerator());

 

//先畫柱形圖,再畫折線圖

plot.setRenderer(1,renderer2);plot.setDatasetRenderingOrder(DatasetRenderingOrder.FORWARD);

//創建畫框

val frame=new ChartFrame(Title,chart); frame.setSize(500,500);

frame.pack();frame.setVisible(true);

}

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM