Building a Shopping-Mall Recommendation System with Hadoop


1. The mall: a single-seller, multi-buyer mall system. The database is MySQL; the language is Java.

2. Sqoop 1.99.3 (Sqoop2): exchanges data between MySQL and Hadoop.

3. Hadoop 2.2.0: pseudo-distributed mode is used here for practice.

4. Features implemented: "people who like this item also like" and "friends with similar shopping tastes".

Steps:

1. Use Sqoop to import the "user favorite products" table from MySQL into HDFS. (The user-favorites table is the business basis for the recommendations here; in a real system the basis can be far more complex. Since the point is to show the basic principle of a recommender system, the basis is kept very simple.)

2. Implement the recommendation algorithm with MapReduce.

3. Use Sqoop to write the recommendation results back to MySQL.

4. The Java mall uses the recommendation data to implement the two features: "people who like this item also like" and "friends with similar shopping tastes" (a reading-side sketch follows below).
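As a rough sketch of how the mall side might read the results back out of MySQL, assuming the Sqoop export writes a table product_cooccurrence(product_id, related_product_id, cnt) — the table and column names here are assumptions for illustration, not from the original project:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical DAO for the "people who like this item also like" feature. */
public class RecommendDao {

    // Assumed schema: product_cooccurrence(product_id INT, related_product_id INT, cnt INT)
    public List<Integer> relatedProducts(Connection conn, int productId, int limit)
            throws SQLException {
        String sql = "SELECT related_product_id FROM product_cooccurrence "
                   + "WHERE product_id = ? ORDER BY cnt DESC LIMIT ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, productId);
            ps.setInt(2, limit);
            try (ResultSet rs = ps.executeQuery()) {
                List<Integer> ids = new ArrayList<Integer>();
                while (rs.next()) {
                    ids.add(rs.getInt(1)); // related products, most co-favorited first
                }
                return ids;
            }
        }
    }
}

The "friends with similar shopping tastes" feature would read the user co-occurrence table the same way, keyed by user id.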

Implementation:

1. Data source for the recommendation system

In the user-favorites table, the left column is the user id and the right column is the product id. Each time a user favorites a product, one such row is generated; both features, "people who like this item also like" and "friends with similar shopping tastes", are computed from this single table.
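After the Sqoop import, each HDFS record is a userid,productid pair; a few hypothetical sample rows (the ids are invented for illustration):

1,101
1,102
2,101
2,103
3,102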

The data is imported with Sqoop; Sqoop 1.99.3 is used here. Material on Sqoop 1.99.3 is very scarce and you will run into errors; if you can't find an answer by searching, you can mail me at keepmovingzx@163.com.

Create the connection info

This part is straightforward.

Create the job

Fill in the fields correctly and it works.

Run the import with: start job --jid <the id returned when the job was created above>

The data after a successful import:

2. Developing the MapReduce programs in Eclipse

ShopxxProductRecommend ("people who like this item also like")

The project has two steps: first, group the products by user; second, compute the product co-occurrence matrix.


Step 1 takes the imported data as input and groups the products by user.

Its output:

Step 2 takes step 1's output as input and computes the product co-occurrence matrix.

Its output:

In it, the first column is the current product, the second is a product that co-occurs with it, and the third is the co-occurrence count, used as the similarity score (the higher, the more similar).
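For instance, with the hypothetical sample rows above, the two steps would produce something like:

step 1 output (userid <TAB> comma-joined product ids):
1	101,102
2	101,103
3	102

step 2 output (product, co-occurring product, count):
101,102,1
101,103,1
102,101,1
103,101,1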

That is the whole process. The code follows.

package xian.zhang.common;

import java.util.regex.Pattern;

public class Util {
	/** Shared field delimiter: matches a tab or a comma. */
	public static final Pattern DELIMITER = Pattern.compile("[\t,]");
}
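Splitting a grouped record such as 1<TAB>101,102 with this pattern yields ["1", "101", "102"]; the co-occurrence mappers below start their loops at index 1 precisely to skip that leading key.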

package xian.zhang.core;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Merges input records "userid1,product1  userid1,product2  userid1,product3"
 * into one record: "userid1<TAB>product1,product2,product3".
 * @author zx
 */
public class CombinProductInUser {
	
	public static class CombinProductMapper extends Mapper<LongWritable, Text, IntWritable, Text>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] items = value.toString().split(","); 
			context.write(new IntWritable(Integer.parseInt(items[0])), new Text(items[1]));
		}
	}
	
	public static class CombinProductReducer extends Reducer<IntWritable, Text, IntWritable, Text>{

		@Override
		protected void reduce(IntWritable key, Iterable<Text> values,Context context)
				throws IOException, InterruptedException {
			StringBuilder sb = new StringBuilder();
			Iterator<Text> it = values.iterator();
			sb.append(it.next().toString());
			while(it.hasNext()){
				sb.append(",").append(it.next().toString());
			}
			context.write(key, new Text(sb.toString()));
		}
		
	}
	
	@SuppressWarnings("deprecation")
	public static boolean run(Path inPath,Path outPath) throws IOException, ClassNotFoundException, InterruptedException{
		
		Configuration conf = new Configuration();
		Job job = new Job(conf,"CombinProductInUser");
		
		job.setJarByClass(CombinProductInUser.class);
		job.setMapperClass(CombinProductMapper.class);
		job.setReducerClass(CombinProductReducer.class);

		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		
		return job.waitForCompletion(true);
		
	}
	
}

The second job computes the product co-occurrence matrix. (The original post pasted CombinProductInUser twice here; the listing below is reconstructed to mirror UserCo_occurrenceMatrix further down, with products in place of users, which is the ProductCo_occurrenceMatrix class that Main expects.)

package xian.zhang.core;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import xian.zhang.common.Util;

/**
 * Product co-occurrence matrix (mirrors UserCo_occurrenceMatrix, with products in place of users).
 * @author zx
 */
public class ProductCo_occurrenceMatrix {

	public static class Co_occurrenceMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

		IntWritable one = new IntWritable(1);

		@Override
		protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {

			// products[0] is the userid key written by CombinProductInUser; the rest are product ids
			String[] products = Util.DELIMITER.split(value.toString());
			for(int i=1;i<products.length;i++){
				for(int j=1;j<products.length;j++){
					if(i != j){
						context.write(new Text(products[i] + ":" + products[j]), one);
					}
				}
			}

		}

	}

	public static class Co_occurrenceReducer extends Reducer<Text, IntWritable, NullWritable, Text>{

		NullWritable nullKey = NullWritable.get();

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			Iterator<IntWritable> it = values.iterator();
			while(it.hasNext()){
				sum += it.next().get();
			}
			context.write(nullKey, new Text(key.toString().replace(":", ",") + "," + sum));
		}

	}

	@SuppressWarnings("deprecation")
	public static boolean run(Path inPath,Path outPath) throws IOException, ClassNotFoundException, InterruptedException{

		Configuration conf = new Configuration();
		Job job = new Job(conf,"ProductCo_occurrenceMatrix");

		job.setJarByClass(ProductCo_occurrenceMatrix.class);
		job.setMapperClass(Co_occurrenceMapper.class);
		job.setReducerClass(Co_occurrenceReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);

		return job.waitForCompletion(true);
	}

}

package xian.zhang.core;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

public class Main {
	
	public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
		
		if(args.length < 2){
			throw new IllegalArgumentException("Two arguments are required: the input path and the output path");
		}
		
		Path inPath1 = new Path(args[0]);
		Path outPath1 = new Path(inPath1.getParent()+"/CombinProduct");
		
		Path inPath2 = outPath1;
		Path outPath2 = new Path(args[1]);
		
		if(CombinProductInUser.run(inPath1, outPath1)){
			System.exit(ProductCo_occurrenceMatrix.run(inPath2, outPath2)?0:1);
		}
	}
	
}
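To run the pipeline (a hypothetical invocation; the jar name and paths are examples, and Main is assumed to be set as the jar's main class): hadoop jar ShopxxProductRecommend.jar /shopxx/favorite /shopxx/productRecommend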


ShopxxUserRecommend ("friends with similar shopping tastes")

This project likewise has two steps: first, group the users by product; second, compute the user co-occurrence matrix.

The principle is the same as ShopxxProductRecommend.

The code is attached below.

package xian.zhang.common;

import java.util.regex.Pattern;

public class Util {
	/** Shared field delimiter: matches a tab or a comma. */
	public static final Pattern DELIMITER = Pattern.compile("[\t,]");
}

package xian.zhang.core;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Merges input records "userid1,product1  userid2,product1  userid3,product1"
 * into one record: "productid1<TAB>user1,user2,user3".
 * @author zx
 */
public class CombinUserInProduct {
	
	public static class CombinUserMapper extends Mapper<LongWritable, Text, IntWritable, Text>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] items = value.toString().split(","); 
			context.write(new IntWritable(Integer.parseInt(items[1])), new Text(items[0]));
		}
	}
	
	public static class CombinUserReducer extends Reducer<IntWritable, Text, IntWritable, Text>{

		@Override
		protected void reduce(IntWritable key, Iterable<Text> values,Context context)
				throws IOException, InterruptedException {
			StringBuilder sb = new StringBuilder();
			Iterator<Text> it = values.iterator();
			sb.append(it.next().toString());
			while(it.hasNext()){
				sb.append(",").append(it.next().toString());
			}
			context.write(key, new Text(sb.toString()));
		}
		
	}
	
	@SuppressWarnings("deprecation")
	public static boolean run(Path inPath,Path outPath) throws IOException, ClassNotFoundException, InterruptedException{
		Configuration conf = new Configuration();
		Job job = new Job(conf,"CombinUserInProduct");
		
		job.setJarByClass(CombinUserInProduct.class);
		job.setMapperClass(CombinUserMapper.class);
		job.setReducerClass(CombinUserReducer.class);

		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		
		return job.waitForCompletion(true);
		
	}
	
}
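With the hypothetical favorite rows from earlier, CombinUserInProduct would emit records such as 101<TAB>1,2 and 102<TAB>1,3, pairing each product with the users who favorited it.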

package xian.zhang.core;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import xian.zhang.common.Util;

/**
 * User co-occurrence matrix.
 * @author zx
 */
public class UserCo_occurrenceMatrix {

	public static class Co_occurrenceMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

		IntWritable one = new IntWritable(1);
		
		@Override
		protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
			
			// users[0] is the productid key written by CombinUserInProduct; the rest are user ids
			String[] users = Util.DELIMITER.split(value.toString());
			for(int i=1;i<users.length;i++){
				for(int j=1;j<users.length;j++){
					if(i != j){
						context.write(new Text(users[i] + ":" + users[j]), one);
					}
				}
			}
			
		}
		
	}
	
	public static class Co_occurrenceReducer extends Reducer<Text, IntWritable, NullWritable, Text>{

		NullWritable nullKey = NullWritable.get();
		
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			Iterator<IntWritable> it = values.iterator();
			while(it.hasNext()){
				sum += it.next().get();
			}
			context.write(nullKey, new Text(key.toString().replace(":", ",") + "," + sum));
		}
		
	}
	
	@SuppressWarnings("deprecation")
	public static boolean run(Path inPath,Path outPath) throws IOException, ClassNotFoundException, InterruptedException{
		
		Configuration conf = new Configuration();
		Job job = new Job(conf,"UserCo_occurrenceMatrix");
		
		job.setJarByClass(UserCo_occurrenceMatrix.class);
		job.setMapperClass(Co_occurrenceMapper.class);
		job.setReducerClass(Co_occurrenceReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		
		return job.waitForCompletion(true);
	}
	
}
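Using the hypothetical grouped records above (101<TAB>1,2 and 102<TAB>1,3), the job would output lines such as:

1,2,1
2,1,1
1,3,1
3,1,1

i.e. pairs of users with the number of products both have favorited; the higher the count, the more alike their shopping tastes.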

package xian.zhang.core;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

public class Main {
	
	public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
		
		if(args.length < 2){
			throw new IllegalArgumentException("Two arguments are required: the input path and the output path");
		}
		
		Path inPath1 = new Path(args[0]);
		Path outPath1 = new Path(inPath1.getParent()+"/CombinUser");
		
		Path inPath2 = outPath1;
		Path outPath2 = new Path(args[1]);
		
		if(CombinUserInProduct.run(inPath1, outPath1)){
			System.exit(UserCo_occurrenceMatrix.run(inPath2, outPath2)?0:1);
		}
	}
	
}

The code is available on GitHub:

git@github.com:chaoku/ShopxxProductRecommend.git


