在HDFS上处理数据时,为快速访问,有时候需要对数据进行分布式索引。很不巧,我们所熟悉的Lucene并不支持HDFS上的索引操作。Lucene中的Document不支持MapReduce输出类型的Writable接口,因此我们无法直接使用Document作为MapReduce的输出类型。虽然Nutch这一搜索爬虫基于Lucene实现HDFS上建立和维护索引的功能,但是在Nutch中对Lucene进行了很多的重写。
下面将结合前一篇文章(自定义数据类型)中定义的HDFSDocument类,显示如何完成HDFS上建立Lucene操作的。首先是自定义的输出格式HDSDocumentOutput类。
package hdfs.document; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.mapred.RecordWriter; public class HDSDocumentOutput extends FileOutputFormat<Text, HDFSDocument>{ @Override public RecordWriter<Text, HDFSDocument> getRecordWriter( final FileSystem fs, JobConf job, String name, final Progressable progress) throws IOException { // LuceneWriter是包含Lucene的IndexWriter对象的类 final LuceneWriter lw = new LuceneWriter(); // 完成索引前的配置工作 lw.open(job, name); return new RecordWriter<Text, HDFSDocument>(){ @Override public void close(final Reporter reporter) throws IOException { // 完成索引优化,关闭IndexWriter的对象 lw.close(); } @Override public void write(Text arg0, HDFSDocument doc) throws IOException { // 建立索引 lw.write(doc); } }; } }
LuceneWriter类接受HDFSDocument类的对象,从中读取信息,完成建立索引和优化的操作。LuceneWriter类的代码如下:
package hdfs.document; import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Random; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.lucene.analysis.LimitTokenCountAnalyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LogDocMergePolicy; import org.apache.lucene.index.LogMergePolicy; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Version; public class LuceneWriter { private Path perm; private Path temp; private FileSystem fs; private IndexWriter writer; public void open(JobConf job, String name) throws IOException{ this.fs = FileSystem.get(job); perm = new Path(FileOutputFormat.getOutputPath(job), name); // 临时本地路径,存储临时的索引结果 temp = job.getLocalPath("index/_" + Integer.toString(new Random().nextInt())); fs.delete(perm, true); // 配置IndexWriter(个人对Lucene索引的参数不是太熟悉) LimitTokenCountAnalyzer ltca = new LimitTokenCountAnalyzer(new StandardAnalyzer(Version.LUCENE_34), Integer.MAX_VALUE); IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_34, ltca); conf.setMaxBufferedDocs(100000); LogMergePolicy mergePolicy = new LogDocMergePolicy(); mergePolicy.setMergeFactor(100000); mergePolicy.setMaxMergeDocs(100000); conf.setMergePolicy(mergePolicy); conf.setRAMBufferSizeMB(256); conf.setMergePolicy(mergePolicy); writer = new IndexWriter(FSDirectory.open(new File(fs.startLocalOutput(perm, temp).toString())), conf); } public void close() throws IOException{ // 索引优化和IndexWriter对象关闭 writer.optimize(); writer.close(); // 将本地索引结果拷贝到HDFS上 fs.completeLocalOutput(perm, temp); fs.createNewFile(new Path(perm,"index.done")); } /* * 接受HDFSDocument对象,从中读取信息并建立索引 */ public void write(HDFSDocument doc) throws IOException{ String key = null; HashMap<String, String> fields = doc.getFields(); Iterator<String> iter = fields.keySet().iterator(); while(iter.hasNext()){ key = iter.next(); Document luceneDoc = new Document(); // 如果使用Field.Index.ANALYZED选项,则默认情况下会对中文进行分词。 // 如果这时候采用Term的形式进行检索,将会出现检索失败的情况。 luceneDoc.add(new Field("key", key, Field.Store.YES, Field.Index.NOT_ANALYZED)); luceneDoc.add(new Field("value", fields.get(key), Field.Store.YES, Field.Index.NOT_ANALYZED)); writer.addDocument(luceneDoc); } } }
最后,需要设置任务的输出格式,代码如下:
job.setOutputValueClass(HDFSDocument.class); job.setOutputFormat(HDSDocumentOutput.class);
到此,基于Lucene的HDFS分布式索引构建完成。可以看出,这种建立索引方式,是先在本地建立索引,然后再拷贝到HDFS上的。
补充:有另一种使用Lucene建立分布式索引的方法