數據挖掘之權重計算(PageRank)


劉  勇  Email:lyssym@sina.com

簡介

  鑒於在Web抓取服務和文本挖掘之句子向量中對權重值的計算需要,本文基於MapReduce計算模型實現了PageRank算法。為驗證本文算法的有效性,本文采用177萬余條源URL到目標URL鏈接的數據集,並迭代101次來展開測試,測試結果表明:對上述數據集進行測試,總計耗時40.29分鍾。因此,在權重評定的算法設計與實現中引入該思想,具有較好的現實意義。

引言

  在Web抓取服務中,由於采用多個定向爬蟲對網頁進行抓取,因此其面臨2個重要問題,1)爬蟲的調度問題,不同的爬蟲的抓取頻率決定了獲取該站點的信息數量;2)爬蟲的深度問題,在某個站點內抓取越深,其獲取的信息越陳舊,而爬蟲設計者的本意是及時回頭,即到達一定深度后,返回站點首頁或者回退至上一步。因此,基於上述現實問題,本文作者擬將Google的PageRank算法的排名思想引入至該應用中,通過其PageRank值來確定各站點的抓取頻率,但是采用這種設計的結果,則是設計一個全網爬蟲。

  在文本挖掘研究中,對句子權重的研究中,擬引入PageRank思想來計算句子向量的權重中,主要是基於句子相互間存在語義關聯。

  鑒於上述原因,結合Web數據規模日益擴展的需要,采用MapReduce計算模型實現PageRank算法。

 PageRank設計

  1) 有向圖

  互聯網中的網頁可以視為有向圖,每一個網頁可以視為一個節點,若網頁A中有一個鏈接到B,則存在一條有向邊AàB。如圖1所示,為一個簡單的網絡鏈接有向圖。

 

圖1 網絡鏈接有向圖

  根據圖1對PageRank排名思想解釋如下:對於A節點,其存在3條鏈出(A-->B,A-->C,A-->D),因此B、C、D分別獲得A節點PageRank(PR)值的1/3;但是,節點A又存在2條鏈入(B--> A,C-->A),即節點B的PR的1/2和節點C的PR對A有貢獻。其它節點,與之類同,在此不做贅述。

  根據上述分析,很明顯PageRank算法需要做多次迭代,以期使各節點的PR值趨於穩定(最終達到收斂)。本文作者設計時,擬采用100為最大迭代次數。

  2) 終止與陷阱

  若將Web抓取服務(網絡爬蟲)視為馬爾科夫鏈過程,則上述收斂問題,必須滿足有向圖是強關聯的。但是現實網絡世界里,強關聯有向圖不太現實,即有些站點(網址)中的信息中根本沒有鏈接或者鏈接已失效,即面臨節點終止問題,如圖2所示。

 

圖2網絡鏈接有向圖

  在圖2中,節點C沒有鏈出,即Web抓取服務在C節點面臨終止,其造成PR值在多次迭代之后收斂於0。按照網絡爬蟲設計的本意,當面臨節點終止時,應該跳轉至其它的節點,最常見的做法為跳轉至站點首頁或者返回上一級。

  此外,有些站點(網址)為了方便向用戶提供服務,會在網頁醒目位置多次嵌入本網頁的鏈接,如門戶網站。但是,若某一個站點(網址)只嵌入本網頁的鏈接,則必然造成網絡爬蟲掉入陷阱中,進入死循環,即面臨節點陷阱問題,如圖3所示。

 

圖3 網絡鏈接有向圖

  在圖3中,節點C只存在自身的鏈出,即Web抓取服務在C節點陷入死循環,其造成PR值在多次迭代之后,C節點的PR值為1,其它節點的PR值均為0。按照網絡爬蟲設計的本意,當面節點陷阱時,應該跳轉至其它的節點,最常見的做法為跳轉至站點首頁或者返回上一級。

       因此,本文作者綜合上述2種特殊情形,引入以下策略,每個節點,都有權力選擇進入下一個節點,或者回退至上一個節點。具體數學表達式如公式1所示:

          (公式1)

       如公式1所示,α為影響因子, 和分別為當前迭代中某節點的PR值和上一次迭代中某節點的PR。鑒於每個節點均有權利回退至上一個節點,由於表征上一次迭代中所有鏈入之和,故此引入該參數來表征回退至上一次的可能性。

  3) 算法設計

  本文在設計中,采用兩級MapReduce計算模型來實現,第一級MapReduce生成網絡鏈接的有向圖;第二級MapReduce用於迭代PR,以確定PR是否收斂。需要指出,本文在設計過程中,Reducer均設定為5個。

  針對第一級MapReduce,對其Mapper和Reducer簡要描述如下:

  Mapper:完成源URL和目標URL的標識,如A-->B;

  Reducer: 根據源URL(有向圖),實現某一節點至其所有鏈出的標識,如A-->B-->C-->D,需要指出,上述標識表示,A-->B,A-->C,A-->D。采用該設計主要是為了節省節點存儲空間。

  針對第二級MapReduce,對其Mapper和Reducer簡要描述如下:

  Mapper: 完成鏈出節點及其PR獲取,如B節點存在B-->A,B-->D,則A和D均分B的PR值;

  Reducer:針對Mapper中每個節點及其PR,對該節點PR值求和,並采用公式1進行量化。

  其中第二級MapReduce采用多次迭代,若迭代過程中,節點的PR已收斂,則退出迭代。

  4)  測試結果

  本文測試環境描述如下,采用10台物理機組成Hadoop集群,CPU:Intel(R) Core(TM) i5-4440 CPU @ 3.10GHz,內存:4G,Hadoop:2.7.1,以上描述為集群的大概配置,其中某個節點的配置可能不一致,本文作者也並未對每個節點進行詳細確認。本文測試數據集采用177萬余條鏈接,迭代101次(迭代次數當時控制失誤,本意為100次,作者比較懶就沒有重新再次迭代了),總計耗時為40.29分鍾。從整體而言,其處理速率在1小時內,還是能夠接受的。

總結

  本文對基於MapReduce的PageRank算法進行研究與實現,經過實際數據集進行測試,測試結果表明,該測試結果處理速率還是能夠接受的。但是,本文作者的意圖並不是為了實現該算法,而是將該算法的設計思想引入后續Web抓取服務的優化與改進之中,以及后續文本挖掘中對權重值計算的需要之中。

  程序源代碼:

 1 import java.io.IOException;
 2 import org.apache.hadoop.io.NullWritable;
 3 import org.apache.hadoop.io.Text;
 4 import org.apache.hadoop.mapreduce.Mapper;
 5 import org.apache.hadoop.mapreduce.Reducer;
 6 import org.apache.logging.log4j.LogManager;
 7 import org.apache.logging.log4j.Logger;
 8 
 9 public class GraphSet {
10     
11     public static class GraphMapper extends Mapper<Object, Text, Text, Text> {
12         public static Logger logger = LogManager.getLogger(GraphMapper.class);
13         
14         public void map(Object key, Text value, Context context) 
15         {
16             String[] link = value.toString().split("\t");
17             try {
18                 context.write(new Text(link[0]), new Text(link[1]));
19             } catch (IOException e) {
20                 e.printStackTrace();
21             } catch (InterruptedException e) {
22                 e.printStackTrace();
23             }
24         }
25     }
26     
27     
28     public static class GraphReducer extends Reducer<Text, Text, NullWritable, Text> {
29         public static Logger logger = LogManager.getLogger(GraphReducer.class);
30         public void reduce(Text key, Iterable<Text> values, Context context) 
31         {
32             StringBuilder sb = new StringBuilder();
33             sb.append(key.toString());
34             for (Text e : values) {
35                 sb.append("\t");
36                 sb.append(e.toString());
37             }
38             
39             try {
40                 context.write(NullWritable.get(), new Text(sb.toString()));
41             } catch (IOException e1) {
42                 e1.printStackTrace();
43             } catch (InterruptedException e1) {
44                 e1.printStackTrace();
45             }
46         }
47     }
48     
49 }
Class GraphSet
 1 import java.io.IOException;
 2 import java.util.Map;
 3 import java.util.HashMap;
 4 
 5 import org.apache.hadoop.io.NullWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.io.DoubleWritable;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 import org.apache.hadoop.mapreduce.Reducer;
10 import org.apache.logging.log4j.LogManager;
11 import org.apache.logging.log4j.Logger;
12 
13 public class PageRank {
14     public static double alpha = 0.8;
15     
16     public static class PageRankMapper extends Mapper<Object, Text, Text, DoubleWritable> {
17         public static Map<String, Double> mapPR = new HashMap<String, Double>();
18         public static Logger logger = LogManager.getLogger(PageRankMapper.class);
19                 
20         public void map(Object key, Text value, Context context) {
21             String[] str = value.toString().split("\t");                 
22             int size = str.length;
23             double linkOut = mapPR.get(str[0]);
24             try {
25                 for (int i = 0; i < size-1; i++) {
26                     DoubleWritable dw = new DoubleWritable();
27                     dw.set(linkOut / (size-1)); // count the output of links
28                     context.write(new Text(str[i+1]), dw);
29                 }
30             } catch (Exception e) {
31                 e.printStackTrace();
32             }
33         }
34     }
35     
36     
37     public static class PageRankReducer extends Reducer<Text, DoubleWritable, NullWritable, Text> {
38         public static Logger logger = LogManager.getLogger(PageRankReducer.class);
39         private int numberOfUrl = 0;
40         
41         public void setup(Context context) {
42             numberOfUrl = context.getConfiguration().getInt("numberOfUrl", Integer.MAX_VALUE);
43         }
44         
45         
46         public void reduce(Text key, Iterable<DoubleWritable> values, Context context) {
47             double factor = 0;
48             double sum = 0;
49             for (DoubleWritable d : values)
50                 sum += d.get();
51 
52             if (PageRankMapper.mapPR.containsKey(key.toString()))
53                 factor = PageRankMapper.mapPR.get((key.toString()));
54             else
55                 factor = (double)1/(2*numberOfUrl);
56             
57             sum = alpha*sum + (1-alpha)*factor;  
58             String ret = key.toString() + "\t" + String.valueOf(sum);
59             try {
60                 context.write(NullWritable.get(), new Text(ret));
61             } catch (IOException e) {
62                 e.printStackTrace();
63             } catch (InterruptedException e) {
64                 e.printStackTrace();
65             }
66         }
67     }
68 
69 }
Class PageRank
  1 import java.io.BufferedReader;
  2 import java.io.IOException;
  3 import java.io.InputStreamReader;
  4 import java.net.URI;
  5 import java.util.Map;
  6 import java.util.HashMap;
  7 import java.util.Set;
  8 import java.util.TreeSet;
  9 
 10 import org.apache.hadoop.conf.Configuration;
 11 import org.apache.hadoop.fs.FSDataInputStream;
 12 import org.apache.hadoop.fs.FileSystem;
 13 import org.apache.hadoop.fs.Path;
 14 import org.apache.hadoop.io.DoubleWritable;
 15 import org.apache.hadoop.io.NullWritable;
 16 import org.apache.hadoop.io.Text;
 17 import org.apache.hadoop.mapreduce.Job;
 18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 20 import org.apache.logging.log4j.LogManager;
 21 import org.apache.logging.log4j.Logger;
 22 
 23 import com.gta.graph.GraphSet;
 24 import com.gta.graph.GraphSet.GraphMapper;
 25 import com.gta.graph.GraphSet.GraphReducer;
 26 import com.gta.pagerank.PageRank.PageRankMapper;
 27 import com.gta.pagerank.PageRank.PageRankReducer;
 28 
 29 public class PR {
 30     public static final int MAX = 100;
 31     public static final int TASK = 5;
 32     public static final double THRESHOLD = 1e-10; 
 33     public static final String INPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/pagerank/input/";
 34     public static final String OUTPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/pagerank/output/";
 35     public static final String TMP_PATH = "hdfs://10.1.130.10:9000/user/hadoop/pagerank/tmp/";
 36     public static Logger logger = LogManager.getLogger(PR.class);
 37     private Configuration conf = null;
 38     private int numberOfUrl = 0;
 39     
 40     public PR() 
 41     {
 42         conf = new Configuration();
 43     }
 44     
 45     
 46     public void initGraph() throws IOException, InterruptedException, ClassNotFoundException
 47     {
 48         Job job = Job.getInstance(conf, "Init Graph");
 49         job.setJarByClass(GraphSet.class);
 50         job.setMapperClass(GraphMapper.class);
 51         job.setMapOutputKeyClass(Text.class);
 52         job.setMapOutputValueClass(Text.class);
 53         job.setNumReduceTasks(TASK);
 54         job.setReducerClass(GraphReducer.class);
 55         job.setOutputKeyClass(NullWritable.class);
 56         job.setOutputValueClass(Text.class);
 57         FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
 58         FileOutputFormat.setOutputPath(job, new Path(TMP_PATH));
 59         job.waitForCompletion(true);
 60         initPRMap(TMP_PATH);
 61         conf.setInt("numberOfUrl", numberOfUrl);
 62     }
 63     
 64     
 65     public void pageRank() throws IOException, InterruptedException, ClassNotFoundException
 66     {
 67         int iter = 0;
 68         while (iter <= MAX) {
 69             Job job = Job.getInstance(conf, "PageRank");
 70             job.setJarByClass(PageRank.class);
 71             job.setMapperClass(PageRankMapper.class);
 72             job.setMapOutputKeyClass(Text.class);
 73             job.setMapOutputValueClass(DoubleWritable.class);
 74             job.setNumReduceTasks(TASK);
 75             job.setReducerClass(PageRankReducer.class);
 76             job.setOutputKeyClass(NullWritable.class);
 77             job.setOutputValueClass(Text.class);
 78             FileInputFormat.addInputPath(job, new Path(TMP_PATH));
 79             FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH + iter));
 80             job.waitForCompletion(true);
 81             Map<String, Double> newPR = getNewPageRank(OUTPUT_PATH + iter);
 82             if (compare(PageRankMapper.mapPR, newPR))
 83                 break;
 84             else {
 85                 for (String key : newPR.keySet()) 
 86                     PageRankMapper.mapPR.put(key, newPR.get(key));
 87             }
 88             
 89             iter++;
 90         }
 91     }
 92     
 93     
 94     public void initPRMap(String filePath) 
 95     {
 96         String fileName = filePath + "/part-r-0000";
 97         Set<String> set = new TreeSet<String>();
 98         try {
 99             for (int i = 0; i < TASK; i++) {
100                 FileSystem fs = FileSystem.get(URI.create((fileName+i)), conf);
101                 FSDataInputStream is = fs.open(new Path((fileName+i).toString()));
102                 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
103                 String s = null;
104                 while ((s = br.readLine()) != null) {
105                     String[] str = s.split("\t");
106                     if (str.length == 0) 
107                         break;
108                     set.add(str[0]);
109                 }
110                 br.close();
111             }
112             
113             numberOfUrl = set.size();
114             for (String s : set)
115                 PageRankMapper.mapPR.put(s, (double)1/numberOfUrl);
116             
117         } catch (IOException e) {
118             e.printStackTrace();
119         }
120     }
121     
122     
123     public Map<String, Double> getNewPageRank(String filePath)
124     {
125         Map<String, Double> newPR = new HashMap<String, Double>();
126         String fileName = filePath + "/part-r-0000";
127         try {
128             for (int i = 0; i < TASK; i++) {
129                 FileSystem fs = FileSystem.get(URI.create((fileName+i)), conf);
130                 FSDataInputStream is = fs.open(new Path((fileName+i).toString()));
131                 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
132                 String s = null;
133                 while ((s = br.readLine()) != null) {
134                     String[] elements = s.split("\t");
135                     newPR.put(elements[0], Double.parseDouble(elements[1]));
136                 }
137                 br.close();
138             }
139         } catch (IOException e) {
140             e.printStackTrace();
141         }
142         return newPR;
143     }
144     
145     
146     public boolean compare(Map<String, Double> oldPR, Map<String, Double> newPR) {
147         boolean ret = false;
148         int newPRSize = newPR.size();
149         int oldPRSize = oldPR.size();
150         if (oldPRSize == newPRSize) {
151             int count = 0;
152             for (String key : oldPR.keySet()) {
153                 if (newPR.containsKey(key)) {
154                     if (Math.abs(newPR.get(key) - oldPR.get(key)) <= THRESHOLD)
155                         count++;
156                 }
157             }
158 
159             if (count == newPRSize)
160                 ret = true;
161         }
162         return ret;
163     }
164 
165 
166     public static void main(String[] args) {
167         PR pr = new PR();
168         try {
169             long start = System.currentTimeMillis();
170             pr.initGraph();
171             pr.pageRank();
172             long end = System.currentTimeMillis();
173             PR.logger.info("共耗時: " + (end-start));
174         } catch (ClassNotFoundException e) {
175             e.printStackTrace();
176         } catch (IOException e) {
177             e.printStackTrace();
178         } catch (InterruptedException e) {
179             e.printStackTrace();
180         }
181     }
182     
183 }
Class PR

   本文插圖參考:http://www.cnblogs.com/fengfenggirl/p/pagerank-introduction.html

 

 


  作者:志青雲集
  出處:http://www.cnblogs.com/lyssym
  如果,您認為閱讀這篇博客讓您有些收獲,不妨點擊一下右下角的【推薦】。
  如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
  如果,您對我的博客所講述的內容有興趣,請繼續關注我的后續博客,我是【志青雲集】。
  本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接。


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM