使用Spark MLlib中推薦算法ALS對電影評分數據MovieLens推薦


數據集下載地址:http://files.grouplens.org/datasets/movielens/

#!usr/bin/env python
# -*- coding:utf-8 _*-

"""
@author: Ivan
@version: v1.0
@time: 2018-12-14 12:36
"""
# 使用Spark MLlib中推薦算法ALS對電影評分數據MovieLens推薦
from pyspark.sql import SparkSession
from pyspark.mllib.recommendation import ALS, Rating, MatrixFactorizationModel
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.linalg import DenseVector


def alsModelEvaluate(model, testing_rdd):
    # 對測試數據集預測評分,針對測試數據集進行預測
    predict_rdd = model.predictAll(testing_rdd.map(lambda r: (r[0], r[1])))
    print(predict_rdd.take(5))
    predict_actual_rdd = predict_rdd.map(lambda r: ((r[0], r[1]), r[2])) \
        .join(testing_ratings.map(lambda r: ((r[0], r[1]), r[2])))

    print(predict_actual_rdd.take(5))
    # 創建評估指標實例對象
    metrics = RegressionMetrics(predict_actual_rdd.map(lambda pr: pr[1]))

    print("MSE = %s" % metrics.meanSquaredError)
    print("RMSE = %s" % metrics.rootMeanSquaredError)

    # 返回均方根誤差
    return metrics.rootMeanSquaredError


def train_model_evaluate(training_rdd, testing_rdd, rank, iterations, lambda_):
    # 定義函數,訓練模型與模型評估
    # 使用超參數的值,訓練數據和ALS算法訓練模型
    model = ALS.train(training_rdd, rank, iterations, lambda_)

    # 模型的評估
    rmse_value = alsModelEvaluate(model, testing_rdd)

    # 返回多元組
    return (model, rmse_value, rank, iterations, lambda_)


if __name__ == "__main__":
    # 構建SparkSession實例對象
    spark = SparkSession.builder \
        .appName("SparkSessionExample") \
        .master("local") \
        .getOrCreate()

    # 獲取SparkContext實例對象
    sc = spark.sparkContext

    # 讀取數據
    raw_ratings_rdd = sc.textFile("/wsh/project/Python/spark/data/u.data")
    # print(raw_ratings_rdd.count())
    # print(raw_ratings_rdd.first())

    # 獲取評分數據前三個字段,構建Rating實例對象
    ratings_rdd = raw_ratings_rdd.map(lambda line: line.split('\t')[0:3])
    # print(ratings_rdd.first())

    ratings_datas = ratings_rdd.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
    # print(ratings_datas.first())

    # 查看評分數據中有多少電影
    # print(ratings_datas.map(lambda x: x[1]).distinct().count())

    # 查看評分數據中有多少用戶
    # print(ratings_datas.map(lambda x: x[0]).distinct().count())

    # 將數據集分為訓練數據集和測試數據集
    training_ratings, testing_ratings = ratings_datas.randomSplit([0.8, 0.2])

    '''
        # 使用ALS算法來訓練模型
        # help(ALS)
        # 采用顯示評分函數訓練模型
        alsModel = ALS.train(training_ratings, 10, iterations=10, lambda_=0.01)
    
        # 用戶特征因子矩陣
        user_feature_matrix = alsModel.userFeatures()
        print(type(user_feature_matrix))
        print(user_feature_matrix.take(10))
    
        # 物品因子矩陣
        item_feature_matrix = alsModel.productFeatures()
        print(type(item_feature_matrix))
        print(item_feature_matrix.take(10))
    
        # 預測某個用戶對某個電影的評分
        
        # 假設用戶196,對電影242的評分,實際評分為3分
        
        predictRating = alsModel.predict(196, 242)
        print(predictRating)
    
        # 為用戶推薦(10部電影)
        rmdMovies = alsModel.recommendProducts(196, 10)
        print(rmdMovies)
    
        # 為電影推薦(10個用戶)
        rmdUsers = alsModel.recommendUsers(242, 10)
        print(rmdUsers)
    '''

    # 怎么評價模型的好壞,ALS模型評估指標(類似回歸算法模型預測值,連續值),使用回歸模型中
    # RMSE(均方根誤差)評估模型
    # 找到最佳模型
    '''
        如何找到最佳模型??
            -a. 模型的評估
                計算RMSE
            -b. 模型的優化,兩個方向
                1、數據
                2、超參數的調整,選擇合適的超參數的值,得到最優模型
            交叉驗證
                訓練數據集、驗證數據集、測試數據集
            K-Folds交叉驗證
    '''

    # ALS算法的超參數的調整
    # 定義一個函數,用於對模型進行評估
    # 使用三層for循環,設置不同參數的值,分別使用ALS算法訓練模型,評估獲取RMSE的值
    metrix_list = [train_model_evaluate(training_ratings, testing_ratings, param_rank, param_iterations, param_lambda)
                   for param_rank in [10, 20]
                   for param_iterations in [10, 20]
                   for param_lambda in [0.001, 0.01]
                   ]
    print(type(metrix_list))
    sorted(metrix_list, key=lambda k: k[1], reverse=False)
    model, rmse_value, rank, iterations, lambda_ = metrix_list[0]
    print("The best parameters, rank=%s, iterations=%s, lambda_=%s" % rank % iterations % lambda_)

    # 保存模型
    model.save(sc, "/wsh/project/Python/spark/data/als_model")

    # 加載模型
    load_model = MatrixFactorizationModel.load(sc, "/wsh/project/Python/spark/data/als_model")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM