[ lucene擴展 ] 自定義Collector實現統計功能


對於lucene的統計,我基本放棄使用factedSearch了,效率不高,而且兩套索引總覺得有點臃腫!

這次我們通過改造Collector,實現簡單的統計功能。經過測試,對幾十萬的統計還是比較快的。

 

首先我們簡單理解下Collector在search中的使用情況!

Collector是一個接口,主要包括以下重要方法:

public abstract class Collector {
  
//指定打分器
  public abstract void setScorer(Scorer scorer) throws IOException;
  
//對目標結果進行收集,很重要!
  public abstract void collect(int doc) throws IOException;

//一個索引可能會有多個子索引,這里相當於是對子索引的遍歷操作
  public abstract void setNextReader(IndexReader reader, int docBase) throws IOException;

//
  public abstract boolean acceptsDocsOutOfOrder();
  
}

在search中我們來看看collector是怎么收集結果的!

public void search(Weight weight, Filter filter, Collector collector)
			throws IOException {

		// TODO: should we make this
		// threaded...? the Collector could be sync'd?

		// always use single thread:
		for (int i = 0; i < subReaders.length; i++) { // 檢索每個子索引
			collector.setNextReader(subReaders[i], docBase + docStarts[i]);
			final Scorer scorer = (filter == null) ? weight.scorer(
					subReaders[i], !collector.acceptsDocsOutOfOrder(), true)
					: FilteredQuery.getFilteredScorer(subReaders[i],
							getSimilarity(), weight, weight, filter);//構建打分器
			if (scorer != null) {
				scorer.score(collector);//打分
			}
		}
	}

scorer.score(collector)的過程如下:

public void score(Collector collector) throws IOException {
    collector.setScorer(this);
    int doc;
    while ((doc = nextDoc()) != NO_MORE_DOCS) {
      collector.collect(doc);//搜集結果
    }
  }

collector.collect(doc)的過程如下:

    @Override
    public void collect(int doc) throws IOException {
      float score = scorer.score();

      // This collector cannot handle these scores:
      assert score != Float.NEGATIVE_INFINITY;
      assert !Float.isNaN(score);

      totalHits++;
      if (score <= pqTop.score) {
        // 以下的實現使用了優先級隊列,如果當前分值小於隊列中pqTop.score則直接pass!
        return;
      }
      pqTop.doc = doc + docBase;
      pqTop.score = score;
      pqTop = pq.updateTop();
    }

從上面這一坨坨代碼我們可以大概看清collector在search中的應用情況。

那么統計呢?

首先我們來分析最簡單的統計——“一維統計”,就只對一個字段的統計。例如統計圖書每年的出版量、專利發明人發明專利數量的排行榜等。

統計的輸入:檢索式、統計字段

統計的輸出:<統計項、數量>的集合

其中關鍵是我們怎么拿到統計項。這個又分成以下一種情況:

1)統計字段沒有存儲、不分詞

我們可以使用FieldCache.DEFAULT.getStrings(reader, f);獲取統計項。

2)統計字段沒有存儲、分詞

需要通過唯一標識從數據庫(如果正向信息存在數據庫的話)取出統計項(字段內容),然后統計分析。可想而知效率極低。

3)統計字段存儲、分詞

可以通過doc.get(fieldName)取出統計項,依然比較低效

4)統計字段存儲、不分詞

和1)類似

因此我們如果要對某個字段進行統計,那么最好選用不分詞(Index.NOT_ANALYZED),這個和排序字段的要求類似!

拿到統計項后,我們可以通過累加然后排序。(這里可以借助map)

 

下面給出主要代碼:

package com.fox.group;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FieldCache;
import org.apache.lucene.search.Scorer;

/**
 * @author huangfox
 * @data 2012-7-10
 * @email huangfox009@126.com
 * @desc 
 */
public class GroupCollectorDemo extends Collector {

	private GF gf = new GF();// 保存分組統計結果
	private String[] fc;// fieldCache
	private String f;// 統計字段
	String spliter;
	int length;

	public void setFc(String[] fc) {
		this.fc = fc;
	}

	@Override
	public void setScorer(Scorer scorer) throws IOException {
	}

	@Override
	public void setNextReader(IndexReader reader, int docBase)
			throws IOException {
		fc = FieldCache.DEFAULT.getStrings(reader, f);
	}

	@Override
	public void collect(int doc) throws IOException {
		// 添加的GroupField中,由GroupField負責統計每個不同值的數目
		gf.addValue(fc[doc]);
	}

	@Override
	public boolean acceptsDocsOutOfOrder() {
		return true;
	}

	public GF getGroupField() {
		return gf;
	}

	public void setSpliter(String spliter) {
		this.spliter = spliter;
	}

	public void setLength(int length) {
		this.length = length;
	}

	public void setF(String f) {
		this.f = f;
	}
}

class GF {
	// 所有可能的分組字段值,排序按每個字段值的文檔個數大小排序
	private List<String> values = new ArrayList<String>();
	// 保存字段值和文檔個數的對應關系
	private Map<String, Integer> countMap = new HashMap<String, Integer>();

	public Map<String, Integer> getCountMap() {
		return countMap;
	}

	public void setCountMap(Map<String, Integer> countMap) {
		this.countMap = countMap;
	}

	public List<String> getValues() {
		Collections.sort(values, new ValueComparator());
		return values;
	}

	public void setValues(List<String> values) {
		this.values = values;
	}

	public void addValue(String value) {
		if (value == null || "".equals(value))
			return;
		if (countMap.get(value) == null) {
			countMap.put(value, 1);
			values.add(value);
		} else {
			countMap.put(value, countMap.get(value) + 1);
		}
	}

	class ValueComparator implements Comparator<String> {
		public int compare(String value0, String value1) {
			if (countMap.get(value0) > countMap.get(value1)) {
				return -1;
			} else if (countMap.get(value0) < countMap.get(value1)) {
				return 1;
			}
			return 0;
		}
	}
}

這里是對collector的collect方法的討巧應用,search是對打分的排序,統計是構造一個結果收集器,提供排序功能。

測試類:

package com.fox.group;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.Version;
import org.wltea.analyzer.lucene.IKAnalyzer;

/**
 * @author huangfox
 * @data 2012-7-10
 * @email huangfox009@126.com
 * @desc 
 */
public class GroupTest {
	public static void main(String[] f) throws IOException, ParseException {
		FSDirectory dir = SimpleFSDirectory.open(new File("d:/nrttest"));
		IndexReader reader = IndexReader.open(dir);
		IndexSearcher searcher = new IndexSearcher(reader);
		// GroupCollector是自定義文檔收集器,用於實現分組統計
		String str = "";
		QueryParser parser = new QueryParser(Version.LUCENE_36, "f",
				new IKAnalyzer());
		while (true) {
			str = "an:cn*";
			long bt = System.currentTimeMillis();
			Query query = parser.parse(str);
			System.out.println(query);
			GroupCollectorDemo myCollector = new GroupCollectorDemo();
			// myCollector.setFc(ad);
			myCollector.setF("in");
			searcher.search(query, myCollector);
			// GroupField用來保存分組統計的結果
			GF gf = myCollector.getGroupField();
			List<String> values = gf.getValues();
			long et = System.currentTimeMillis();
			System.out.println((et - bt) + "ms");
			for (int i = 0; i < 10; i++) {
				String value = values.get(i);
				System.out.println(value + "=" + gf.getCountMap().get(value));
			}
		}
	}
}

以上是對200多萬數據的統計,而且是全數據統計。測試結果如下:

an:cn*
6616ms
毛裕民;謝毅=13728
邱則有=10126
楊孟君=3771
王爾中=1712
王信鎖=1658
張逶=1314
朱煒=1200
趙蘊嵐;何唯平=1039
楊貽方=872
黃金富=871

系統使用情況:

你可能會說——這不是坑爹嗎?要6s的時間消耗!!!

解釋:

1.數據量,統計的數據量在200萬;

如果數據量在幾十萬,測試結果如下:

ad:2006*
213ms
邱則有=1244
張雲波=628
趙蘊嵐;何唯平=398
余內遜;余謙梁=376
楊貽方=298
王爾中=258
汪鐵良=224
趙發=222
黃振華=212
陸舟;於華章=196

  

2.運行在pc機上;

以上解釋也可以理解成借口,那么還有哪些環節可以優化呢?

從cpu和io來看,cpu應該主要是由於hashMap的操作引起的,io主要是由FieldCache.DEFAULT.getStrings(reader, f)獲取統計項引起的。

如果高並發的情況下,io無疑是個大問題,我們可以考慮緩存。

對於運算量大的情況,我們可以考慮分布式。

 

后續我們將分析:

1)二維統計、多維統計

2)個性化統計

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM