1. Mall: a single-merchant, multi-buyer e-commerce system, with MySQL as the database and Java as the language.
2. Sqoop 1.9.33: used to exchange data between MySQL and Hadoop.
3. Hadoop 2.2.0: run here in pseudo-distributed mode for practice.
4. Features implemented: "people who like this product also like..." and "friends with the same shopping tastes".
Steps:
1. Use Sqoop to import the "user favorite products" table from MySQL into HDFS. (The user-favorites table is used here as the business basis for the recommendations; in a real system the basis can be much more complex, but this post focuses on the basic principles of a recommendation system, so the basis is kept very simple.)
2. Implement the recommendation algorithm with MapReduce.
3. Use Sqoop to write the recommendation results back to MySQL.
4. The Java mall uses the recommendation data to implement the two features: "people who like this product also like..." and "friends with the same shopping tastes".
Implementation:
1. Data source for the recommendation system:
The left column is the user and the right column is the product. Every time a user favorites a product, one such record is generated; both the "people who like this product also like..." and the "friends with the same shopping tastes" features are built from this table.
Import the data with Sqoop (version 1.9.33 is used here). There is very little material on Sqoop 1.9.33 and you will run into some errors; if you hit one you cannot find an answer for, you can send it to my email keepmovingzx@163.com.
Create the connection information; this part is straightforward.
Create the job; just fill in the fields correctly.
Run the import: start job --jid <the ID returned when the job was created>.
The data after a successful import:
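For illustration only (the actual layout depends on the columns selected in the Sqoop job, but the MapReduce code below expects each line to begin with a comma-separated user id and product id), the imported HDFS file looks something like:

1,1001
1,1002
2,1001
2,1003

Each line means "this user has favorited this product".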
2. Developing the MapReduce programs in Eclipse
ShopxxProductRecommend ("people who like this product also like...")
The project has two steps: first, group the products by user; second, compute the product co-occurrence matrix.
Step 1: using the data imported by Sqoop above as input, group the products by user.
Output:
Step 2: using the output of step 1 as input, compute the product co-occurrence matrix.
Output:
The first column is the current product, the second column is a product similar to it, and the third column is the similarity score (a co-occurrence count: the higher it is, the more similar the two products).
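For example (with made-up ids and counts), a few output lines would look like:

1001,1002,3
1001,1005,1
1002,1001,3

meaning that products 1001 and 1002 were favorited together by three users.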
That is the whole process. The code follows below.
package xian.zhang.common;

import java.util.regex.Pattern;

public class Util {
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
}
package xian.zhang.core;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Merges the input lines
 *   userid1,product1
 *   userid1,product2
 *   userid1,product3
 * into a single output line: userid1  product1,product2,product3
 * @author zx
 */
public class CombinProductInUser {

    public static class CombinProductMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is "userid,productid"; emit <userid, productid>
            String[] items = value.toString().split(",");
            context.write(new IntWritable(Integer.parseInt(items[0])), new Text(items[1]));
        }
    }

    public static class CombinProductReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Join all product ids of one user into a comma-separated list
            StringBuffer sb = new StringBuffer();
            Iterator<Text> it = values.iterator();
            sb.append(it.next().toString());
            while (it.hasNext()) {
                sb.append(",").append(it.next().toString());
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    @SuppressWarnings("deprecation")
    public static boolean run(Path inPath, Path outPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "CombinProductInUser");
        job.setJarByClass(CombinProductInUser.class);
        job.setMapperClass(CombinProductMapper.class);
        job.setReducerClass(CombinProductReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        return job.waitForCompletion(true);
    }
}
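Main below invokes ProductCo_occurrenceMatrix for step 2. A minimal sketch of that job, assuming it follows the same pattern as the UserCo_occurrenceMatrix job listed further down (it counts how many users have favorited each pair of products together):

package xian.zhang.core;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import xian.zhang.common.Util;

/**
 * Product co-occurrence matrix (sketch): counts how often two products
 * appear in the same user's favorites group.
 */
public class ProductCo_occurrenceMatrix {

    public static class Co_occurrenceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input line: "userid \t product1,product2,..."; index 0 is the user id, so start at 1
            String[] products = Util.DELIMITER.split(value.toString());
            for (int i = 1; i < products.length; i++) {
                for (int j = 1; j < products.length; j++) {
                    if (i != j) {
                        // Emit one count for every ordered pair of products favorited by the same user
                        context.write(new Text(products[i] + ":" + products[j]), one);
                    }
                }
            }
        }
    }

    public static class Co_occurrenceReducer extends Reducer<Text, IntWritable, NullWritable, Text> {
        NullWritable nullKey = NullWritable.get();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the counts for each product pair and write "productA,productB,count"
            int sum = 0;
            Iterator<IntWritable> it = values.iterator();
            while (it.hasNext()) {
                sum += it.next().get();
            }
            context.write(nullKey, new Text(key.toString().replace(":", ",") + "," + sum));
        }
    }

    @SuppressWarnings("deprecation")
    public static boolean run(Path inPath, Path outPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "ProductCo_occurrenceMatrix");
        job.setJarByClass(ProductCo_occurrenceMatrix.class);
        job.setMapperClass(Co_occurrenceMapper.class);
        job.setReducerClass(Co_occurrenceReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        return job.waitForCompletion(true);
    }
}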
package xian.zhang.core;

import java.io.IOException;
import org.apache.hadoop.fs.Path;

public class Main {

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        if (args.length < 2) {
            throw new IllegalArgumentException("Two arguments are required: the input path and the output path");
        }
        // Step 1 writes its output to a CombinProduct directory next to the input path
        Path inPath1 = new Path(args[0]);
        Path outPath1 = new Path(inPath1.getParent() + "/CombinProduct");
        // Step 2 reads that directory and writes the final co-occurrence matrix to args[1]
        Path inPath2 = outPath1;
        Path outPath2 = new Path(args[1]);
        if (CombinProductInUser.run(inPath1, outPath1)) {
            System.exit(ProductCo_occurrenceMatrix.run(inPath2, outPath2) ? 0 : 1);
        }
    }
}
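To run the driver, package the project into a jar and submit it with hadoop jar, passing the HDFS directory that Sqoop imported into as the first argument and the directory for the final matrix as the second; that matrix is what Sqoop later exports back to MySQL.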
ShopxxUserRecommend ("friends with the same shopping tastes")
This project also has two steps: first, group the users by product; second, compute the user co-occurrence matrix.
The principle is the same as in ShopxxProductRecommend.
The code is attached below.
package xian.zhang.common;

import java.util.regex.Pattern;

public class Util {
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
}
package xian.zhang.core;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Merges the input lines (userid,productid), this time grouped by product,
 * into a single output line: productid1  user1,user2,user3
 * @author zx
 */
public class CombinUserInProduct {

    public static class CombinUserMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is "userid,productid"; emit <productid, userid>
            String[] items = value.toString().split(",");
            context.write(new IntWritable(Integer.parseInt(items[1])), new Text(items[0]));
        }
    }

    public static class CombinUserReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Join all user ids of one product into a comma-separated list
            StringBuffer sb = new StringBuffer();
            Iterator<Text> it = values.iterator();
            sb.append(it.next().toString());
            while (it.hasNext()) {
                sb.append(",").append(it.next().toString());
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    @SuppressWarnings("deprecation")
    public static boolean run(Path inPath, Path outPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "CombinUserInProduct");
        job.setJarByClass(CombinUserInProduct.class);
        job.setMapperClass(CombinUserMapper.class);
        job.setReducerClass(CombinUserReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        return job.waitForCompletion(true);
    }
}
package xian.zhang.core;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import xian.zhang.common.Util;

/**
 * User co-occurrence matrix.
 * @author zx
 */
public class UserCo_occurrenceMatrix {

    public static class Co_occurrenceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input line: "productid \t user1,user2,..."; index 0 is the product id, so start at 1
            String[] users = Util.DELIMITER.split(value.toString());
            for (int i = 1; i < users.length; i++) {
                for (int j = 1; j < users.length; j++) {
                    if (i != j) {
                        // Emit one count for every ordered pair of users who favorited the same product
                        context.write(new Text(users[i] + ":" + users[j]), one);
                    }
                }
            }
        }
    }

    public static class Co_occurrenceReducer extends Reducer<Text, IntWritable, NullWritable, Text> {
        NullWritable nullKey = NullWritable.get();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the counts for each user pair and write "userA,userB,count"
            int sum = 0;
            Iterator<IntWritable> it = values.iterator();
            while (it.hasNext()) {
                sum += it.next().get();
            }
            context.write(nullKey, new Text(key.toString().replace(":", ",") + "," + sum));
        }
    }

    @SuppressWarnings("deprecation")
    public static boolean run(Path inPath, Path outPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "UserCo_occurrenceMatrix");
        job.setJarByClass(UserCo_occurrenceMatrix.class);
        job.setMapperClass(Co_occurrenceMapper.class);
        job.setReducerClass(Co_occurrenceReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        return job.waitForCompletion(true);
    }
}
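Each output line of this job has the form userA,userB,count, where count is the number of products the two users have both favorited; the higher the count, the closer their shopping tastes, which is what the "friends with the same shopping tastes" feature ranks on.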
package xian.zhang.core;

import java.io.IOException;
import org.apache.hadoop.fs.Path;

public class Main {

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        if (args.length < 2) {
            throw new IllegalArgumentException("Two arguments are required: the input path and the output path");
        }
        // Step 1 writes its output to a CombinUser directory next to the input path
        Path inPath1 = new Path(args[0]);
        Path outPath1 = new Path(inPath1.getParent() + "/CombinUser");
        // Step 2 reads that directory and writes the final co-occurrence matrix to args[1]
        Path inPath2 = outPath1;
        Path outPath2 = new Path(args[1]);
        if (CombinUserInProduct.run(inPath1, outPath1)) {
            System.exit(UserCo_occurrenceMatrix.run(inPath2, outPath2) ? 0 : 1);
        }
    }
}
The code is available on GitHub:
git@github.com:chaoku/ShopxxProductRecommend.git