Implementing a Second-Degree Connection Search Algorithm with MapReduce


1. Background

On social networking sites such as Sina Weibo and Renren, features like "people you may be interested in" and "suggested follows" recommend new connections to users, and the second-degree connection (friend-of-friend) algorithm is a core ingredient of such recommendations.

2. Algorithm Implementation

Test dataset (each line is tab-separated: follower, followee):

a    b
b    c
a    c
b    d
c    e
e    c
e    f

Dataset notes: each line is a follow relation, i.e. a follows b. Candidates for a user's second-degree connections are the people followed by the people that user follows. For a, b follows c and d, so c and d are candidates via b, but c is dropped because a already follows c directly; a also reaches e through c, so a's second-degree connections are d and e. A user must also never end up as their own second-degree connection: c follows e and e follows c, so without that rule c would be recommended to itself.

The implementation chains two MapReduce jobs: job 1 rewrites each follow edge into tagged records ("1" = someone the user follows, "0" = a fan of the user) and crosses fans with follows to produce second-degree candidates; job 2 filters out candidates the user already follows.

The tricky part is making the two jobs run strictly one after the other. In the code below, calling waitForCompletion(true) on job 1 blocks until it completes, so job 2 only starts once the intermediate output exists. Hadoop's JobControl API is a more explicit alternative, sketched next.
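A minimal JobControl sketch (not part of the original listing), assuming job and job2 are the two fully configured Job instances from main() below; exception handling is omitted:

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// Wrap each Job and declare the dependency explicitly.
ControlledJob cJob1 = new ControlledJob(job.getConfiguration());
cJob1.setJob(job);
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
cJob2.setJob(job2);
cJob2.addDependingJob(cJob1);      // cJob2 runs only after cJob1 succeeds

JobControl control = new JobControl("de2friends");
control.addJob(cJob1);
control.addJob(cJob2);

new Thread(control).start();       // JobControl implements Runnable
while (!control.allFinished()) {   // poll until both jobs are done
    Thread.sleep(1000);
}
control.stop();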

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class De2Friends {
    public static class De2Mapper1 extends Mapper<Object,Text,Text,Text>{
        @Override
        protected void map(Object key, Text value, Context context) throws
                IOException, InterruptedException {
            String line = value.toString();
            String[] strArr = line.split("\t");
            if (strArr.length == 2) {
                // key = follower, value = "1" + followee: someone this user follows
                context.write(new Text(strArr[0]), new Text("1" + strArr[1]));
                // key = followee, value = "0" + follower: a fan of this user
                context.write(new Text(strArr[1]), new Text("0" + strArr[0]));
            }
        }
    }

    public static class De2Reducer1 extends Reducer<Text,Text,Text,Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Set<String> follows = new HashSet<String>();
            Set<String> fans = new HashSet<String>();
            for (Text val : values) {
                String friend = val.toString();
                if (friend.startsWith("1")) {
                    // forward the user's direct follows (first-degree connections) to job 2
                    context.write(key, new Text(friend));
                    follows.add(friend.substring(1));
                }
                if (friend.startsWith("0")) {
                    fans.add(friend.substring(1));
                }
            }
            // every fan -> key -> follow path makes "follow" a second-degree candidate
            // for "fan"; skip fan == follow so nobody is recommended to themselves
            for (String fan : fans)
                for (String follow : follows) {
                    if (!fan.equals(follow)) {
                        context.write(new Text(fan), new Text("2" + follow));
                    }
                }
        }
    }
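
    /*
     * Worked trace of job 1 on the sample dataset (record order in the actual
     * file depends on key grouping and HashSet iteration order):
     *
     *   first-degree records:      a 1b, a 1c, b 1c, b 1d, c 1e, e 1c, e 1f
     *   second-degree candidates:  a 2c, a 2d, a 2e, b 2e, c 2f
     *
     * For example, at key "b" the reducer sees fans = {a} and follows = {c, d},
     * so it emits (a, "2c") and (a, "2d"): a reaches c and d through b.
     */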

    public static class De2Mapper2 extends  Mapper<Object,Text,Text,Text>{
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] strArr = line.split("\t");
            if (strArr.length == 2) {
                // forward job 1's tagged records ("1..." = first-degree, "2..." = candidate) unchanged
                context.write(new Text(strArr[0]), new Text(strArr[1]));
            }
        }
    }

    public static class De2Reducer2 extends Reducer<Text,Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Set<String> firstFriend = new HashSet<String>();
            Set<String> secondFriend = new HashSet<String>();
            for (Text val : values) {
                String friend = val.toString();
                // startsWith, not contains: the tag is the first character, and
                // contains() could misfire on user ids that happen to include "1" or "2"
                if (friend.startsWith("1")) {
                    firstFriend.add(friend.substring(1));
                }
                if (friend.startsWith("2")) {
                    secondFriend.add(friend.substring(1));
                }
            }
            for (String second : secondFriend) {
                if (!firstFriend.contains(second)) {
                    context.write(key, new Text(second)); // keep only true second-degree connections
                }
            }
        }
    }
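
    /*
     * Worked trace of job 2 at key "a": the grouped values are
     * {1b, 1c, 2c, 2d, 2e}, giving firstFriend = {b, c} and
     * secondFriend = {c, d, e}. c is dropped because a already follows it,
     * leaving d and e as a's second-degree connections.
     */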

    public static void main(String[] args) throws Exception{
        // Windows client setup: hadoop.home.dir points at a local Hadoop install
        // (needed for winutils), and the cross-platform flag lets the job be
        // submitted from Windows to the Linux cluster.
        System.setProperty("hadoop.home.dir", "E:\\softs\\majorSoft\\hadoop-2.7.5");
        Configuration conf = new Configuration();
        conf.set("mapreduce.app-submission.cross-platform", "true");
        Path fileInput = new Path("hdfs://mycluster/testFile/qq.txt");
        // job 1 writes to a randomly named temp directory, which job 2 then reads
        Path tempDir = new Path("hdfs://mycluster/output/deg2friend-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
        Path fileOutput = new Path("hdfs://mycluster/output/qq");
        Job job = Job.getInstance(conf, "de2Friend");
        job.setJar("E:\\bigData\\hadoopDemo\\out\\artifacts\\wordCount_jar\\hadoopDemo.jar");
        job.setJarByClass(De2Friends.class);
        job.setMapperClass(De2Mapper1.class);
        job.setReducerClass(De2Reducer1.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job,fileInput);
        FileOutputFormat.setOutputPath(job,tempDir);
        job.waitForCompletion(true); // blocks until job 1 finishes, so job 2 only starts after the intermediate output is complete

        Job job2 = Job.getInstance(conf, "de2Friend");
        job2.setJar("E:\\bigData\\hadoopDemo\\out\\artifacts\\wordCount_jar\\hadoopDemo.jar");
        job2.setJarByClass(De2Friends.class);
        job2.setMapperClass(De2Mapper2.class);
        job2.setReducerClass(De2Reducer2.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job2,tempDir);
        FileOutputFormat.setOutputPath(job2,fileOutput);

        System.exit(job2.waitForCompletion(true)?0:1);
    }
}
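
One loose end: the randomly named tempDir is never deleted after job 2 consumes it. A small, hypothetical addition at the end of main() (not in the original code; it also needs an org.apache.hadoop.fs.FileSystem import) could clean it up:

boolean ok = job2.waitForCompletion(true);
// Recursively delete the intermediate directory now that job 2 has read it.
FileSystem fs = FileSystem.get(tempDir.toUri(), conf);
fs.delete(tempDir, true);
System.exit(ok ? 0 : 1);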

The result is:

a    d
a    e
b    e
c    f

(a reaches e through c, b reaches e through c, and c reaches f through e; f is three hops away from b, so it is not a second-degree connection of b.)

 

