劉 勇 Email:lyssym@sina.com
簡介
鑒於在Web抓取服務和文本挖掘之句子向量中對權重值的計算需要,本文基於MapReduce計算模型實現了PageRank算法。為驗證本文算法的有效性,本文采用177萬余條源URL到目標URL鏈接的數據集,並迭代101次來展開測試,測試結果表明:對上述數據集進行測試,總計耗時40.29分鍾。因此,在權重評定的算法設計與實現中引入該思想,具有較好的現實意義。
引言
在Web抓取服務中,由於采用多個定向爬蟲對網頁進行抓取,因此其面臨2個重要問題,1)爬蟲的調度問題,不同的爬蟲的抓取頻率決定了獲取該站點的信息數量;2)爬蟲的深度問題,在某個站點內抓取越深,其獲取的信息越陳舊,而爬蟲設計者的本意是及時回頭,即到達一定深度后,返回站點首頁或者回退至上一步。因此,基於上述現實問題,本文作者擬將Google的PageRank算法的排名思想引入至該應用中,通過其PageRank值來確定各站點的抓取頻率,但是采用這種設計的結果,則是設計一個全網爬蟲。
在文本挖掘研究中,對句子權重的研究中,擬引入PageRank思想來計算句子向量的權重中,主要是基於句子相互間存在語義關聯。
鑒於上述原因,結合Web數據規模日益擴展的需要,采用MapReduce計算模型實現PageRank算法。
PageRank設計
1) 有向圖
互聯網中的網頁可以視為有向圖,每一個網頁可以視為一個節點,若網頁A中有一個鏈接到B,則存在一條有向邊AàB。如圖1所示,為一個簡單的網絡鏈接有向圖。
圖1 網絡鏈接有向圖
根據圖1對PageRank排名思想解釋如下:對於A節點,其存在3條鏈出(A-->B,A-->C,A-->D),因此B、C、D分別獲得A節點PageRank(PR)值的1/3;但是,節點A又存在2條鏈入(B--> A,C-->A),即節點B的PR的1/2和節點C的PR對A有貢獻。其它節點,與之類同,在此不做贅述。
根據上述分析,很明顯PageRank算法需要做多次迭代,以期使各節點的PR值趨於穩定(最終達到收斂)。本文作者設計時,擬采用100為最大迭代次數。
2) 終止與陷阱
若將Web抓取服務(網絡爬蟲)視為馬爾科夫鏈過程,則上述收斂問題,必須滿足有向圖是強關聯的。但是現實網絡世界里,強關聯有向圖不太現實,即有些站點(網址)中的信息中根本沒有鏈接或者鏈接已失效,即面臨節點終止問題,如圖2所示。
圖2網絡鏈接有向圖
在圖2中,節點C沒有鏈出,即Web抓取服務在C節點面臨終止,其造成PR值在多次迭代之后收斂於0。按照網絡爬蟲設計的本意,當面臨節點終止時,應該跳轉至其它的節點,最常見的做法為跳轉至站點首頁或者返回上一級。
此外,有些站點(網址)為了方便向用戶提供服務,會在網頁醒目位置多次嵌入本網頁的鏈接,如門戶網站。但是,若某一個站點(網址)只嵌入本網頁的鏈接,則必然造成網絡爬蟲掉入陷阱中,進入死循環,即面臨節點陷阱問題,如圖3所示。
圖3 網絡鏈接有向圖
在圖3中,節點C只存在自身的鏈出,即Web抓取服務在C節點陷入死循環,其造成PR值在多次迭代之后,C節點的PR值為1,其它節點的PR值均為0。按照網絡爬蟲設計的本意,當面節點陷阱時,應該跳轉至其它的節點,最常見的做法為跳轉至站點首頁或者返回上一級。
因此,本文作者綜合上述2種特殊情形,引入以下策略,每個節點,都有權力選擇進入下一個節點,或者回退至上一個節點。具體數學表達式如公式1所示:
(公式1)
如公式1所示,α為影響因子, 和分別為當前迭代中某節點的PR值和上一次迭代中某節點的PR。鑒於每個節點均有權利回退至上一個節點,由於表征上一次迭代中所有鏈入之和,故此引入該參數來表征回退至上一次的可能性。
3) 算法設計
本文在設計中,采用兩級MapReduce計算模型來實現,第一級MapReduce生成網絡鏈接的有向圖;第二級MapReduce用於迭代PR,以確定PR是否收斂。需要指出,本文在設計過程中,Reducer均設定為5個。
針對第一級MapReduce,對其Mapper和Reducer簡要描述如下:
Mapper:完成源URL和目標URL的標識,如A-->B;
Reducer: 根據源URL(有向圖),實現某一節點至其所有鏈出的標識,如A-->B-->C-->D,需要指出,上述標識表示,A-->B,A-->C,A-->D。采用該設計主要是為了節省節點存儲空間。
針對第二級MapReduce,對其Mapper和Reducer簡要描述如下:
Mapper: 完成鏈出節點及其PR獲取,如B節點存在B-->A,B-->D,則A和D均分B的PR值;
Reducer:針對Mapper中每個節點及其PR,對該節點PR值求和,並采用公式1進行量化。
其中第二級MapReduce采用多次迭代,若迭代過程中,節點的PR已收斂,則退出迭代。
4) 測試結果
本文測試環境描述如下,采用10台物理機組成Hadoop集群,CPU:Intel(R) Core(TM) i5-4440 CPU @ 3.10GHz,內存:4G,Hadoop:2.7.1,以上描述為集群的大概配置,其中某個節點的配置可能不一致,本文作者也並未對每個節點進行詳細確認。本文測試數據集采用177萬余條鏈接,迭代101次(迭代次數當時控制失誤,本意為100次,作者比較懶就沒有重新再次迭代了),總計耗時為40.29分鍾。從整體而言,其處理速率在1小時內,還是能夠接受的。
總結
本文對基於MapReduce的PageRank算法進行研究與實現,經過實際數據集進行測試,測試結果表明,該測試結果處理速率還是能夠接受的。但是,本文作者的意圖並不是為了實現該算法,而是將該算法的設計思想引入后續Web抓取服務的優化與改進之中,以及后續文本挖掘中對權重值計算的需要之中。
程序源代碼:
1 import java.io.IOException; 2 import org.apache.hadoop.io.NullWritable; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Mapper; 5 import org.apache.hadoop.mapreduce.Reducer; 6 import org.apache.logging.log4j.LogManager; 7 import org.apache.logging.log4j.Logger; 8 9 public class GraphSet { 10 11 public static class GraphMapper extends Mapper<Object, Text, Text, Text> { 12 public static Logger logger = LogManager.getLogger(GraphMapper.class); 13 14 public void map(Object key, Text value, Context context) 15 { 16 String[] link = value.toString().split("\t"); 17 try { 18 context.write(new Text(link[0]), new Text(link[1])); 19 } catch (IOException e) { 20 e.printStackTrace(); 21 } catch (InterruptedException e) { 22 e.printStackTrace(); 23 } 24 } 25 } 26 27 28 public static class GraphReducer extends Reducer<Text, Text, NullWritable, Text> { 29 public static Logger logger = LogManager.getLogger(GraphReducer.class); 30 public void reduce(Text key, Iterable<Text> values, Context context) 31 { 32 StringBuilder sb = new StringBuilder(); 33 sb.append(key.toString()); 34 for (Text e : values) { 35 sb.append("\t"); 36 sb.append(e.toString()); 37 } 38 39 try { 40 context.write(NullWritable.get(), new Text(sb.toString())); 41 } catch (IOException e1) { 42 e1.printStackTrace(); 43 } catch (InterruptedException e1) { 44 e1.printStackTrace(); 45 } 46 } 47 } 48 49 }
1 import java.io.IOException; 2 import java.util.Map; 3 import java.util.HashMap; 4 5 import org.apache.hadoop.io.NullWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.io.DoubleWritable; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.logging.log4j.LogManager; 11 import org.apache.logging.log4j.Logger; 12 13 public class PageRank { 14 public static double alpha = 0.8; 15 16 public static class PageRankMapper extends Mapper<Object, Text, Text, DoubleWritable> { 17 public static Map<String, Double> mapPR = new HashMap<String, Double>(); 18 public static Logger logger = LogManager.getLogger(PageRankMapper.class); 19 20 public void map(Object key, Text value, Context context) { 21 String[] str = value.toString().split("\t"); 22 int size = str.length; 23 double linkOut = mapPR.get(str[0]); 24 try { 25 for (int i = 0; i < size-1; i++) { 26 DoubleWritable dw = new DoubleWritable(); 27 dw.set(linkOut / (size-1)); // count the output of links 28 context.write(new Text(str[i+1]), dw); 29 } 30 } catch (Exception e) { 31 e.printStackTrace(); 32 } 33 } 34 } 35 36 37 public static class PageRankReducer extends Reducer<Text, DoubleWritable, NullWritable, Text> { 38 public static Logger logger = LogManager.getLogger(PageRankReducer.class); 39 private int numberOfUrl = 0; 40 41 public void setup(Context context) { 42 numberOfUrl = context.getConfiguration().getInt("numberOfUrl", Integer.MAX_VALUE); 43 } 44 45 46 public void reduce(Text key, Iterable<DoubleWritable> values, Context context) { 47 double factor = 0; 48 double sum = 0; 49 for (DoubleWritable d : values) 50 sum += d.get(); 51 52 if (PageRankMapper.mapPR.containsKey(key.toString())) 53 factor = PageRankMapper.mapPR.get((key.toString())); 54 else 55 factor = (double)1/(2*numberOfUrl); 56 57 sum = alpha*sum + (1-alpha)*factor; 58 String ret = key.toString() + "\t" + String.valueOf(sum); 59 try { 60 context.write(NullWritable.get(), new Text(ret)); 61 } catch (IOException e) { 62 e.printStackTrace(); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 } 67 } 68 69 }
1 import java.io.BufferedReader; 2 import java.io.IOException; 3 import java.io.InputStreamReader; 4 import java.net.URI; 5 import java.util.Map; 6 import java.util.HashMap; 7 import java.util.Set; 8 import java.util.TreeSet; 9 10 import org.apache.hadoop.conf.Configuration; 11 import org.apache.hadoop.fs.FSDataInputStream; 12 import org.apache.hadoop.fs.FileSystem; 13 import org.apache.hadoop.fs.Path; 14 import org.apache.hadoop.io.DoubleWritable; 15 import org.apache.hadoop.io.NullWritable; 16 import org.apache.hadoop.io.Text; 17 import org.apache.hadoop.mapreduce.Job; 18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 20 import org.apache.logging.log4j.LogManager; 21 import org.apache.logging.log4j.Logger; 22 23 import com.gta.graph.GraphSet; 24 import com.gta.graph.GraphSet.GraphMapper; 25 import com.gta.graph.GraphSet.GraphReducer; 26 import com.gta.pagerank.PageRank.PageRankMapper; 27 import com.gta.pagerank.PageRank.PageRankReducer; 28 29 public class PR { 30 public static final int MAX = 100; 31 public static final int TASK = 5; 32 public static final double THRESHOLD = 1e-10; 33 public static final String INPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/pagerank/input/"; 34 public static final String OUTPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/pagerank/output/"; 35 public static final String TMP_PATH = "hdfs://10.1.130.10:9000/user/hadoop/pagerank/tmp/"; 36 public static Logger logger = LogManager.getLogger(PR.class); 37 private Configuration conf = null; 38 private int numberOfUrl = 0; 39 40 public PR() 41 { 42 conf = new Configuration(); 43 } 44 45 46 public void initGraph() throws IOException, InterruptedException, ClassNotFoundException 47 { 48 Job job = Job.getInstance(conf, "Init Graph"); 49 job.setJarByClass(GraphSet.class); 50 job.setMapperClass(GraphMapper.class); 51 job.setMapOutputKeyClass(Text.class); 52 job.setMapOutputValueClass(Text.class); 53 job.setNumReduceTasks(TASK); 54 job.setReducerClass(GraphReducer.class); 55 job.setOutputKeyClass(NullWritable.class); 56 job.setOutputValueClass(Text.class); 57 FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); 58 FileOutputFormat.setOutputPath(job, new Path(TMP_PATH)); 59 job.waitForCompletion(true); 60 initPRMap(TMP_PATH); 61 conf.setInt("numberOfUrl", numberOfUrl); 62 } 63 64 65 public void pageRank() throws IOException, InterruptedException, ClassNotFoundException 66 { 67 int iter = 0; 68 while (iter <= MAX) { 69 Job job = Job.getInstance(conf, "PageRank"); 70 job.setJarByClass(PageRank.class); 71 job.setMapperClass(PageRankMapper.class); 72 job.setMapOutputKeyClass(Text.class); 73 job.setMapOutputValueClass(DoubleWritable.class); 74 job.setNumReduceTasks(TASK); 75 job.setReducerClass(PageRankReducer.class); 76 job.setOutputKeyClass(NullWritable.class); 77 job.setOutputValueClass(Text.class); 78 FileInputFormat.addInputPath(job, new Path(TMP_PATH)); 79 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH + iter)); 80 job.waitForCompletion(true); 81 Map<String, Double> newPR = getNewPageRank(OUTPUT_PATH + iter); 82 if (compare(PageRankMapper.mapPR, newPR)) 83 break; 84 else { 85 for (String key : newPR.keySet()) 86 PageRankMapper.mapPR.put(key, newPR.get(key)); 87 } 88 89 iter++; 90 } 91 } 92 93 94 public void initPRMap(String filePath) 95 { 96 String fileName = filePath + "/part-r-0000"; 97 Set<String> set = new TreeSet<String>(); 98 try { 99 for (int i = 0; i < TASK; i++) { 100 FileSystem fs = FileSystem.get(URI.create((fileName+i)), conf); 101 FSDataInputStream is = fs.open(new Path((fileName+i).toString())); 102 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8")); 103 String s = null; 104 while ((s = br.readLine()) != null) { 105 String[] str = s.split("\t"); 106 if (str.length == 0) 107 break; 108 set.add(str[0]); 109 } 110 br.close(); 111 } 112 113 numberOfUrl = set.size(); 114 for (String s : set) 115 PageRankMapper.mapPR.put(s, (double)1/numberOfUrl); 116 117 } catch (IOException e) { 118 e.printStackTrace(); 119 } 120 } 121 122 123 public Map<String, Double> getNewPageRank(String filePath) 124 { 125 Map<String, Double> newPR = new HashMap<String, Double>(); 126 String fileName = filePath + "/part-r-0000"; 127 try { 128 for (int i = 0; i < TASK; i++) { 129 FileSystem fs = FileSystem.get(URI.create((fileName+i)), conf); 130 FSDataInputStream is = fs.open(new Path((fileName+i).toString())); 131 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8")); 132 String s = null; 133 while ((s = br.readLine()) != null) { 134 String[] elements = s.split("\t"); 135 newPR.put(elements[0], Double.parseDouble(elements[1])); 136 } 137 br.close(); 138 } 139 } catch (IOException e) { 140 e.printStackTrace(); 141 } 142 return newPR; 143 } 144 145 146 public boolean compare(Map<String, Double> oldPR, Map<String, Double> newPR) { 147 boolean ret = false; 148 int newPRSize = newPR.size(); 149 int oldPRSize = oldPR.size(); 150 if (oldPRSize == newPRSize) { 151 int count = 0; 152 for (String key : oldPR.keySet()) { 153 if (newPR.containsKey(key)) { 154 if (Math.abs(newPR.get(key) - oldPR.get(key)) <= THRESHOLD) 155 count++; 156 } 157 } 158 159 if (count == newPRSize) 160 ret = true; 161 } 162 return ret; 163 } 164 165 166 public static void main(String[] args) { 167 PR pr = new PR(); 168 try { 169 long start = System.currentTimeMillis(); 170 pr.initGraph(); 171 pr.pageRank(); 172 long end = System.currentTimeMillis(); 173 PR.logger.info("共耗時: " + (end-start)); 174 } catch (ClassNotFoundException e) { 175 e.printStackTrace(); 176 } catch (IOException e) { 177 e.printStackTrace(); 178 } catch (InterruptedException e) { 179 e.printStackTrace(); 180 } 181 } 182 183 }
本文插圖參考:http://www.cnblogs.com/fengfenggirl/p/pagerank-introduction.html
作者:志青雲集
出處:http://www.cnblogs.com/lyssym
如果,您認為閱讀這篇博客讓您有些收獲,不妨點擊一下右下角的【推薦】。
如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
如果,您對我的博客所講述的內容有興趣,請繼續關注我的后續博客,我是【志青雲集】。
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接。
