使用协同过滤推荐算法进行电影推荐
机器学习算法,pyspark中的ALS算法,实现对用户的电影推荐。
1. Spark是一个开源的并行计算与分布式计算框架,最大特点是基于内存运算,适合迭代运算,兼容Hadoop生态系统的组件,同时包括相关的测试和数据生成器。
2. 主要用于解决全栈式批处理、结构化数据查询、流计算、图计算和机器学习的应用,适用于需要多次操作特定数据集的应用市场。
3. 需要反复操作的次数越多,所需读取的数据量越大,效率提升越大,这方面比Hadoop快很多倍。
4. 集成的模块:Spark SQL Spark Streaming MLlib GraphX SparkR(支持R语言的库)
5. 基于spark的协同过滤推荐算法并灭有依赖具体的业务数据,比如电影的内容分析和用户特征属性分析,证明是一个通用的算法框架,可以用户其他行业的个性化推荐,比如餐饮推荐,音乐推荐等,只要将评分数据转化成.csv格式即可直接应用。
import pandas as pd from pyspark.mllib.recommendation import ALS # from pyspark.sql import SparkSession from pyspark import SparkContext import math import warnings import os warnings.filterwarnings(ignore) # pip install pyspark -i https://mirrors.aliyun.com/pypi/simple/ print("1.加载评分文件……") # spark = SparkSession.builder.master(local).appName("test_script").getOrCreate() sc = SparkContext() # sc.setLogLevel("ERROR") small_raw_data = sc.textFile(os.path.normpath(dataset/ratings.csv)) small_data = small_raw_data.map(lambda line: line.split(",")).map(lambda col:(col[0],col[1],col[2])) print("2.按照6:2:2分为训练集、验证集、测试集……") training_RDD,validation_RDD,test_RDD = small_data.randomSplit([6,2,2],seed=10) validation_predict_RDD = validation_RDD.map(lambda x:(x[0],x[1])) test_predict_RDD = test_RDD.map(lambda x:(x[0],x[1])) print("3. 设置协同过滤推荐算法ALS(交替最小二乘法)参数……") min_error = float(inf) best_rank = 1 best_iteration =-1 regularization_param = 0.3 iterations = 10 seed = 10 ranks = [4,8,12] errors = [0,0,0] err = 0 for rank in ranks: model = ALS.train(training_RDD,rank,seed= 10,iterations=10,lambda_ =0.3) predict = model.predictAll(validation_predict_RDD).map(lambda r:((r[0]),r[1],r[2])) rate_pre = validation_RDD.map(lambda r:((int(r[0]),int(r[1])),float(r[2]))).join(predict) error = math.sqrt(rate_pre.map(lambda r:(r[1][0] - r[1][1])**2).mean()) errors[err] = error err+= 1 if error <min_error: min_error = error best_rank = rank print("4.训练模型,确认最佳的秩(rank),确认最小误差……") print("最佳秩值:", best_rank) print("最小的误差:",min_error) print("5.用最佳秩重新训练模型……") model = ALS.train(training_RDD,best_rank,seed=seed,iterations=iterations, lambda_ =regularization_param) # 保存模型 # model.save(sc,"spark_movie.model") # sameModel = MatrixFactorizationModel.load(sc,"spark_movie.model") print("6.使用测试集对模型进行测试……") predictions = model.predictAll(test_predict_RDD).map(lambda r: ((r[0],r[1]),r[2])) rates_p = test_RDD.map(lambda r: ((int(r[0]),int(r[1])),float(r[2]))).join(predictions) error = math.sqrt(rates_p.map(lambda r:(r[1][0]-r[1][1])** 2).mean()) print(REMS = %s %error) print("7.计算测试集最小误差……") print("测试集最小误差RMSE=",error) print("8.预测用户对电影的评分……") user_id =15 movie_id= 47 predictedRating = model.predict(user_id,movie_id) print("用户编号:",user_id,"对电影:",movie_id,"的评分为:",predictedRating) print("9.向某一用户推荐10部电影:") topKRecs = model.recommendProducts(user_id,10) print("向用户编号:",user_id,"的用户推荐10部电影:") for rec in topKRecs: print(rec)
数据集可以评论区找我要
上一篇:
通过多线程提高代码的执行效率例子
下一篇:
深度学习—BP神经网络