The links between pages on the Web can be viewed as a directed graph. A page's importance is determined by the votes of the pages that link to it: a page with many inbound links earns a high rank, while a page with few or no inbound links earns a low rank. The higher a page's PR value, the more important the page.
Suppose a set of four pages A, B, C, and D, where B, C, and D all link to A. Then A's PR value is the sum of the PR values of B, C, and D:
PR(A) = PR(B) + PR(C) + PR(D)
Continuing the assumption, suppose B also links to C and D, and C also links to B, while D links only to A. When computing A's PR value, B can cast only 1/3 of its vote and C only 1/2, while D, linking only to A, casts its full vote. A's PR value is therefore:
PR(A) = PR(B)/3 + PR(C)/2 + PR(D)
The general formula for a page's PR value follows:

PR(u) = Σ_{v ∈ B_u} PR(v) / L(v)

where B_u is the set of all pages that link to page u, v is a page in B_u, and L(v) is the number of outbound links of v (its out-degree).
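To make the formula concrete, here is a minimal single-machine sketch (plain Java; the class name and iteration count are illustrative, and the graph is assumed to be the one in Figure 1-1: A→B,C,D, B→A,D, C→D, D→B, the same graph used as the dataset later). Running it reproduces the convergent values shown in Table 1-2 below:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch, not part of the original code listings:
// iterates PR(u) = sum of PR(v)/L(v) over all pages v linking to u.
public class SimplePageRank {
	public static void main(String[] args) {
		// adjacency list: page -> outbound links
		Map<String, String[]> links = new LinkedHashMap<>();
		links.put("A", new String[] { "B", "C", "D" });
		links.put("B", new String[] { "A", "D" });
		links.put("C", new String[] { "D" });
		links.put("D", new String[] { "B" });
		// every page starts with PR = 0.25
		Map<String, Double> pr = new LinkedHashMap<>();
		for (String page : links.keySet()) {
			pr.put(page, 0.25);
		}
		for (int i = 0; i < 20; i++) {
			Map<String, Double> next = new LinkedHashMap<>();
			for (String page : links.keySet()) {
				next.put(page, 0.0);
			}
			// each page splits its PR value evenly among its outbound links
			for (Map.Entry<String, String[]> e : links.entrySet()) {
				double share = pr.get(e.getKey()) / e.getValue().length;
				for (String target : e.getValue()) {
					next.put(target, next.get(target) + share);
				}
			}
			pr = next;
		}
		// converges to approximately {A=0.2, B=0.4, C=0.0667, D=0.3333}
		System.out.println(pr);
	}
}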

Figure 1-1

Table 1-2: PR values computed for Figure 1-1

|  | PR(A) | PR(B) | PR(C) | PR(D) |
| --- | --- | --- | --- | --- |
| Initial value | 0.25 | 0.25 | 0.25 | 0.25 |
| Iteration 1 | 0.125 | 0.333 | 0.083 | 0.458 |
| Iteration 2 | 0.1665 | 0.4997 | 0.0417 | 0.2912 |
| …… | …… | …… | …… | …… |
| Iteration n | 0.1999 | 0.3999 | 0.0666 | 0.3333 |
As Table 1-2 shows, the PR values gradually converge to stable values after several iterations.
In a real network environment, however, hyperlinks are not so idealized, and the basic PageRank model runs into the following two problems:
1. Rank leak
As shown in Figure 1-3, if a page has no outbound links, like node A, a rank leak occurs: the PR value flowing into such a page is never passed on, so after many iterations the PR values of all pages drain toward 0.

Figure 1-3

Table 1-4: PR values after multiple iterations of Figure 1-3

|  | PR(A) | PR(B) | PR(C) | PR(D) |
| --- | --- | --- | --- | --- |
| Initial value | 0.25 | 0.25 | 0.25 | 0.25 |
| Iteration 1 | 0.125 | 0.125 | 0.25 | 0.25 |
| Iteration 2 | 0.125 | 0.125 | 0.125 | 0.25 |
| Iteration 3 | 0.125 | 0.125 | 0.125 | 0.125 |
| …… | …… | …… | …… | …… |
| Iteration n | 0 | 0 | 0 | 0 |
2. Rank sink
As shown in Figure 1-5, if a page has no inbound links, like node A, then no PR value ever flows into it, and after many iterations A's PR value trends toward 0.

Figure 1-5

Table 1-5: PR values after multiple iterations of Figure 1-5

|  | PR(A) | PR(B) | PR(C) | PR(D) |
| --- | --- | --- | --- | --- |
| Initial value | 0.25 | 0.25 | 0.25 | 0.25 |
| Iteration 1 | 0 | 0.375 | 0.25 | 0.375 |
| Iteration 2 | 0 | 0.375 | 0.375 | 0.25 |
| Iteration 3 | 0 | 0.25 | 0.375 | 0.375 |
| …… | …… | …… | …… | …… |
| Iteration n | 0 | …… | …… | …… |
We assume a surfer starts browsing from a random page and keeps clicking the links on the current page, until either reaching a page with no outbound links, or getting bored and jumping to another random page to start a new round of browsing. This model is clearly closer to real user behavior. To handle pages with no outbound links, a damping factor d is introduced to represent the probability that the user keeps browsing after reaching a page; it is usually set to 0.85. Accordingly, 1 - d is the probability that the user stops clicking and jumps to a random page to start a new round. With this random-surfer model, the PageRank formula becomes:

PR(u) = (1 - d) + d × Σ_{v ∈ B_u} PR(v) / L(v)

For example, with d = 0.85, a page whose inbound contributions sum to 0.5 gets PR = 0.15 + 0.85 × 0.5 = 0.575.

Figure 1-6
Code 1-7 shows the dataset built for the graph in Figure 1-6. Each line holds a page, a tab, and a comma-separated list of the pages it links to:

Code 1-7

root@lejian:/data# cat links
A	B,C,D
B	A,D
C	D
D	B
The GraphBuilder step builds the link graph between pages and assigns each page an initial PR value. Since our dataset already encodes the link graph, the code in Code 1-8 only needs to assign every page the same initial PR value.
Code 1-8
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GraphBuilder {

	public static class GraphBuilderMapper extends Mapper<LongWritable, Text, Text, Text> {
		private Text url = new Text();
		private Text linkUrl = new Text();

		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
			String[] tuple = value.toString().split("\t");
			url.set(tuple[0]);
			if (tuple.length == 2) {
				// the page has outbound links: keep them and append the initial PR value
				linkUrl.set(tuple[1] + "\t1.0");
			} else {
				// the page has no outbound links: append only the initial PR value
				linkUrl.set("\t1.0");
			}
			context.write(url, linkUrl);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJobName("Graph Builder");
		job.setJarByClass(GraphBuilder.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(GraphBuilderMapper.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}
}
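Given the dataset in Code 1-7, the output of this step follows directly from the mapper logic above; it should look like the following (tab-separated fields: page, outbound links, initial PR value):

A	B,C,D	1.0
B	A,D	1.0
C	D	1.0
D	B	1.0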
The PageRankIter step iteratively computes the PageRank values until a stopping condition is met, such as convergence or a predefined number of iterations. Here we use a preset iteration count and simply run this step multiple times.
Code 1-9
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRankIter {

	private static final double DAMPING = 0.85;

	public static class PRIterMapper extends Mapper<LongWritable, Text, Text, Text> {
		private Text prKey = new Text();
		private Text prValue = new Text();

		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
			// each input line is: page TAB outlinks TAB pr
			String[] tuple = value.toString().split("\t");
			if (tuple.length <= 2) {
				return;
			}
			String[] linkPages = tuple[1].split(",");
			double pr = Double.parseDouble(tuple[2]);
			// distribute the page's PR value evenly among its outbound links
			for (String page : linkPages) {
				if (page.isEmpty()) {
					continue;
				}
				prKey.set(page);
				prValue.set(tuple[0] + "\t" + pr / linkPages.length);
				context.write(prKey, prValue);
			}
			// also pass the adjacency list through, marked with a leading "|"
			prKey.set(tuple[0]);
			prValue.set("|" + tuple[1]);
			context.write(prKey, prValue);
		}
	}

	public static class PRIterReducer extends Reducer<Text, Text, Text, Text> {
		private Text prValue = new Text();

		protected void reduce(Text key, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
			String links = "";
			double pageRank = 0;
			for (Text val : values) {
				String tmp = val.toString();
				// values starting with "|" carry the adjacency list, not a PR contribution
				if (tmp.startsWith("|")) {
					links = tmp.substring(tmp.indexOf("|") + 1);
					continue;
				}
				String[] tuple = tmp.split("\t");
				if (tuple.length > 1) {
					pageRank += Double.parseDouble(tuple[1]);
				}
			}
			// apply the random-surfer formula: PR = (1 - d) + d * (sum of contributions)
			pageRank = (1 - DAMPING) + DAMPING * pageRank;
			prValue.set(links + "\t" + pageRank);
			context.write(key, prValue);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJobName("PageRankIter");
		job.setJarByClass(PageRankIter.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(PRIterMapper.class);
		job.setReducerClass(PRIterReducer.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}
}
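Tracing one round of the mapper and reducer by hand over the GraphBuilder output above (d = 0.85, values rounded here for readability), the first iteration should yield approximately:

A	B,C,D	0.575
B	A,D	1.2833
C	D	0.4333
D	B	0.8583

A, for instance, receives only B's contribution of 1.0/2 = 0.5, so PR(A) = 0.15 + 0.85 × 0.5 = 0.575; the "|"-tagged adjacency lists are written back unchanged so the next iteration can reuse them.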
The PageRankViewer step outputs the final ranking from the iterative computation in descending order of PageRank value. It needs no custom reduce logic: the mapper emits <PageRank, URL> pairs, the descending comparator sorts them during the shuffle, and the default identity reducer writes them out (with the default single reducer, the output is globally sorted).
Code 1-10
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRankViewer {

	public static class PageRankViewerMapper extends Mapper<LongWritable, Text, FloatWritable, Text> {
		private Text outPage = new Text();
		private FloatWritable outPr = new FloatWritable();

		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
			// each input line is: page TAB outlinks TAB pr; emit <pr, page>
			String[] line = value.toString().split("\t");
			outPage.set(line[0]);
			outPr.set(Float.parseFloat(line[2]));
			context.write(outPr, outPage);
		}
	}

	// inverts FloatWritable's natural ordering so keys sort in descending order;
	// the byte-level overload is the one the shuffle actually uses
	public static class DescFloatComparator extends FloatWritable.Comparator {

		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			return -super.compare(a, b);
		}

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			return -super.compare(b1, s1, l1, b2, s2, l2);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJobName("PageRankViewer");
		job.setJarByClass(PageRankViewer.class);
		job.setOutputKeyClass(FloatWritable.class);
		job.setSortComparatorClass(DescFloatComparator.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(PageRankViewerMapper.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}
}
Code 1-11 shows the driver, which chains the three steps together: it runs GraphBuilder once, PageRankIter the requested number of times, and PageRankViewer at the end, then removes the intermediate data.
Code 1-11
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PageRankDriver {

	public static void main(String[] args) throws Exception {
		if (args == null || args.length != 3) {
			throw new RuntimeException("Usage: <input path> <output path> <number of iterations>");
		}
		int times = Integer.parseInt(args[2]);
		if (times <= 0) {
			throw new RuntimeException("The number of iterations must be greater than zero");
		}
		// use a random directory name for the intermediate data
		String tmpDir = "/" + java.util.UUID.randomUUID().toString();
		String[] forGB = { args[0], tmpDir + "/Data0" };
		GraphBuilder.main(forGB);
		// each iteration reads /DataN and writes /Data(N+1)
		String[] forItr = { "", "" };
		for (int i = 0; i < times; i++) {
			forItr[0] = tmpDir + "/Data" + i;
			forItr[1] = tmpDir + "/Data" + (i + 1);
			PageRankIter.main(forItr);
		}
		String[] forRV = { tmpDir + "/Data" + times, args[1] };
		PageRankViewer.main(forRV);
		// remove the intermediate data when the JVM exits
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		Path path = new Path(tmpDir);
		fs.deleteOnExit(path);
	}
}
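The driver stops after a fixed number of iterations. If convergence-based stopping were preferred, a sketch along the following lines could be used; the class, the helper names, the epsilon value, and the assumption that each iteration writes a single part-r-00000 file are all hypothetical, not part of the original code:

package com.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: compares the PR values of two successive iterations.
public class ConvergenceCheck {

	// reads the <page, PR> pairs from one iteration's part-r-00000 file
	static Map<String, Double> readPr(FileSystem fs, String dir) throws Exception {
		Map<String, Double> pr = new HashMap<>();
		try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(dir + "/part-r-00000"))))) {
			String line;
			while ((line = reader.readLine()) != null) {
				// each line is: page TAB links TAB pr; the PR value is the last field
				String[] tuple = line.split("\t");
				pr.put(tuple[0], Double.parseDouble(tuple[tuple.length - 1]));
			}
		}
		return pr;
	}

	// true once the total PR change between two iterations drops below epsilon
	static boolean hasConverged(Map<String, Double> prev, Map<String, Double> curr, double epsilon) {
		double delta = 0;
		for (Map.Entry<String, Double> e : curr.entrySet()) {
			delta += Math.abs(e.getValue() - prev.getOrDefault(e.getKey(), 0.0));
		}
		return delta < epsilon;
	}

	public static void main(String[] args) throws Exception {
		FileSystem fs = FileSystem.get(new Configuration());
		Map<String, Double> prev = readPr(fs, args[0]);
		Map<String, Double> curr = readPr(fs, args[1]);
		System.out.println(hasConverged(prev, curr, 0.001));
	}
}

The driver's loop could then read back two successive Data directories after each PageRankIter.main call and break out early once hasConverged returns true.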
Running Code 1-11 produces the results shown in Code 1-12.
Code 1-12
root@lejian:/data# hadoop jar pageRank.jar com.hadoop.mapreduce.PageRankDriver /data /output 10
……
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r-- 1 root supergroup 0 2017-02-10 17:55 /output/_SUCCESS
-rw-r--r-- 1 root supergroup 50 2017-02-10 17:55 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
1.5149547 B
1.3249696 D
0.78404236 A
0.37603337 C
