The links among pages on the Internet can be viewed as a directed graph. A page's importance is determined by the votes of the pages that link to it: a page with many inbound links earns a high rank, while a page with few or no inbound links gets a low one. The higher a page's PR value, the more important the page.
Suppose we have a set of four pages A, B, C, and D, and that B, C, and D all link to A. Then A's PR value is the sum of the PR values of B, C, and D:

PR(A) = PR(B) + PR(C) + PR(D)
Continuing the example, suppose B links not only to A but also to C and D, and C links not only to A but also to B, while D links only to A. When computing A's PR value, B can cast only 1/3 of its vote and C only 1/2, while D, linking only to A, casts its full vote. So A's PR value becomes:

PR(A) = PR(B)/3 + PR(C)/2 + PR(D)
This generalizes to the following formula for a page's PR value:

PR(u) = Σ_{v ∈ B_u} PR(v) / L(v)

where B_u is the set of all pages that link to page u, v is a page in the set B_u, and L(v) is the number of outbound links of page v (its out-degree).
Figure 1-1: Link graph of pages A, B, C, and D (A→B,C,D; B→A,D; C→D; D→B)
Table 1-2: PR values computed for the graph in Figure 1-1

| | PR(A) | PR(B) | PR(C) | PR(D) |
| --- | --- | --- | --- | --- |
| Initial value | 0.25 | 0.25 | 0.25 | 0.25 |
| Iteration 1 | 0.125 | 0.333 | 0.083 | 0.458 |
| Iteration 2 | 0.1665 | 0.4997 | 0.0417 | 0.2912 |
| … | … | … | … | … |
| Iteration n | 0.1999 | 0.3999 | 0.0666 | 0.3333 |
As Table 1-2 shows, the PR values gradually converge to stable values after several iterations.
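As a sanity check on the arithmetic in Table 1-2, the following is a minimal single-machine sketch (not part of the MapReduce code later in this section) that repeatedly applies PR(u) = Σ PR(v)/L(v) to a four-page graph consistent with the table, which is the same graph encoded by the links dataset in Code 1-7:

import java.util.LinkedHashMap;
import java.util.Map;

public class SimplePageRank {
    public static void main(String[] args) {
        // Adjacency lists: page -> pages it links to
        Map<String, String[]> links = new LinkedHashMap<>();
        links.put("A", new String[] { "B", "C", "D" });
        links.put("B", new String[] { "A", "D" });
        links.put("C", new String[] { "D" });
        links.put("D", new String[] { "B" });

        // Equal initial PR values, as in the first row of Table 1-2
        Map<String, Double> pr = new LinkedHashMap<>();
        for (String page : links.keySet()) {
            pr.put(page, 0.25);
        }

        for (int i = 0; i < 20; i++) {
            Map<String, Double> next = new LinkedHashMap<>();
            for (String page : links.keySet()) {
                next.put(page, 0.0);
            }
            // Each page v distributes PR(v) / L(v) to every page it links to
            for (Map.Entry<String, String[]> e : links.entrySet()) {
                double share = pr.get(e.getKey()) / e.getValue().length;
                for (String target : e.getValue()) {
                    next.put(target, next.get(target) + share);
                }
            }
            pr = next;
        }
        // Settles near {A=0.2, B=0.4, C=0.0667, D=0.3333},
        // matching the last row of Table 1-2
        System.out.println(pr);
    }
}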
In real network environments, however, hyperlink structures are not so ideal. For example, the PageRank model runs into the following two problems:
1. Rank leak
As shown in Figure 1-3, if a page has no outbound links, like node A, a rank leak occurs: the PR mass flowing into A is never redistributed, so after many iterations the PR values of all pages tend toward 0.
Figure 1-3: A graph in which node A has no outbound links
| | PR(A) | PR(B) | PR(C) | PR(D) |
| --- | --- | --- | --- | --- |
| Initial value | 0.25 | 0.25 | 0.25 | 0.25 |
| Iteration 1 | 0.125 | 0.125 | 0.25 | 0.25 |
| Iteration 2 | 0.125 | 0.125 | 0.125 | 0.25 |
| Iteration 3 | 0.125 | 0.125 | 0.125 | 0.125 |
| … | … | … | … | … |
| Iteration n | 0 | 0 | 0 | 0 |
Table 1-4 shows the results of multiple iterations for Figure 1-3.
2. Rank sink
As shown in Figure 1-5, if a page has no inbound links, like node A, its PR value tends toward 0 after many iterations.
Figure 1-5: A graph in which node A has no inbound links
| | PR(A) | PR(B) | PR(C) | PR(D) |
| --- | --- | --- | --- | --- |
| Initial value | 0.25 | 0.25 | 0.25 | 0.25 |
| Iteration 1 | 0 | 0.375 | 0.25 | 0.375 |
| Iteration 2 | 0 | 0.375 | 0.375 | 0.25 |
| Iteration 3 | 0 | 0.25 | 0.375 | 0.375 |
| … | … | … | … | … |
| Iteration n | 0 | … | … | … |
Table 1-5 shows the results of multiple iterations for Figure 1-5.
Suppose a web surfer starts from a random page and keeps clicking the links on the current page, until either reaching a page with no outbound links or growing bored, at which point the surfer jumps to another random page and starts a new round of browsing. This model is clearly closer to real user behavior. To handle pages without any outbound links, a damping factor d is introduced to represent the probability that a user who reaches a page continues browsing onward; it is usually set to 0.85. Then 1 - d is the probability that the user stops clicking and jumps to a random page to start a new round. With this random-surfer model, the PageRank formula becomes:

PR(u) = (1 - d) + d × Σ_{v ∈ B_u} PR(v) / L(v)
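Before turning to the MapReduce implementation, a minimal sketch of one damped update may help connect the formula to the code. The class and method names here are hypothetical, chosen for illustration; the constant mirrors the DAMPING value used later in Code 1-9:

public class DampedUpdate {
    private static final double DAMPING = 0.85; // d = 0.85, as in the text

    // contributions is assumed to already hold the PR(v) / L(v) term
    // for every page v that links to page u (hypothetical helper)
    static double nextRank(double[] contributions) {
        double sum = 0;
        for (double c : contributions) {
            sum += c; // sum of PR(v) / L(v) over v in B_u
        }
        // PR(u) = (1 - d) + d * sum
        return (1 - DAMPING) + DAMPING * sum;
    }

    public static void main(String[] args) {
        // Example: a page whose only in-link comes from a page with
        // PR 0.25 and two outbound links receives 0.25 / 2
        System.out.println(nextRank(new double[] { 0.25 / 2 })); // 0.25625
    }
}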
Figure 1-6: Link graph for the sample dataset
Code 1-7 builds the dataset for the graph in Figure 1-6: each line holds a page name, a tab, and a comma-separated list of the pages it links to.
root@lejian:/data# cat links
A	B,C,D
B	A,D
C	D
D	B
The GraphBuilder step builds the link graph between pages and assigns each page an initial PR value. Since our dataset already encodes the link graph, the code in Code 1-8 only needs to assign an equal PR value to every page.
Code 1-8
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GraphBuilder {

    public static class GraphBuilderMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text url = new Text();
        private Text linkUrl = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            String[] tuple = value.toString().split("\t");
            url.set(tuple[0]);
            if (tuple.length == 2) {
                // The page has outbound links: keep them and append the initial PR value
                linkUrl.set(tuple[1] + "\t1.0");
            } else {
                // The page has no outbound links: emit an empty link list with the initial PR value
                linkUrl.set("\t1.0");
            }
            context.write(url, linkUrl);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("Graph Builder");
        job.setJarByClass(GraphBuilder.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(GraphBuilderMapper.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
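For the sample links file, the GraphBuilder output (the Data0 directory consumed by the next step) would contain records of the form url \t links \t initial PR, roughly as follows; the exact layout is an assumption, but the record format follows directly from the mapper above:

A	B,C,D	1.0
B	A,D	1.0
C	D	1.0
D	B	1.0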
The PageRankIter step iteratively computes the PageRank values until a stopping condition is met, such as convergence or a preset number of iterations. Here we use a preset iteration count and simply run this step multiple times.
Code 1-9
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRankIter {

    private static final double DAMPING = 0.85;

    public static class PRIterMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text prKey = new Text();
        private Text prValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // Each input line is: url \t comma-separated-links \t pr
            String[] tuple = value.toString().split("\t");
            if (tuple.length <= 2) {
                return;
            }
            String[] linkPages = tuple[1].split(",");
            double pr = Double.parseDouble(tuple[2]);
            // Distribute the current PR value evenly across the outbound links
            for (String page : linkPages) {
                if (page.isEmpty()) {
                    continue;
                }
                prKey.set(page);
                prValue.set(tuple[0] + "\t" + pr / linkPages.length);
                context.write(prKey, prValue);
            }
            // Also re-emit the page's own link list, marked with "|",
            // so the reducer can rebuild the graph for the next round
            prKey.set(tuple[0]);
            prValue.set("|" + tuple[1]);
            context.write(prKey, prValue);
        }
    }

    public static class PRIterReducer extends Reducer<Text, Text, Text, Text> {
        private Text prValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws java.io.IOException, InterruptedException {
            String links = "";
            double pageRank = 0;
            for (Text val : values) {
                String tmp = val.toString();
                if (tmp.startsWith("|")) {
                    // The value carrying this page's link list
                    links = tmp.substring(tmp.indexOf("|") + 1);
                    continue;
                }
                String[] tuple = tmp.split("\t");
                if (tuple.length > 1) {
                    pageRank += Double.parseDouble(tuple[1]);
                }
            }
            // Apply the damped formula: PR(u) = (1 - d) + d * sum(PR(v) / L(v))
            pageRank = (1 - DAMPING) + DAMPING * pageRank;
            prValue.set(links + "\t" + pageRank);
            context.write(key, prValue);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("PageRankIter");
        job.setJarByClass(PageRankIter.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(PRIterMapper.class);
        job.setReducerClass(PRIterReducer.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
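To trace the data flow, take the first iteration over the Data0 records sketched above. For the record B \t A,D \t 1.0, PRIterMapper emits:

(A, "B	0.5")
(D, "B	0.5")
(B, "|A,D")

For key A, PRIterReducer then sees the single contribution 0.5 (A's only in-link is from B, which has two outbound links) together with A's own graph record "|B,C,D", and computes (1 - 0.85) + 0.85 × 0.5 = 0.575, writing the record A \t B,C,D \t 0.575 as input for the next round.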
The PageRankViewer step outputs the final ranking from the iterative computation in descending order of PageRank value. No reduce phase is needed; the output key-value pairs are <PageRank, URL>. A custom descending comparator is registered as the sort comparator, so the framework's sort-by-key phase emits the highest PR values first.
Code 1-10
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRankViewer {

    public static class PageRankViewerMapper extends Mapper<LongWritable, Text, FloatWritable, Text> {
        private Text outPage = new Text();
        private FloatWritable outPr = new FloatWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // Each input line is: url \t links \t pr; swap to <pr, url> for sorting
            String[] line = value.toString().split("\t");
            outPage.set(line[0]);
            outPr.set(Float.parseFloat(line[2]));
            context.write(outPr, outPage);
        }
    }

    // Inverts FloatWritable's natural order so keys sort from largest to smallest
    public static class DescFloatComparator extends FloatWritable.Comparator {

        public float compare(WritableComparator a, WritableComparable<FloatWritable> b) {
            return -super.compare(a, b);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("PageRankViewer");
        job.setJarByClass(PageRankViewer.class);
        job.setOutputKeyClass(FloatWritable.class);
        job.setSortComparatorClass(DescFloatComparator.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(PageRankViewerMapper.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
The PageRankDriver in Code 1-11 chains the three steps together: it builds the graph, runs the iteration job the requested number of times, and prints the final ranking.

Code 1-11
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PageRankDriver {

    public static void main(String[] args) throws Exception {
        if (args == null || args.length != 3) {
            throw new RuntimeException("Please provide an input path, an output path, and the number of iterations");
        }
        int times = Integer.parseInt(args[2]);
        if (times <= 0) {
            throw new RuntimeException("The number of iterations must be greater than zero");
        }
        // Use a random directory name for this run's intermediate data
        String uuid = "/" + java.util.UUID.randomUUID().toString();
        String[] forGB = { args[0], uuid + "/Data0" };
        GraphBuilder.main(forGB);
        // Chain the iteration jobs: round i reads Data(i) and writes Data(i+1)
        String[] forItr = { "", "" };
        for (int i = 0; i < times; i++) {
            forItr[0] = uuid + "/Data" + i;
            forItr[1] = uuid + "/Data" + (i + 1);
            PageRankIter.main(forItr);
        }
        String[] forRV = { uuid + "/Data" + times, args[1] };
        PageRankViewer.main(forRV);
        // Remove the intermediate directories when the JVM exits
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(uuid);
        fs.deleteOnExit(path);
    }
}
Running Code 1-11 produces the result shown in Code 1-12.
Code 1-12
root@lejian:/data# hadoop jar pageRank.jar com.hadoop.mapreduce.PageRankDriver /data /output 10
……
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r-- 1 root supergroup 0 2017-02-10 17:55 /output/_SUCCESS
-rw-r--r-- 1 root supergroup 50 2017-02-10 17:55 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
1.5149547 B
1.3249696 D
0.78404236 A
0.37603337 C