Hadoop之倒排索引


前言:
  從IT跨度到DT,如今的數據每天都在海量的增長。面對如此巨大的數據,如何能讓搜索引擎更好的工作呢?本文作為Hadoop系列的第二篇,將介紹分布式情況下搜索引擎的基礎實現,即“倒排索引”。

1.問題描述
 將所有不同文件里面的關鍵詞進行存儲,並實現快速檢索。下面假設有3個文件的數據如下:

file1.txt:MapReduce is simple
file2.txt:mapReduce is powerful is simple
file3.txt:Hello MapReduce bye MapReduce

 最終應生成如下索引結果:

Hello     file3.txt:1
MapReduce    file3.txt:2;file2.txt:1;file1.txt:1
bye     file3.txt:1
is     file2.txt:2;file1.txt:1
powerful    file2.txt:1
simple     file2.txt:1;file1.txt:1

--------------------------------------------------------

2.設計
  首先,我們對讀入的數據利用Map操作進行預處理,如圖1:

對比之前的單詞計數(WorldCount.java),要實現倒排索引單靠Map和Reduce操作明顯無法完成,因此中間我們加入'Combine',即合並操作;具體如圖2:

--------------------------------------------------------------

3.代碼實現

 

  1 package pro;
  2 
  3 import java.io.IOException;
  4 import java.util.StringTokenizer;
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.IntWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 15 import org.apache.hadoop.util.GenericOptionsParser;
 16 
 17 public class InvertedIndex {
 18     final static String INPUT_PATH = "hdfs://hadoop0:9000/index_in";
 19     final static String OUTPUT_PATH = "hdfs://hadoop0:9000/index_out";
 20 
 21     public static class Map extends Mapper<Object, Text, Text, Text> {
 22 
 23         private Text keyInfo = new Text(); // 存儲單詞和URL組合
 24         private Text valueInfo = new Text(); // 存儲詞頻
 25         private FileSplit split; // 存儲Split對象
 26 
 27         // 實現map函數
 28         public void map(Object key, Text value, Context context)
 29                 throws IOException, InterruptedException {
 30             // 獲得<key,value>對所屬的FileSplit對象
 31             split = (FileSplit) context.getInputSplit();
 32             StringTokenizer itr = new StringTokenizer(value.toString());
 33             while (itr.hasMoreTokens()) {
 34 
 35                 // 只獲取文件的名稱。
 36                 int splitIndex = split.getPath().toString().indexOf("file");
 37                 keyInfo.set(itr.nextToken() + ":"
 38                         + split.getPath().toString().substring(splitIndex));
 39                 // 詞頻初始化為1
 40                 valueInfo.set("1");
 41                 context.write(keyInfo, valueInfo);
 42             }
 43         }
 44     }
 45 
 46     public static class Combine extends Reducer<Text, Text, Text, Text> {
 47         private Text info = new Text();
 48 
 49         // 實現reduce函數
 50         public void reduce(Text key, Iterable<Text> values, Context context)
 51                 throws IOException, InterruptedException {
 52             // 統計詞頻
 53             int sum = 0;
 54             for (Text value : values) {
 55                 sum += Integer.parseInt(value.toString());
 56             }
 57 
 58             int splitIndex = key.toString().indexOf(":");
 59             // 重新設置value值由URL和詞頻組成
 60             info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
 61             // 重新設置key值為單詞
 62             key.set(key.toString().substring(0, splitIndex));
 63             context.write(key, info);
 64         }
 65     }
 66 
 67     public static class Reduce extends Reducer<Text, Text, Text, Text> {
 68         private Text result = new Text();
 69 
 70         // 實現reduce函數
 71         public void reduce(Text key, Iterable<Text> values, Context context)
 72                 throws IOException, InterruptedException {
 73             // 生成文檔列表
 74             String fileList = new String();
 75             for (Text value : values) {
 76                 fileList += value.toString() + ";";
 77             }
 78             result.set(fileList);
 79 
 80             context.write(key, result);
 81         }
 82     }
 83 
 84     public static void main(String[] args) throws Exception {
 85 
 86         Configuration conf = new Configuration();
 87 
 88         Job job = new Job(conf, "Inverted Index");
 89         job.setJarByClass(InvertedIndex.class);
 90 
 91         // 設置Map、Combine和Reduce處理類
 92         job.setMapperClass(Map.class);
 93         job.setCombinerClass(Combine.class);
 94         job.setReducerClass(Reduce.class);
 95 
 96         // 設置Map輸出類型
 97         job.setMapOutputKeyClass(Text.class);
 98         job.setMapOutputValueClass(Text.class);
 99 
100         // 設置Reduce輸出類型
101         job.setOutputKeyClass(Text.class);
102         job.setOutputValueClass(Text.class);
103 
104         // 設置輸入和輸出目錄
105         FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
106         FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
107         System.exit(job.waitForCompletion(true) ? 0 : 1);
108     }
109 }

 

4.測試結果

 

Hello        file3.txt:1;
MapReduce    file3.txt:2;file1.txt:1;file2.txt:1;
bye        file3.txt:1;
is        file1.txt:1;file2.txt:2;
powerful    file2.txt:1;
simple        file2.txt:1;file1.txt:1;

Reference:

[1]Hadoop權威指南【A】Tom Wbite

[2]深入雲計算·Hadoop應用開發實戰詳解【A】萬川梅 謝正蘭

--------------

結語:

  從上面的Map---> Combine ----> Reduce操作過程中,我們可以體會到“倒排索引”的過程其實也就是不斷組合並拆分字符串的過程,而這也就是Hadoop中MapReduce並行計算的體現。在現今的大部分企業當中,Hadoop主要應用之一就是針對日志進行處理,所以想進軍大數據領域的朋友,對於Hadoop的Map/Reduce實現原理可以通過更多的實戰操作加深理解。本文僅僅只是牛刀小試,對於Hadoop的深層應用本人也正在慢慢摸索~~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM