PageRank


另一篇介紹的很好的博客:http://blog.jobbole.com/71431/

一、什么是PageRank

PageRank 是對搜索引擎的搜索網頁進行排序的算法。

 

過去的排序算法是比如使用網頁名字,關鍵詞出現的次數,人工等方法,但是這種方法一方面搜索結果不准確,另一方面搜索結果容易被人為因素影響。

所以,PageRank應運而生。

PageRank算法計算每一個網頁的PageRank值,然后根據這個值的大小對網頁的重要性進行排序。它的思想是模擬一個悠閑的上網者,上網者首先隨機選擇一個網頁打開,然后在這個網頁上呆了幾分鍾后,跳轉到該網頁

所指向的鏈接,這樣無所事事、漫無目的地在網頁上跳來跳去,PageRank就是估計這個悠閑的上網者分布在各個網頁上的概率。

PageRank背后的兩個基本假設:

  • 數量假設:更重要的網頁更可能被更多的網頁鏈接到。
  • 質量假設:有更高的PageRank的網頁將會傳遞更高的權重。(類似大佬轉發的網頁就比我轉發的網頁更重要)

二、PageRank模型

 

互聯網中網頁和網頁之間的鏈接關系組成一個有向圖,其中網頁是節點,網頁之間的鏈接為有相邊。如上圖所示的模型就表示四個網頁,A指向B的箭頭表示A中存在指向B的鏈接。

可以使用轉移矩陣來表示這樣一個有向圖。上圖所對應的模型即可用轉移矩陣表示為:

我們看到,A網頁發出三個鏈接,那么到任何一個網頁的概率都均等,所以我們上邊的矩陣A這一列對用的BCD三個網頁概率都為1/3。

有了上面的轉移矩陣,我們從兩個角度來思考:

一方面,一個網頁X如果有許多網頁指向他,那么轉移矩陣的X行就會有許多的非零元素,轉移之后這個網頁就會獲得更大的PageRank,這剛好和我們上文提到的數量假設相對應。

另一方面,假設X網頁具有很高的重要性,那么X所轉移到的那些網頁將會由於X的高重要性而獲得相對更高的重要性,這剛好和我們上文提到的質量假設相對應。

那么,怎樣表示一個網頁的重要性呢?

我們使用PageRank Matrix來表示。上述有向圖對應一個4 * 1的PageRank矩陣。

為什么初始化為均分初始化呢?我們可以這樣思考,一個無聊的上網這開始想要來打開一個網頁,那么他打開這一堆網頁中的任何一個在最開始都是風概率的。(當然不考慮個人信息,那是個性化搜索的范疇)

好了,我們現在有了兩個矩陣,轉移矩陣和初始的PageRank矩陣PR0, 我們接下來就要模擬網頁間的跳轉。

如圖,一次跳轉之后,網頁A的PageRank就應該由可以跳到A的網頁和他們各自的PageRank相乘之后的和來表示。上圖的矩陣乘法就可以表示一次跳轉之后的各個網頁的PageRank值。

同理,PR2 = Transition Matrix * PR1。 PRn =  Transition Matrix * PR(n - 1)。可以證明,最終的PR是收斂的。(馬爾科夫過程,圖是強連通的)

三、Dead Ends 終止點問題

有些網頁不存在指向其他網頁的鏈接,那么多次迭代之后,導致所有網頁的PageRank都變為0.

上圖網頁c沒有指向外部的鏈接,最終將會導致各個網頁的PageRank都變為0.

四、Spider traps 陷阱問題

陷阱問題是有些網頁只存在指向自己的鏈接,那么多次迭代之后,這將導致這個網頁的PageRank為1,而其他網頁的PageRank為0.

 

五、解決終止點問題和陷阱問題

我們現在回到開始是所說的那個無聊的上網者,假如人類遇到這個問題的話,采取的解決辦法就是關閉當前網頁,重新打開一個網頁。所以為了避免上述兩個問題,我們呢對上邊的轉移公式做一個小小的修正。(有概率隨機打開一個網頁,這時候打開所有網頁的概率均等)

回到之前的例子:

六、Map-Reduce計算PageRank

6.1輸入數據

上邊分析的時候數據似乎是存儲在轉移矩陣中,但實際不是這樣的,這樣一方面太浪費空間了,另一方面,插入和刪除數據都很麻煩。

實際的數據輸入格式如下:

我們先對數據進行預處理,類似於鄰接表的存儲方式,存儲網頁和該網頁包含的鏈接所指向的網頁。

我們使用Relations.txt來生成轉移矩陣,然后在進行轉移矩陣和PageRank矩陣的相乘操作。

6.2矩陣相乘

接下來就設計到Map-Reduce處理矩陣的乘法操作,我們不能直接來進行兩個矩陣的乘法操作。因為如果這樣做的話,我們需要轉移矩陣的一行數據都來齊之后才可進行,這就需要我們 in-memory 存儲整個矩陣,很容易造成out of mempry 而且速度還慢。

所以我們的處理方法是用Traverse MAtrix的每一列和PR相成,最后把所有元素相加即可。這樣我們就不需要in-memery那么多東西。

其實對於一個網頁A,可以思考為,你A B C D分別可以為我貢獻多少,然后把ABCD的貢獻全都加起來,就是A的PageRank。矩陣直接相乘和我們的相乘相加的方法的其別就是,直接相乘是計算A的PageRank 的時候,把所以對A有貢獻的網頁都拿來,計算出貢獻加起來作為A的PageRank;相乘相加是首先計算出A可以為ABCD貢獻多少,然后B可以為ABCD貢獻多少,然后C..D..。最后再把這些值加起來,分別作為ABCD的PageRank。

我們的流程圖如下:

容易知道,我們使用兩個Map-Reduce來實現。

MR1 --- Mapper1 從Relations.txt中讀取網頁並計算跳轉到的網頁及概率

  

MR1 --- Mapper2 從PR.txt中讀取網頁並均分概率

MR1 --- Reducer 兩個Mapper過來的key就代表是慨率網頁key轉移,輸出key應該為轉移到的網頁,value為網頁key對這個網頁做了多少貢獻。

 

MR2 --- Mapper 很簡單,就是從前一個Map-Reduce生成的文件中讀取數據

MR2 --- Reducer Mapper按照網頁作key之后分類的sum,得到各個網頁的PageRank

 七、主要代碼

Driver

 1 public class Driver {
 2 
 3     public static void main(String[] args) throws Exception {
 4         UnitMultiplication multiplication = new UnitMultiplication();
 5         UnitSum sum = new UnitSum();
 6 
 7         //args0: dir of transition.txt
 8         //args1: dir of PageRank.txt
 9         //args2: dir of unitMultiplication result
10         //args3: times of convergence
11         String transitionMatrix = args[0];
12         String prMatrix = args[1];
13         String unitState = args[2];
14         int count = Integer.parseInt(args[3]);
15         for(int i=0;  i<count;  i++) {
16             String[] args1 = {transitionMatrix, prMatrix+i, unitState+i};
17             multiplication.main(args1);
18             String[] args2 = {unitState + i, prMatrix+(i+1)};
19             sum.main(args2);
20         }
21     }
22 }
View Code

UnitMultiplication

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class UnitMultiplication {

    public static class TransitionMapper extends Mapper<Object, Text, Text, Text> {

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString().trim();
            String[] fromTo = line.split("\t");

            if(fromTo.length == 1 || fromTo[1].trim().equals("")) {
                return;
            }
            String from = fromTo[0];
            String[] tos = fromTo[1].split(",");
            for (String to: tos) {
                context.write(new Text(from), new Text(to + "=" + (double)1/tos.length));
            }
        }
    }

    public static class PRMapper extends Mapper<Object, Text, Text, Text> {

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] pr = value.toString().trim().split("\t");
            context.write(new Text(pr[0]), new Text(pr[1]));
        }
    }

    public static class MultiplicationReducer extends Reducer<Text, Text, Text, Text> {


        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> transitionUnit = new ArrayList<String>();
            double prUnit = 0;
            for (Text value: values) {
                if(value.toString().contains("=")) {
                    transitionUnit.add(value.toString());
                }
                else {
                    prUnit = Double.parseDouble(value.toString());
                }
            }
            for (String unit: transitionUnit) {
                String outputKey = unit.split("=")[0];
                double relation = Double.parseDouble(unit.split("=")[1]);
                String outputValue = String.valueOf(relation * prUnit);
                context.write(new Text(outputKey), new Text(outputValue));
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(UnitMultiplication.class);

        ChainMapper.addMapper(job, TransitionMapper.class, Object.class, Text.class, Text.class, Text.class, conf);
        ChainMapper.addMapper(job, PRMapper.class, Object.class, Text.class, Text.class, Text.class, conf);

        job.setReducerClass(MultiplicationReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, TransitionMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PRMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.waitForCompletion(true);
    }

}
View Code

UnitSum

 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.Path;
 3 import org.apache.hadoop.io.DoubleWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Job;
 6 import org.apache.hadoop.mapreduce.Mapper;
 7 import org.apache.hadoop.mapreduce.Reducer;
 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
10 
11 import java.io.IOException;
12 import java.text.DecimalFormat;
13 
14 public class UnitSum {
15     public static class PassMapper extends Mapper<Object, Text, Text, DoubleWritable> {
16 
17         @Override
18         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
19            String[] pageSubrank = value.toString().split("\t");
20             double subRank = Double.parseDouble(pageSubrank[1]);
21             context.write(new Text(pageSubrank[0]), new DoubleWritable(subRank));
22         }
23     }
24 
25     public static class SumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
26 
27 
28         @Override
29         public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
30                 throws IOException, InterruptedException {
31 
32             double sum = 0;
33             for (DoubleWritable value: values) {
34                 sum += value.get();
35             }
36             DecimalFormat df = new DecimalFormat("#.0000");
37             sum = Double.valueOf(df.format(sum));
38             context.write(key, new DoubleWritable(sum));
39         }
40     }
41 
42     public static void main(String[] args) throws Exception {
43 
44         Configuration conf = new Configuration();
45         Job job = Job.getInstance(conf);
46         job.setJarByClass(UnitSum.class);
47         job.setMapperClass(PassMapper.class);
48         job.setReducerClass(SumReducer.class);
49         job.setOutputKeyClass(Text.class);
50         job.setOutputValueClass(DoubleWritable.class);
51         FileInputFormat.addInputPath(job, new Path(args[0]));
52         FileOutputFormat.setOutputPath(job, new Path(args[1]));
53         job.waitForCompletion(true);
54     }
55 }
View Code

 考慮到dead ends 和 spider traps的話,只是前邊的矩陣cell相乘的時候乘以beta,后邊再用一個mapper來加上beta倍的pagerank即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM