PageRank: A Web Page Ranking Algorithm


The links between pages on the Internet can be viewed as a directed graph, in which a page's importance is determined by the votes of the pages that link to it. A page with many inbound links receives a high rank; a page with few or no inbound links receives a low one. The higher a page's PR value, the more important the page.

Suppose a collection consists of four pages A, B, C, and D, and that B, C, and D all link to A. Then A's PR value is the sum of the PR values of B, C, and D:

PR(A) = PR(B) + PR(C) + PR(D)

Continuing the example, suppose that B, in addition to A, also links to C and D; that C, in addition to A, also links to B; and that D links only to A. When computing A's PR value, B can cast only 1/3 of its vote and C only 1/2, while D, which links only to A, casts its full vote. A's PR value is therefore:

PR(A) = PR(B)/3 + PR(C)/2 + PR(D)

From this we can derive the general formula for a page's PR value:

PR(u) = Σ PR(v)/L(v), summed over all v ∈ Bu

where Bu is the set of all pages that link to page u, v is a page belonging to Bu, and L(v) is the number of outbound links of page v (its out-degree).

 

Figure 1-1

 

Table 1-2  PR values computed from Figure 1-1

             PR(A)    PR(B)    PR(C)    PR(D)
Initial      0.25     0.25     0.25     0.25
Iteration 1  0.125    0.333    0.083    0.458
Iteration 2  0.1665   0.4997   0.0417   0.2912
……           ……       ……       ……       ……
Iteration n  0.1999   0.3999   0.0666   0.3333

 

As Table 1-2 shows, after several iterations the PR values gradually converge to stable values.
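To make the formula concrete, the following is a minimal plain-Java sketch of the iterative computation, run outside Hadoop; the class and variable names are illustrative only. It assumes Figure 1-1 shows the graph A→B,C,D; B→A,D; C→D; D→B, which is consistent with the values in Table 1-2. After enough iterations the values converge to roughly PR(A)=0.2, PR(B)=0.4, PR(C)=0.067, PR(D)=0.333, matching the n-th row of the table.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimplePageRank {

	public static void main(String[] args) {
		// link graph assumed from Table 1-2: A->B,C,D; B->A,D; C->D; D->B
		Map<String, List<String>> links = new HashMap<String, List<String>>();
		links.put("A", Arrays.asList("B", "C", "D"));
		links.put("B", Arrays.asList("A", "D"));
		links.put("C", Arrays.asList("D"));
		links.put("D", Arrays.asList("B"));

		// every page starts with the same PR value, 1 / N = 0.25
		Map<String, Double> pr = new HashMap<String, Double>();
		for (String page : links.keySet()) {
			pr.put(page, 1.0 / links.size());
		}

		for (int i = 0; i < 30; i++) {
			Map<String, Double> next = new HashMap<String, Double>();
			for (String page : links.keySet()) {
				next.put(page, 0.0);
			}
			// each page v contributes PR(v) / L(v) to every page it links to
			for (Map.Entry<String, List<String>> entry : links.entrySet()) {
				double share = pr.get(entry.getKey()) / entry.getValue().size();
				for (String target : entry.getValue()) {
					next.put(target, next.get(target) + share);
				}
			}
			pr = next;
		}

		for (Map.Entry<String, Double> entry : pr.entrySet()) {
			System.out.printf("PR(%s) = %.4f%n", entry.getKey(), entry.getValue());
		}
	}
}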

In a real network environment, however, hyperlinks are not so idealized. The PageRank model as given runs into the following two problems:

  1. Rank leak

  As shown in Figure 1-3, if a page has no outbound links, such as node A, a rank leak occurs: after repeated iterations, the PR values of all pages tend to 0.

Figure 1-3

 

 

             PR(A)    PR(B)    PR(C)    PR(D)
Initial      0.25     0.25     0.25     0.25
Iteration 1  0.125    0.125    0.25     0.25
Iteration 2  0.125    0.125    0.125    0.25
Iteration 3  0.125    0.125    0.125    0.125
……           ……       ……       ……       ……
Iteration n  0        0        0        0

Table 1-4  Results for Figure 1-3 after multiple iterations

  2. Rank sink

  As shown in Figure 1-5, if a page has no inbound links, such as node A, its PR value tends to 0 after repeated iterations.

Figure 1-5

 

 

             PR(A)    PR(B)    PR(C)    PR(D)
Initial      0.25     0.25     0.25     0.25
Iteration 1  0        0.375    0.25     0.375
Iteration 2  0        0.375    0.375    0.25
Iteration 3  0        0.25     0.375    0.375
……           ……       ……       ……       ……
Iteration n  0        ……       ……       ……

Table 1-6  Results for Figure 1-5 after multiple iterations

 

 

Now suppose a web surfer starts from a random page and keeps clicking links on the current page, until either reaching a page with no outbound links or growing bored and jumping to some other random page to start a new round of browsing. This model is clearly closer to real user behavior. To handle pages with no outbound links, a damping factor d is introduced to represent the probability that the user, having reached a page, continues browsing from it; it is usually set to 0.85. Then 1 - d is the probability that the user stops clicking and jumps to a random page to begin a new round. With this random-surfer model, the PageRank formula becomes:

PR(u) = (1 - d) + d × Σ PR(v)/L(v), summed over all v ∈ Bu
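Note that this is the non-normalized form of the formula: the (1 - d) term is not divided by the total number of pages N, as it is in some statements of PageRank. The MapReduce implementation below, in PRIterReducer, applies exactly this update.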

Figure 1-6

Code 1-7 builds the dataset for the graph in Figure 1-6:

root@lejian:/data# cat links 
A       B,C,D
B       A,D
C       D
D       B

 

The GraphBuilder step builds the link graph between pages and assigns each page an initial PR value. Since our dataset already encodes the link graph, the code in Code 1-8 only needs to assign an equal initial PR value to every page.

Code 1-8

package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GraphBuilder {

	public static class GraphBuilderMapper extends Mapper<LongWritable, Text, Text, Text> {

		private Text url = new Text();
		private Text linkUrl = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
			String[] tuple = value.toString().split("\t");
			url.set(tuple[0]);
			if (tuple.length == 2) {
				// the page has outbound links: keep them and append the initial PR value
				linkUrl.set(tuple[1] + "\t1.0");
			} else {
				// the page has no outbound links: emit only the initial PR value
				linkUrl.set("\t1.0");
			}
			context.write(url, linkUrl);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJobName("Graph Builder");
		job.setJarByClass(GraphBuilder.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(GraphBuilderMapper.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}

}
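For reference, given the dataset in Code 1-7 and the default text output format, the output of this step should look like the following (tab-separated, with the adjacency list in the middle and the initial PR value of 1.0 at the end):

A       B,C,D   1.0
B       A,D     1.0
C       D       1.0
D       B       1.0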

 

The PageRankIter step iterates the PageRank computation until a termination condition is met, such as convergence or a preset number of iterations. Here we preset the iteration count and simply run this step that many times.

Code 1-9

package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRankIter {

	private static final double DAMPING = 0.85;

	public static class PRIterMapper extends Mapper<LongWritable, Text, Text, Text> {

		private Text prKey = new Text();
		private Text prValue = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
			String[] tuple = value.toString().split("\t");
			// skip malformed lines
			if (tuple.length <= 2) {
				return;
			}
			String[] linkPages = tuple[1].split(",");
			double pr = Double.parseDouble(tuple[2]);
			// distribute PR(v) / L(v) to every page this page links to
			for (String page : linkPages) {
				if (page.isEmpty()) {
					continue;
				}
				prKey.set(page);
				prValue.set(tuple[0] + "\t" + pr / linkPages.length);
				context.write(prKey, prValue);
			}
			// re-emit the page's own adjacency list, marked with "|",
			// so the reducer can carry the graph structure forward
			prKey.set(tuple[0]);
			prValue.set("|" + tuple[1]);
			context.write(prKey, prValue);
		}
	}

	public static class PRIterReducer extends Reducer<Text, Text, Text, Text> {

		private Text prValue = new Text();

		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
			String links = "";
			double pageRank = 0;
			for (Text val : values) {
				String tmp = val.toString();
				// a value starting with "|" carries the page's adjacency list
				if (tmp.startsWith("|")) {
					links = tmp.substring(tmp.indexOf("|") + 1);
					continue;
				}
				// otherwise it is a PR contribution from an inbound link
				String[] tuple = tmp.split("\t");
				if (tuple.length > 1) {
					pageRank += Double.parseDouble(tuple[1]);
				}
			}
			// apply the random-surfer formula: PR(u) = (1 - d) + d * sum
			pageRank = (1 - DAMPING) + DAMPING * pageRank;
			prValue.set(links + "\t" + pageRank);
			context.write(key, prValue);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJobName("PageRankIter");
		job.setJarByClass(PageRankIter.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(PRIterMapper.class);
		job.setReducerClass(PRIterReducer.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}

}
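Two details are worth noting in this step. The mapper emits two kinds of records: for every outbound link, a contribution keyed by the target page and carrying the source page plus its share PR(source)/L(source); and the page's own adjacency list prefixed with "|", so the reducer can rebuild the graph record alongside the new PR value for the next round. As a rough check, starting from the initial value of 1.0 per page, one round over the dataset from Code 1-7 should produce approximately A 0.575, B 1.283, C 0.433, D 1.708.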

 

 

The PageRankViewer step outputs the final ranking in descending order of PageRank value. No custom reducer is needed; the output key-value pairs are <PageRank, URL>.

Code 1-10

package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRankViewer {

	public static class PageRankViewerMapper extends Mapper<LongWritable, Text, FloatWritable, Text> {

		private Text outPage = new Text();
		private FloatWritable outPr = new FloatWritable();

		@Override
		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
			String[] line = value.toString().split("\t");
			outPage.set(line[0]);
			outPr.set(Float.parseFloat(line[2]));
			// use the PR value as the key so the framework sorts by it
			context.write(outPr, outPage);
		}

	}

	public static class DescFloatComparator extends FloatWritable.Comparator {

		// negate both comparison paths so keys sort in descending order
		@Override
		@SuppressWarnings("rawtypes")
		public int compare(WritableComparable a, WritableComparable b) {
			return -super.compare(a, b);
		}

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			return -super.compare(b1, s1, l1, b2, s2, l2);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJobName("PageRankViewer");
		job.setJarByClass(PageRankViewer.class);
		job.setOutputKeyClass(FloatWritable.class);
		job.setSortComparatorClass(DescFloatComparator.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(PageRankViewerMapper.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}

}
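The descending sort works as follows: MapReduce sorts map output keys in ascending order during the shuffle, which is why the PR value, not the URL, is used as the map output key. DescFloatComparator, registered via setSortComparatorClass, negates both comparison paths of FloatWritable.Comparator so that larger values come first, and the default identity reducer then writes the pairs out in that order.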

 

 

Finally, PageRankDriver in Code 1-11 chains the three steps together: it builds the initial graph, runs the preset number of PageRankIter rounds, writes the final ranking, and removes the intermediate data.

Code 1-11

package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PageRankDriver {

	public static void main(String[] args) throws Exception {
		if (args == null || args.length != 3) {
			throw new RuntimeException("Usage: <input path> <output path> <number of iterations>");
		}
		int times = Integer.parseInt(args[2]);
		if (times <= 0) {
			throw new RuntimeException("The number of iterations must be greater than zero");
		}
		// use a random directory name to hold the intermediate data
		String tmpDir = "/" + java.util.UUID.randomUUID().toString();
		String[] forGB = { args[0], tmpDir + "/Data0" };
		GraphBuilder.main(forGB);
		String[] forItr = { "", "" };
		for (int i = 0; i < times; i++) {
			// each round reads Data<i> and writes Data<i+1>
			forItr[0] = tmpDir + "/Data" + i;
			forItr[1] = tmpDir + "/Data" + (i + 1);
			PageRankIter.main(forItr);
		}
		String[] forRV = { tmpDir + "/Data" + times, args[1] };
		PageRankViewer.main(forRV);
		// clean up the intermediate data
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(tmpDir), true);
	}

}

 

Running Code 1-11 produces the result shown in Code 1-12.

Code 1-12

root@lejian:/data# hadoop jar pageRank.jar com.hadoop.mapreduce.PageRankDriver /data /output 10
……
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-02-10 17:55 /output/_SUCCESS
-rw-r--r--   1 root supergroup         50 2017-02-10 17:55 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
1.5149547       B
1.3249696       D
0.78404236      A
0.37603337      C
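
Note that B ends up with the highest rank even though D has more inbound links: D passes its entire PR value to B, its only outbound link, while B splits its own value between A and D.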

 

