在HDFS上處理數據時,為快速訪問,有時候需要對數據進行分布式索引。很不巧,我們所熟悉的Lucene並不支持HDFS上的索引操作。Lucene中的Document不支持MapReduce輸出類型的Writable接口,因此我們無法直接使用Document作為MapReduce的輸出類型。雖然Nutch這一搜索爬蟲基於Lucene實現HDFS上建立和維護索引的功能,但是在Nutch中對Lucene進行了很多的重寫。
下面將結合前一篇文章(自定義數據類型)中定義的HDFSDocument類,顯示如何完成HDFS上建立Lucene操作的。首先是自定義的輸出格式HDSDocumentOutput類。
package hdfs.document;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.mapred.RecordWriter;
public class HDSDocumentOutput extends FileOutputFormat<Text, HDFSDocument>{
@Override
public RecordWriter<Text, HDFSDocument> getRecordWriter(
final FileSystem fs, JobConf job, String name, final Progressable progress)
throws IOException {
// LuceneWriter是包含Lucene的IndexWriter對象的類
final LuceneWriter lw = new LuceneWriter();
// 完成索引前的配置工作
lw.open(job, name);
return new RecordWriter<Text, HDFSDocument>(){
@Override
public void close(final Reporter reporter) throws IOException {
// 完成索引優化,關閉IndexWriter的對象
lw.close();
}
@Override
public void write(Text arg0, HDFSDocument doc) throws IOException {
// 建立索引
lw.write(doc);
}
};
}
}
LuceneWriter類接受HDFSDocument類的對象,從中讀取信息,完成建立索引和優化的操作。LuceneWriter類的代碼如下:
package hdfs.document;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
public class LuceneWriter {
private Path perm;
private Path temp;
private FileSystem fs;
private IndexWriter writer;
public void open(JobConf job, String name) throws IOException{
this.fs = FileSystem.get(job);
perm = new Path(FileOutputFormat.getOutputPath(job), name);
// 臨時本地路徑,存儲臨時的索引結果
temp = job.getLocalPath("index/_" + Integer.toString(new Random().nextInt()));
fs.delete(perm, true);
// 配置IndexWriter(個人對Lucene索引的參數不是太熟悉)
LimitTokenCountAnalyzer ltca = new LimitTokenCountAnalyzer(new StandardAnalyzer(Version.LUCENE_34),
Integer.MAX_VALUE);
IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_34, ltca);
conf.setMaxBufferedDocs(100000);
LogMergePolicy mergePolicy = new LogDocMergePolicy();
mergePolicy.setMergeFactor(100000);
mergePolicy.setMaxMergeDocs(100000);
conf.setMergePolicy(mergePolicy);
conf.setRAMBufferSizeMB(256);
conf.setMergePolicy(mergePolicy);
writer = new IndexWriter(FSDirectory.open(new File(fs.startLocalOutput(perm, temp).toString())),
conf);
}
public void close() throws IOException{
// 索引優化和IndexWriter對象關閉
writer.optimize();
writer.close();
// 將本地索引結果拷貝到HDFS上
fs.completeLocalOutput(perm, temp);
fs.createNewFile(new Path(perm,"index.done"));
}
/*
* 接受HDFSDocument對象,從中讀取信息並建立索引
*/
public void write(HDFSDocument doc) throws IOException{
String key = null;
HashMap<String, String> fields = doc.getFields();
Iterator<String> iter = fields.keySet().iterator();
while(iter.hasNext()){
key = iter.next();
Document luceneDoc = new Document();
// 如果使用Field.Index.ANALYZED選項,則默認情況下會對中文進行分詞。
// 如果這時候采用Term的形式進行檢索,將會出現檢索失敗的情況。
luceneDoc.add(new Field("key", key, Field.Store.YES, Field.Index.NOT_ANALYZED));
luceneDoc.add(new Field("value", fields.get(key), Field.Store.YES, Field.Index.NOT_ANALYZED));
writer.addDocument(luceneDoc);
}
}
}
最后,需要設置任務的輸出格式,代碼如下:
job.setOutputValueClass(HDFSDocument.class); job.setOutputFormat(HDSDocumentOutput.class);
到此,基於Lucene的HDFS分布式索引構建完成。可以看出,這種建立索引方式,是先在本地建立索引,然后再拷貝到HDFS上的。
補充:有另一種使用Lucene建立分布式索引的方法
