MapReduce實現二度好友關系


一、問題定義

     我在網上找了些,關於二度人脈算法的實現,大部分無非是通過廣度搜索算法來查找,猶豫深度已經明確了2以內;這個算法其實很簡單,第一步找到你關注的人;第二步找到這些人關注的人,最后找出第二步結果中出現頻率最高的一個或多個人(頻率這塊沒完成),即完成。

     但如果有千萬級別的用戶,那在運算時,就肯定會把這些用戶的follow 關系放到內存中,計算的時候依次查找;先說明下我沒有明確的診斷對比,這樣做的效果一定沒 基於hadoop實現的好;只是自己,想用hadoop實現下,最近也在學;若有不足的地方還請指點。

  任務是求其其中的二度人脈、潛在好友,也就是如下圖:

  比如I認識C、G、H,但C不認識G,那么C-G就是一對潛在好友,但G-H早就認識了,因此不算為潛在好友。

  那么一個關鍵問題是如何輸入輸入。

  首先是五項五環圖,可以看出共有13條邊,那么輸入數據也有13條就夠了,比如說先輸入AB,那么輪到b時候就不輸入BA了,級變速如也沒關系,因為會去重。

 

二、原理分析

  首先,我們進行第一個MapReduce,同樣是一個輸入行,產生一對互逆的關系,壓入context,例如Tom Lucy這個輸入行就在Map階段搞出Tom Lucy-Lucy Tom這樣的互逆關系。之后Map-reduce會自動對context中相同的key合並在一起。例如由於存在Tom Lucy、Tom Jack,顯然會產生一個Tom:{Lucy,Jack},這是Reduce階段以開始的鍵值對。這個鍵值對相當於Tom所認識的人。先進行如下的輸出,潛在好友顯然會在{Lucy,Jack}這個Tom所認識的人產生,對這個數組做笛卡爾乘積,形成關系:{<Lucy,Lucy>,<Jack,Jack>,<Lucy,Jack>,<Jack,Lucy>},也就是<Lucy,Lucy>這類無意義的剔除,<Lucy,Jack>,<Jack,Lucy>認定為一個關系,將剩余關系進行如下的輸出。

  不過計算笛卡爾積就像雙重for對同一個數組,重復計算了一半,怎么減少了,我程序里是HashSet,第二重如何從第一寵Set的iterator哪里開始呢。

三、代碼

3.1 Mapper

package friends;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Deg2FriendMapper extends Mapper<LongWritable, Text, Text, Text> {

	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		//   "\t"表示制表符
		//StringTokenizer st = new StringTokenizer(line,",");
		//while(st.hasMoreTokens())
		//用while循環的時候是一行有很多才需要
		String[] ss = line.split(",");
		context.write(new Text(ss[0]), new Text(ss[1]));
		context.write(new Text(ss[1]), new Text(ss[0]));
	}

}

  

3.2 Reducer

package friends;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Deg2Reducer extends Reducer<Text, Text, Text, Text> {

	public void reduce(Text key, Iterable<Text> value, Context context)
			throws IOException, InterruptedException {
		// process values
		
		//首先是key相同的合並,同時取出value笛卡爾積之后的重復關系
		Set<String> set = new HashSet<String>();
		
		for (Text t : value) {//相同key合並
			//但是為什么用HashSet,因為Map里面謝了反響關系,比如 對於A節點,謝了AB,BA,
			//對於B節點,謝了BA,AB,那么A開頭的有兩次AB,去重,
			//為什么要for循環 因為A可能有很多朋友
			//
			set.add(t.toString());
		}
		if(set.size()>=2) {//否則說明只有一度好友關系
			//對value的值做笛卡爾積
			Iterator<String> iter = set.iterator();
			while(iter.hasNext()) {
				String name = iter.next();
				//iterator寫成for循環的話 第三個條件沒有 否則for內娶不到元素
				for(Iterator<String> iter2 = set.iterator();iter2.hasNext();) {
					String name2 = iter2.next();
					if(!name2.equals(name)) {//相同元素不算關系
						context.write(new Text(name), new Text(name2));
					}
				}
			}
			
		}
	}

}

  

3.2 Main

package friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class Deg2Main {

	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub
		Configuration conf = new Configuration(); //對應於mapred-site.xml
		Job job = new Job(conf,"Deg2MR");
		job.setJarByClass(Deg2Main.class);
		job.setMapperClass(Deg2FriendMapper.class);
		job.setReducerClass(Deg2Reducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		job.setNumReduceTasks(1);
		//"/in"解析不了  提示文件不存在 因為把他們認為是本地文件了 因為有個 file:/
		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.58.180:8020/MLTest/Deg2MR/Deg2MR.txt"));
		//輸出文件不能存在   
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.58.180:8020/MLTest/Deg2MR/Deg2Out"));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

  

3.4 日志

m:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:323)
  INFO - Job job_local1127799899_0001 completed successfully
 DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:765)
  INFO - Counters: 38
	File System Counters
		FILE: Number of bytes read=740
		FILE: Number of bytes written=509736
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=132
		HDFS: Number of bytes written=206
		HDFS: Number of read operations=13
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Map-Reduce Framework
		Map input records=13
		Map output records=26
		Map output bytes=106
		Map output materialized bytes=164
		Input split bytes=116
		Combine input records=0
		Combine output records=0
		Reduce input groups=10
		Reduce shuffle bytes=164
		Reduce input records=26
		Reduce output records=50
		Spilled Records=52
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=3
		CPU time spent (ms)=0
		Physical memory (bytes) snapshot=0
		Virtual memory (bytes) snapshot=0
		Total committed heap usage (bytes)=456130560
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=66
	File Output Format Counters 
		Bytes Written=206
 DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:323)
 DEBUG - stopping client from cache: org.apache.hadoop.ipc.Client@37afeb11
 DEBUG - removing client from cache: org.apache.hadoop.ipc.Client@37afeb11
 DEBUG - stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@37afeb11
 DEBUG - Stopping client
 DEBUG - IPC Client (521081105) connection to /192.168.58.180:8020 from hxsyl: closed
 DEBUG - IPC Client (521081105) connection to /192.168.58.180:8020 from hxsyl: stopped, remaining connections 0
 

  

3.5 輸出

B	H
H	B
A	C
C	A
B	D
B	F
B	I 
D	B
D	F
D	I 
F	B
F	D
F	I 
I 	B
I 	D
I 	F
C	E
C	F
E	C
E	F
F	C
F	E
D	F
F	D
C	D
C	E
C	G
D	C
D	E
D	G
E	C
E	D
E	G
G	C
G	D
G	E
F	H
F	I
H	F
H	I
I	F
I	H
A	G
A	I
G	A
G	I
I	A
I	G
G	H
H	G

  

四、思考

4.1 單向

  類似父子關系找爺孫關系,或者是關注關系或者follow關系,那么Mapper階段不相互存入就可。

4.2 你最受歡迎的二度人脈

  簡單描述:即你關注的人中有N個人同時都關注了 XXX 。

4.3 Set遍歷

  雙重iterator便利HashSet,第二重如何從第一寵Set的iterator哪里開始呢。這樣可以少算一倍,應該可以吧set轉為數數組吧。

  不過這樣也好,A是B的二度,那么B也是A的二度....

4.4 另外

  一開始reducer里寫錯了,set.add(toString.toString()),竟然沒報錯,沒有toString這個變量。然后日志是reducer階段沒有任何寫入。

五、參考文獻

  http://blog.csdn.net/yongh701/article/details/50630498

  http://blog.csdn.net/u013926113/article/details/51539306

  https://my.oschina.net/BreathL/blog/75112


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM