Hadoop MapReduce 上利用Lucene實現分布式索引


  在HDFS上處理數據時,為快速訪問,有時候需要對數據進行分布式索引。很不巧,我們所熟悉的Lucene並不支持HDFS上的索引操作。Lucene中的Document不支持MapReduce輸出類型的Writable接口,因此我們無法直接使用Document作為MapReduce的輸出類型。雖然Nutch這一搜索爬蟲基於Lucene實現HDFS上建立和維護索引的功能,但是在Nutch中對Lucene進行了很多的重寫。

  下面將結合前一篇文章(自定義數據類型)中定義的HDFSDocument類,顯示如何完成HDFS上建立Lucene操作的。首先是自定義的輸出格式HDSDocumentOutput類。

package hdfs.document;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.mapred.RecordWriter;

public class HDSDocumentOutput  extends FileOutputFormat<Text, HDFSDocument>{

	@Override
	public RecordWriter<Text, HDFSDocument> getRecordWriter(
			final FileSystem fs, JobConf job, String name, final Progressable progress)
					throws IOException {
		// LuceneWriter是包含Lucene的IndexWriter對象的類
		final LuceneWriter lw = new LuceneWriter();
		// 完成索引前的配置工作
		lw.open(job, name);	
		
		return new RecordWriter<Text, HDFSDocument>(){

			@Override
			public void close(final Reporter reporter) throws IOException {
				// 完成索引優化,關閉IndexWriter的對象
				lw.close();
			}

			@Override
			public void write(Text arg0, HDFSDocument doc) throws IOException {
				// 建立索引
				lw.write(doc);
			}	
		};
	}	
}

  LuceneWriter類接受HDFSDocument類的對象,從中讀取信息,完成建立索引和優化的操作。LuceneWriter類的代碼如下:

package hdfs.document;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class LuceneWriter {
	
	private Path perm;
	private Path temp;
	private FileSystem fs;
	private IndexWriter writer;
	
	public void open(JobConf job, String name) throws IOException{
		this.fs = FileSystem.get(job);
		perm = new Path(FileOutputFormat.getOutputPath(job), name); 
		
		// 臨時本地路徑,存儲臨時的索引結果
		temp = job.getLocalPath("index/_" + Integer.toString(new Random().nextInt()));
		fs.delete(perm, true);
		
		// 配置IndexWriter(個人對Lucene索引的參數不是太熟悉)
		LimitTokenCountAnalyzer ltca = new LimitTokenCountAnalyzer(new StandardAnalyzer(Version.LUCENE_34),
																   Integer.MAX_VALUE);
		IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_34, ltca);
		conf.setMaxBufferedDocs(100000);		
		LogMergePolicy mergePolicy = new LogDocMergePolicy();
		mergePolicy.setMergeFactor(100000);
		mergePolicy.setMaxMergeDocs(100000);
		conf.setMergePolicy(mergePolicy);		
		conf.setRAMBufferSizeMB(256);
		conf.setMergePolicy(mergePolicy);		
		
		writer = new IndexWriter(FSDirectory.open(new File(fs.startLocalOutput(perm, temp).toString())),
								conf);
	}
	public void close() throws IOException{
		// 索引優化和IndexWriter對象關閉
		writer.optimize();
		writer.close();
		
		// 將本地索引結果拷貝到HDFS上
		fs.completeLocalOutput(perm, temp);
		fs.createNewFile(new Path(perm,"index.done"));		
	}
	
	/*
	* 接受HDFSDocument對象,從中讀取信息並建立索引
	*/
	public void write(HDFSDocument doc) throws IOException{

		String key = null;
		HashMap<String, String> fields = doc.getFields();
		Iterator<String> iter = fields.keySet().iterator();
		while(iter.hasNext()){
			key = iter.next();
						
			Document luceneDoc = new Document();
			
			// 如果使用Field.Index.ANALYZED選項,則默認情況下會對中文進行分詞。
			// 如果這時候采用Term的形式進行檢索,將會出現檢索失敗的情況。
			luceneDoc.add(new Field("key", key, Field.Store.YES, Field.Index.NOT_ANALYZED));
			luceneDoc.add(new Field("value", fields.get(key), Field.Store.YES, Field.Index.NOT_ANALYZED));						
			writer.addDocument(luceneDoc);
		}
	}
}

  最后,需要設置任務的輸出格式,代碼如下:

		job.setOutputValueClass(HDFSDocument.class);
		job.setOutputFormat(HDSDocumentOutput.class);	

 到此,基於Lucene的HDFS分布式索引構建完成。可以看出,這種建立索引方式,是先在本地建立索引,然后再拷貝到HDFS上的。

 

補充:有另一種使用Lucene建立分布式索引的方法

http://www.drdobbs.com/article/print?articleID=226300241


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM