簡單的pageRank實現參考:http://wlh0706-163-com.iteye.com/blog/1397694
較為復雜的PR值計算以及在hadoop上的實現:http://deathspeeder.is-programmer.com/posts/31349.html
pageRank算法的基本思想是:網頁的熱門程度依賴指向它的網頁的熱門程度。
也許google當初的PageRank網頁排名有着很嚴密的數學邏輯推導,但在編程的時候實現這種數學推導困難很大,用的更多的是另外一個超級簡單的數學公式,同樣可以實現將網頁排名的目的。
PageRank原理分析
舉例來講:
假設每個網頁都有一個自己的默認PR值,相當於人為添加給它是一種屬性,用來標識網頁的等級或者重要性,從而依據此標識達到排名目的。假設有ID號是1的一個網頁,PR值是10,假如它產生了到ID=3,ID=6,ID=8 ,ID=9這4個網頁的鏈接。那么可以理解為ID=1的網頁向ID=3,6,8,9的4個網頁各貢獻了2.5的PR值。如果想求任意一個網頁假設其ID=3的PR值,需要得到所有的其他網頁對ID=3這個網頁的貢獻的總和,再按照函數“所求PR”=“總和”*0.85+0.15得到。經過循環多次上述過程求得的網頁PR值,可以作為網頁排名的標識。
MapReduce過程分析
1:准備數據
理論數據是通過網頁爬蟲得到了某個特定封閉系統的所有網頁的信息,為了測試程序,可以自己模擬生成自己定義特定格式的數據。下面是我用來測試的數據,存儲方式如圖
(注:對於自定義模擬數據,在PR初始值的選取時,所有的網頁是“平等”的,不會說自己寫的網頁和Google的熱門網頁有多少差別,但是按照某種法則經過一定運算后PR是不一樣的,比如很多其他的網頁可能會鏈接到google,它的PR自然會比你的高。所以初始值的選取按照這種邏輯來講符合現實些,即所有網頁默認PR值相等。但是即使初始值定的不一樣,整個系統的PR總和可能會變化,最后的每個網頁PR也會發生變化,但是這種量之間的變化,不會影響到網頁自身的通過比較大小方式上的邏輯排名。
2:Map過程
map接受的數據格式默認是<偏移量,文本行>這樣的<key,value>對,形如<0,1 5 2 3 4 5><20,2 10 3 5 8 9>.
目標 :將默認數據格式,轉換成自定義格式<key,value>對。
已知 :hadoop框架在Map階段的時候會自動實現sort過程,就是將相同的key的所有value保存到list,形如<key,list(1,1,1,2)>這種形式,例如上述對ID=2的網頁有ID=1,6,7,8.這4個網頁貢獻(1.25,1,5/3,5),那么如果你采用的key是網頁ID,那么hadoop框架會以此種形式<2,list(1.25,1,5/3,5)>輸出。
分析 :因為這個過程要進行多次,reduce的最終輸出結果需要保存成原文件那樣的格式,所以每個網頁ID和自己鏈接情況也要在map階段輸出給reduce。
操作 :所以對於上述第一行操作map函數后結果是<id=1,2><id=1,3><id=1,4>,<id=1,5>保存了id=1網頁的鏈接情況,同時還要輸出<id=2,1.25><id=3,1.25><id=4,1.25><id=5,1.25>,每個網頁得到的貢獻值。
代碼:
public static class MyMapper extends Mapper<Object, Text, IntWritable, Text> { //存儲網頁ID private IntWritable id; //存儲網頁PR值 private String pr; //存儲網頁向外鏈接總數目 private int count; //網頁向每個外部鏈接的平均貢獻值 private float average_pr; public void map(Object key, Text value, Context context) { StringTokenizer str = new StringTokenizer(value.toString()); if (str.hasMoreTokens()) { // 得到網頁ID id = new IntWritable(Integer.parseInt(str.nextToken())); } else { return; } // 得到網頁pr pr = str.nextToken(); // 得到向外鏈接數目 count = str.countTokens(); // 對每個外部鏈接平均貢獻值 average_pr = Float.parseFloat(pr) / count; // 得到網頁的向外鏈接ID並輸出 while (str.hasMoreTokens()) { try { String nextId = str.nextToken(); //將網頁向外鏈接的ID以“@+得到貢獻值”格式輸出 Text t = new Text("@" + average_pr); context.write(id, t); // 將網頁ID和PR值輸出 Text tt = new Text("&" + nextId); context.write(id, tt); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Reduce階段:
分析:這個階段操作就相對簡單很多,讀取map的輸出<key,value>,並解析出來。
具體操作:如果value中首字母是“@”表示是貢獻值,如果是“&”表示是鏈接情況。
public void reduce(IntWritable key, Iterable<Text> values, Context context) { // 定義一個存儲網頁鏈接ID的隊列 ArrayList<String> ids = new ArrayList<String>(); // 將所有的鏈接ID以String格式保存 String lianjie = " "; // 定義一個保存網頁PR值的變量 float pr = 0; //遍歷 for(Text id : values) { String idd = id.toString(); //判斷value是貢獻值還是向外部的鏈接 if (idd.substring(0, 1).equals("@")) { // 貢獻值 pr += Float.parseFloat(idd.substring(1)); } else if (idd.substring(0, 1).equals("&")) { // 鏈接id String iddd = idd.substring(1); System.out.println("idddd= " + iddd); ids.add(iddd); } } // 計算最終pr pr = pr * 0.85f + 0.15f; // 得到所有鏈接ID的String形式 for (int i = 0; i < ids.size(); i++) { lianjie += ids.get(i) + " "; } // 組合pr+lianjie成原文件的格式類型 String result = pr + lianjie; System.out.println("Reduce result=" + result); try { context.write(key, new Text(result)); System.out.println("reduce 執行完畢。。。。。"); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
main函數:
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); String pathIn1 = "/usr/local/hadoop/tt/ww";//輸入路徑 String pathOut=“”;//輸出路徑 //迭代10次 for (int i = 0; i < 10; i++) { System.out.println("xunhuan cishu=" + i); Job job = new Job(conf, "MapReduce pagerank"); pathOut = pathIn1 + i; job.setJarByClass(Main2.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(pathIn1)); FileOutputFormat.setOutputPath(job, new Path(pathOut)); pathIn1 = pathOut; job.waitForCompletion(true); } }