在HDFS上處理數據時,為快速訪問,有時候需要對數據進行分布式索引。很不巧,我們所熟悉的Lucene並不支持HDFS上的索引操作。Lucene中的Document不支持MapReduce輸出類型的Writable接口,因此我們無法直接使用Document作為MapReduce的輸出類型。雖然Nutch這一搜索爬蟲基於Lucene實現HDFS上建立和維護索引的功能,但是在Nutch中對Lucene進行了很多的重寫。
下面將結合前一篇文章(自定義數據類型)中定義的HDFSDocument類,顯示如何完成HDFS上建立Lucene操作的。首先是自定義的輸出格式HDSDocumentOutput類。
package hdfs.document; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.mapred.RecordWriter; public class HDSDocumentOutput extends FileOutputFormat<Text, HDFSDocument>{ @Override public RecordWriter<Text, HDFSDocument> getRecordWriter( final FileSystem fs, JobConf job, String name, final Progressable progress) throws IOException { // LuceneWriter是包含Lucene的IndexWriter對象的類 final LuceneWriter lw = new LuceneWriter(); // 完成索引前的配置工作 lw.open(job, name); return new RecordWriter<Text, HDFSDocument>(){ @Override public void close(final Reporter reporter) throws IOException { // 完成索引優化,關閉IndexWriter的對象 lw.close(); } @Override public void write(Text arg0, HDFSDocument doc) throws IOException { // 建立索引 lw.write(doc); } }; } }
LuceneWriter類接受HDFSDocument類的對象,從中讀取信息,完成建立索引和優化的操作。LuceneWriter類的代碼如下:
package hdfs.document; import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Random; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.lucene.analysis.LimitTokenCountAnalyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LogDocMergePolicy; import org.apache.lucene.index.LogMergePolicy; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Version; public class LuceneWriter { private Path perm; private Path temp; private FileSystem fs; private IndexWriter writer; public void open(JobConf job, String name) throws IOException{ this.fs = FileSystem.get(job); perm = new Path(FileOutputFormat.getOutputPath(job), name); // 臨時本地路徑,存儲臨時的索引結果 temp = job.getLocalPath("index/_" + Integer.toString(new Random().nextInt())); fs.delete(perm, true); // 配置IndexWriter(個人對Lucene索引的參數不是太熟悉) LimitTokenCountAnalyzer ltca = new LimitTokenCountAnalyzer(new StandardAnalyzer(Version.LUCENE_34), Integer.MAX_VALUE); IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_34, ltca); conf.setMaxBufferedDocs(100000); LogMergePolicy mergePolicy = new LogDocMergePolicy(); mergePolicy.setMergeFactor(100000); mergePolicy.setMaxMergeDocs(100000); conf.setMergePolicy(mergePolicy); conf.setRAMBufferSizeMB(256); conf.setMergePolicy(mergePolicy); writer = new IndexWriter(FSDirectory.open(new File(fs.startLocalOutput(perm, temp).toString())), conf); } public void close() throws IOException{ // 索引優化和IndexWriter對象關閉 writer.optimize(); writer.close(); // 將本地索引結果拷貝到HDFS上 fs.completeLocalOutput(perm, temp); fs.createNewFile(new Path(perm,"index.done")); } /* * 接受HDFSDocument對象,從中讀取信息並建立索引 */ public void write(HDFSDocument doc) throws IOException{ String key = null; HashMap<String, String> fields = doc.getFields(); Iterator<String> iter = fields.keySet().iterator(); while(iter.hasNext()){ key = iter.next(); Document luceneDoc = new Document(); // 如果使用Field.Index.ANALYZED選項,則默認情況下會對中文進行分詞。 // 如果這時候采用Term的形式進行檢索,將會出現檢索失敗的情況。 luceneDoc.add(new Field("key", key, Field.Store.YES, Field.Index.NOT_ANALYZED)); luceneDoc.add(new Field("value", fields.get(key), Field.Store.YES, Field.Index.NOT_ANALYZED)); writer.addDocument(luceneDoc); } } }
最后,需要設置任務的輸出格式,代碼如下:
job.setOutputValueClass(HDFSDocument.class); job.setOutputFormat(HDSDocumentOutput.class);
到此,基於Lucene的HDFS分布式索引構建完成。可以看出,這種建立索引方式,是先在本地建立索引,然后再拷貝到HDFS上的。
補充:有另一種使用Lucene建立分布式索引的方法