關於PageRank的地位,不必多說。
主要思想:對於每個網頁,用戶都有可能點擊網頁上的某個鏈接,例如
由這個我們可以得到網頁的轉移矩陣
A B C D
A 0 1/2 1 0
B 1/3 0 0 0
C 1/3 1/2 0 0
D 1/3 0 0 1/2
Aij表示網頁j到網頁i的轉移概率。假設起始狀態每個用戶對ABCD四個網站的點擊概率相同都是0.25,那么各個網站第一次被訪問的概率為(0.25,0.25,0.25,0.25),第二次訪問考慮到在頁面跳轉,利用轉移矩陣對於網站A的概率為(0,1/2,1,0)*(0.25,0.25,0.25,0.25)T,一次類推,經過若干次迭代會收斂到某個值。但是考慮到有些鏈接是單鏈即沒有別的鏈接只想他,他也不指向別的鏈接,以及有些鏈接是自己指向自己,那么上述的方式將無法收斂。所以后面加了一個阻尼系數一般取0.85,至於為什么是這樣,挺復雜的證明。
最后的公式為alaph=factor*matrix*(alaph)T+(1-facotr)/n*
詳細的介紹可以參考:
http://blog.jobbole.com/71431/
接下來便是對比Hadoop和spark了。這里只是單純的討論兩個環境下編程的效率,不討論性能。
Hadoop:
輸入的文件:
這里得先說一句,之所以加了0.25是因為初始的概率為1/n,而n為網站數,這里統計網站數又得需要一個MapReduce來實現,所以作罷,權當n是手工輸入的。
由於每次迭代后的結果只能放在文件中,所以這里花了很多時間在規范如何輸出,以及map和reduce之間如何傳值的問題。
在map中,我們要做的是從輸入文件中獲取alaph和每個網站的轉移概率。例如
A 0.25:B,C,D
B的轉移概率為1/3而且是從A轉向B的,所以輸出的是<"B","link:A 0.333">link表示這是個轉移概率,A表示是從A出發的
alaph的表示:<"B","alaph: A 0.25">這里的A表示這個alaph值對應這A。
由於我們這里迭代后的輸入文件都是從輸出文件中獲取,所以我們需要將輸出文件搞的和一開始輸入文件一樣,所以在map階段需要輸出<"A","content:B,C,D">方便reduce輸出和輸入文件一樣格式的輸出。
在reduce階段,此時對於鍵值B而言,會收到如下
<"B","link:A 0.333">
<"B","link:D 0.5">
<"B","alaph: A 0.25">
<"B","alaph: D 0.25">
<"B","content:A,D">
我們根據不同的單詞,將value整合。這的alaph=0.333*0.25+0.5*0.25,接着再加上阻尼系數等,得到最后的alaph值。然后利用content對應的value,最后輸出<"B:0.375","A,D">
這樣迭代若干次。
附上代碼:
1 package org.apache.hadoop.PageRank; 2 3 import java.util.ArrayList; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 public class PageRank { 14 15 public static void run(){ 16 17 } 18 19 public static void main(String[] args) throws Exception { 20 double factor=0; 21 if(args.length>1){ 22 factor=Double.parseDouble(args[0]); 23 }else{ 24 factor=0.85; 25 } 26 String input="hdfs://10.107.8.110:9000/PageRank_input"; 27 String output="hdfs://10.107.8.110:9000/PageRank/output"; 28 ArrayList<String> pathList=new ArrayList<String>(); 29 for(int i=0;i<20;i++){ 30 Configuration conf = new Configuration(); 31 conf.set("num","4"); 32 conf.set("factor",String.valueOf(factor)); 33 Job job = Job.getInstance(conf, "PageRank"); 34 job.setJarByClass(org.apache.hadoop.PageRank.PageRank.class); 35 job.setMapperClass(MyMapper.class); 36 job.setReducerClass(MyReducer.class); 37 job.setOutputKeyClass(Text.class); 38 job.setOutputValueClass(Text.class); 39 FileInputFormat.setInputPaths(job, new Path(input)); 40 FileOutputFormat.setOutputPath(job, new Path(output)); 41 input=output; 42 pathList.add(output); 43 output=output+1; 44 45 System.out.println("the "+i+"th iterator is finished"); 46 job.waitForCompletion(true); 47 } 48 for(int i=0;i<pathList.size()-1;i++){ 49 Configuration conf=new Configuration(); 50 Path path=new Path(pathList.get(i)); 51 FileSystem fs=path.getFileSystem(conf); 52 fs.delete(path,true); 53 } 54 } 55 56 } 57 58 59 60 package org.apache.hadoop.PageRank; 61 62 import java.io.IOException; 63 import java.util.HashMap; 64 import java.util.Map; 65 66 67 import org.apache.hadoop.io.LongWritable; 68 import org.apache.hadoop.io.Text; 69 import org.apache.hadoop.mapreduce.Mapper; 70 71 public class MyMapper extends Mapper<LongWritable, Text, Text, Text> { 72 73 74 public void map(LongWritable ikey, Text ivalue, Context context) 75 throws IOException, InterruptedException { 76 String[] line=ivalue.toString().split(":"); 77 String content=line[1]; 78 int num=content.split(",").length; 79 String word=line[0].split(" ")[0]; 80 String alaph=line[0].split(" ")[1]; 81 context.write(new Text(word),new Text("content:"+content)); 82 for(String w:content.split(",")){ 83 context.write(new Text(w),new Text("link:"+word+" "+String.valueOf(1.0/num))); 84 context.write(new Text(w),new Text("alaph:"+word+" "+alaph)); 85 } 86 } 87 88 } 89 90 91 92 package org.apache.hadoop.PageRank; 93 94 import java.io.IOException; 95 import java.util.HashMap; 96 import java.util.Map; 97 98 import org.apache.hadoop.conf.Configuration; 99 import org.apache.hadoop.io.Text; 100 import org.apache.hadoop.mapreduce.Reducer; 101 102 public class MyReducer extends Reducer<Text, Text, Text, Text> { 103 104 public void reduce(Text _key, Iterable<Text> values, Context context) 105 throws IOException, InterruptedException { 106 // process values 107 Configuration conf=context.getConfiguration(); 108 double factor=Double.parseDouble(conf.get("factor")); 109 int num=Integer.parseInt(conf.get("num")); 110 111 Map<String,Double> alaph=new HashMap<String,Double>(); 112 Map<String,Double> link=new HashMap<String,Double>(); 113 114 String content=""; 115 for (Text val : values) { 116 String[] line=val.toString().split(":"); 117 if(line[0].compareTo("content")==0){ 118 content=line[1]; 119 }else { 120 String[] s=line[1].split(" "); 121 double d=Double.parseDouble(s[1]); 122 if(line[0].compareTo("alaph")==0){ 123 alaph.put(s[0],d); 124 }else if(line[0].compareTo("link")==0){ 125 link.put(s[0],d); 126 } 127 } 128 } 129 double sum=0; 130 for(Map.Entry<String,Double> entry:alaph.entrySet()){ 131 sum+=link.get(entry.getKey())*entry.getValue(); 132 } 133 134 System.out.println(" "); 135 System.out.println("sum is "+sum); 136 System.out.println(" "); 137 double result=factor*sum+(1-factor)/num; 138 context.write(_key,new Text(String.valueOf(result)+":"+content)); 139 140 } 141 142 }
我們可以看出,其實在MapReduce中我們將大把的精力花在了map的輸出上,而之所以這樣是因為我們不能直接利用他的結果,並且為了能迭代,我們又只能格式化輸出,如果數據很多的,那么在map階段將有很多的資源需要傳遞。總而言之,Hadoop讓我們將大部分精力花在不該花的地方。
接下來看spark 。我這里用的是python,在pyspark下運行。輸入文件:
先看代碼
def f(x): links=x[1][0] rank=x[1][1] n=len(links.split(",")) result=[] for s in links.split(","): result.append((s,rank*1.0/n)) return result file="hdfs://10.107.8.110:9000/spark_test/pagerank.txt" data=sc.textFile(file) link=data.map(lambda x:(x.split(":")[0],x.split(":")[1])) n=data.count() rank=link.mapValues(lambda x:1.0/n) for i in range(10): rank=link.join(rank).flatMap(f).reduceByKey(lambda x,y:x+y).mapValues(lambda x:0.15/n+0.85*x)
直接分析,data=sc.textFile(file)從hdfs中獲取text文件。
通過data.collect()可以發現內容為


我們需要將其轉換為鍵值對,那么這里就需要map函數
link=data.map(lambda
x:(x.split(":")[0],x.split(":")[1]))用於將文件轉換為鍵值對
此時lambda x的x值為字符串,所以通過:將其分割


接着通過n=data.count()我們可以直接獲得網站數,而不必手動輸入
rank=link.mapValues(lambda
x:1.0/n)用於初始化各個網站的訪問概率


接着通過link.join(rank),讓link和rank根據key而join進來

link.join(rank).flatMap(f)用於提取鍵值,由於輸入的是(page,(links,rank)),所以這里定義了一個函數f用於分割links,讓links分割成若干個link,並加上rank輸出。

最后只需將其按照key值進行reduce即可
link.join(rank).flatMap(f).reduceByKey(lambda x,y:x+y),這樣就會將相同key的概率相加,得到alaph,接着再加上阻尼系數即可

link.join(rank).flatMap(f).reduceByKey(lambda x,y:x+y).mapValues(lambda
x:0.15/n+0.85*x)這樣就是一個完整的計算
通過迭代若干次就可以了。
從代碼量上說(雖然python比java簡明)spark的確是比Hadoop好很多。原因也說了,1每次迭代不必將結果存放在文件中 2提供了更多的范式