另一篇介紹的很好的博客:http://blog.jobbole.com/71431/
一、什么是PageRank
PageRank 是對搜索引擎的搜索網頁進行排序的算法。
過去的排序算法是比如使用網頁名字,關鍵詞出現的次數,人工等方法,但是這種方法一方面搜索結果不准確,另一方面搜索結果容易被人為因素影響。
所以,PageRank應運而生。
PageRank算法計算每一個網頁的PageRank值,然后根據這個值的大小對網頁的重要性進行排序。它的思想是模擬一個悠閑的上網者,上網者首先隨機選擇一個網頁打開,然后在這個網頁上呆了幾分鍾后,跳轉到該網頁
所指向的鏈接,這樣無所事事、漫無目的地在網頁上跳來跳去,PageRank就是估計這個悠閑的上網者分布在各個網頁上的概率。
PageRank背后的兩個基本假設:
- 數量假設:更重要的網頁更可能被更多的網頁鏈接到。
- 質量假設:有更高的PageRank的網頁將會傳遞更高的權重。(類似大佬轉發的網頁就比我轉發的網頁更重要)
二、PageRank模型
互聯網中網頁和網頁之間的鏈接關系組成一個有向圖,其中網頁是節點,網頁之間的鏈接為有相邊。如上圖所示的模型就表示四個網頁,A指向B的箭頭表示A中存在指向B的鏈接。
可以使用轉移矩陣來表示這樣一個有向圖。上圖所對應的模型即可用轉移矩陣表示為:
我們看到,A網頁發出三個鏈接,那么到任何一個網頁的概率都均等,所以我們上邊的矩陣A這一列對用的BCD三個網頁概率都為1/3。
有了上面的轉移矩陣,我們從兩個角度來思考:
一方面,一個網頁X如果有許多網頁指向他,那么轉移矩陣的X行就會有許多的非零元素,轉移之后這個網頁就會獲得更大的PageRank,這剛好和我們上文提到的數量假設相對應。
另一方面,假設X網頁具有很高的重要性,那么X所轉移到的那些網頁將會由於X的高重要性而獲得相對更高的重要性,這剛好和我們上文提到的質量假設相對應。
那么,怎樣表示一個網頁的重要性呢?
我們使用PageRank Matrix來表示。上述有向圖對應一個4 * 1的PageRank矩陣。
為什么初始化為均分初始化呢?我們可以這樣思考,一個無聊的上網這開始想要來打開一個網頁,那么他打開這一堆網頁中的任何一個在最開始都是風概率的。(當然不考慮個人信息,那是個性化搜索的范疇)
好了,我們現在有了兩個矩陣,轉移矩陣和初始的PageRank矩陣PR0, 我們接下來就要模擬網頁間的跳轉。
如圖,一次跳轉之后,網頁A的PageRank就應該由可以跳到A的網頁和他們各自的PageRank相乘之后的和來表示。上圖的矩陣乘法就可以表示一次跳轉之后的各個網頁的PageRank值。
同理,PR2 = Transition Matrix * PR1。 PRn = Transition Matrix * PR(n - 1)。可以證明,最終的PR是收斂的。(馬爾科夫過程,圖是強連通的)
三、Dead Ends 終止點問題
有些網頁不存在指向其他網頁的鏈接,那么多次迭代之后,導致所有網頁的PageRank都變為0.
上圖網頁c沒有指向外部的鏈接,最終將會導致各個網頁的PageRank都變為0.
四、Spider traps 陷阱問題
陷阱問題是有些網頁只存在指向自己的鏈接,那么多次迭代之后,這將導致這個網頁的PageRank為1,而其他網頁的PageRank為0.
五、解決終止點問題和陷阱問題
我們現在回到開始是所說的那個無聊的上網者,假如人類遇到這個問題的話,采取的解決辦法就是關閉當前網頁,重新打開一個網頁。所以為了避免上述兩個問題,我們呢對上邊的轉移公式做一個小小的修正。(有概率隨機打開一個網頁,這時候打開所有網頁的概率均等)
回到之前的例子:
六、Map-Reduce計算PageRank
6.1輸入數據
上邊分析的時候數據似乎是存儲在轉移矩陣中,但實際不是這樣的,這樣一方面太浪費空間了,另一方面,插入和刪除數據都很麻煩。
實際的數據輸入格式如下:
我們先對數據進行預處理,類似於鄰接表的存儲方式,存儲網頁和該網頁包含的鏈接所指向的網頁。
我們使用Relations.txt來生成轉移矩陣,然后在進行轉移矩陣和PageRank矩陣的相乘操作。
6.2矩陣相乘
接下來就設計到Map-Reduce處理矩陣的乘法操作,我們不能直接來進行兩個矩陣的乘法操作。因為如果這樣做的話,我們需要轉移矩陣的一行數據都來齊之后才可進行,這就需要我們 in-memory 存儲整個矩陣,很容易造成out of mempry 而且速度還慢。
所以我們的處理方法是用Traverse MAtrix的每一列和PR相成,最后把所有元素相加即可。這樣我們就不需要in-memery那么多東西。
其實對於一個網頁A,可以思考為,你A B C D分別可以為我貢獻多少,然后把ABCD的貢獻全都加起來,就是A的PageRank。矩陣直接相乘和我們的相乘相加的方法的其別就是,直接相乘是計算A的PageRank 的時候,把所以對A有貢獻的網頁都拿來,計算出貢獻加起來作為A的PageRank;相乘相加是首先計算出A可以為ABCD貢獻多少,然后B可以為ABCD貢獻多少,然后C..D..。最后再把這些值加起來,分別作為ABCD的PageRank。
我們的流程圖如下:
容易知道,我們使用兩個Map-Reduce來實現。
MR1 --- Mapper1 從Relations.txt中讀取網頁並計算跳轉到的網頁及概率
MR1 --- Mapper2 從PR.txt中讀取網頁並均分概率
MR1 --- Reducer 兩個Mapper過來的key就代表是慨率網頁key轉移,輸出key應該為轉移到的網頁,value為網頁key對這個網頁做了多少貢獻。
MR2 --- Mapper 很簡單,就是從前一個Map-Reduce生成的文件中讀取數據
MR2 --- Reducer Mapper按照網頁作key之后分類的sum,得到各個網頁的PageRank
七、主要代碼
Driver

1 public class Driver { 2 3 public static void main(String[] args) throws Exception { 4 UnitMultiplication multiplication = new UnitMultiplication(); 5 UnitSum sum = new UnitSum(); 6 7 //args0: dir of transition.txt 8 //args1: dir of PageRank.txt 9 //args2: dir of unitMultiplication result 10 //args3: times of convergence 11 String transitionMatrix = args[0]; 12 String prMatrix = args[1]; 13 String unitState = args[2]; 14 int count = Integer.parseInt(args[3]); 15 for(int i=0; i<count; i++) { 16 String[] args1 = {transitionMatrix, prMatrix+i, unitState+i}; 17 multiplication.main(args1); 18 String[] args2 = {unitState + i, prMatrix+(i+1)}; 19 sum.main(args2); 20 } 21 } 22 }
UnitMultiplication

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class UnitMultiplication { public static class TransitionMapper extends Mapper<Object, Text, Text, Text> { @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString().trim(); String[] fromTo = line.split("\t"); if(fromTo.length == 1 || fromTo[1].trim().equals("")) { return; } String from = fromTo[0]; String[] tos = fromTo[1].split(","); for (String to: tos) { context.write(new Text(from), new Text(to + "=" + (double)1/tos.length)); } } } public static class PRMapper extends Mapper<Object, Text, Text, Text> { @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] pr = value.toString().trim().split("\t"); context.write(new Text(pr[0]), new Text(pr[1])); } } public static class MultiplicationReducer extends Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { List<String> transitionUnit = new ArrayList<String>(); double prUnit = 0; for (Text value: values) { if(value.toString().contains("=")) { transitionUnit.add(value.toString()); } else { prUnit = Double.parseDouble(value.toString()); } } for (String unit: transitionUnit) { String outputKey = unit.split("=")[0]; double relation = Double.parseDouble(unit.split("=")[1]); String outputValue = String.valueOf(relation * prUnit); context.write(new Text(outputKey), new Text(outputValue)); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(UnitMultiplication.class); ChainMapper.addMapper(job, TransitionMapper.class, Object.class, Text.class, Text.class, Text.class, conf); ChainMapper.addMapper(job, PRMapper.class, Object.class, Text.class, Text.class, Text.class, conf); job.setReducerClass(MultiplicationReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, TransitionMapper.class); MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PRMapper.class); FileOutputFormat.setOutputPath(job, new Path(args[2])); job.waitForCompletion(true); } }
UnitSum

1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.Path; 3 import org.apache.hadoop.io.DoubleWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Job; 6 import org.apache.hadoop.mapreduce.Mapper; 7 import org.apache.hadoop.mapreduce.Reducer; 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 10 11 import java.io.IOException; 12 import java.text.DecimalFormat; 13 14 public class UnitSum { 15 public static class PassMapper extends Mapper<Object, Text, Text, DoubleWritable> { 16 17 @Override 18 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 19 String[] pageSubrank = value.toString().split("\t"); 20 double subRank = Double.parseDouble(pageSubrank[1]); 21 context.write(new Text(pageSubrank[0]), new DoubleWritable(subRank)); 22 } 23 } 24 25 public static class SumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { 26 27 28 @Override 29 public void reduce(Text key, Iterable<DoubleWritable> values, Context context) 30 throws IOException, InterruptedException { 31 32 double sum = 0; 33 for (DoubleWritable value: values) { 34 sum += value.get(); 35 } 36 DecimalFormat df = new DecimalFormat("#.0000"); 37 sum = Double.valueOf(df.format(sum)); 38 context.write(key, new DoubleWritable(sum)); 39 } 40 } 41 42 public static void main(String[] args) throws Exception { 43 44 Configuration conf = new Configuration(); 45 Job job = Job.getInstance(conf); 46 job.setJarByClass(UnitSum.class); 47 job.setMapperClass(PassMapper.class); 48 job.setReducerClass(SumReducer.class); 49 job.setOutputKeyClass(Text.class); 50 job.setOutputValueClass(DoubleWritable.class); 51 FileInputFormat.addInputPath(job, new Path(args[0])); 52 FileOutputFormat.setOutputPath(job, new Path(args[1])); 53 job.waitForCompletion(true); 54 } 55 }
考慮到dead ends 和 spider traps的話,只是前邊的矩陣cell相乘的時候乘以beta,后邊再用一個mapper來加上beta倍的pagerank即可。