NRT Principles
When you ask for the IndexReader from the IndexWriter, the IndexWriter will be flushed (docs accumulated in RAM will be written to disk) but not committed (fsync files, write new segments file, etc.). The returned IndexReader will search over previously committed segments, as well as the new, flushed but not committed segment. Because flushing will likely be processor rather than IO bound, this should be a process that can be attacked with more processor power if found to be too slow.
Also, deletes are carried in RAM, rather than flushed to disk, which may help in eking out a bit more speed. The result is that you can add and remove documents from a Lucene index in 'near' real time by continuously asking for a new Reader from the IndexWriter every second or couple of seconds. I haven't seen a non-synthetic test yet, but it looks like it's been tested at around 50 document updates per second without heavy slowdown (e.g. the results are visible every second).
The patch takes advantage of LUCENE-1483, which keys FieldCaches and Filters at the individual segment level rather than at the index level. This allows you to reload caches per segment rather than per index, which is essential for real-time search with filter/cache use.
From this we can conclude:
1. Even if the IndexWriter has not committed, an IndexReader obtained from that IndexWriter can see the newly added documents (demonstrated in the sketch after the API excerpt below).
API:
public static IndexReader open(IndexWriter writer, boolean applyAllDeletes) throws CorruptIndexException, IOException

Open a near real time IndexReader from the IndexWriter.

Parameters:
writer - the IndexWriter to open from
applyAllDeletes - if true, all buffered deletes will be applied (made visible) in the returned reader. If false, the deletes are not applied but remain buffered (in IndexWriter) so that they will be applied in the future. Applying deletes can be costly, so if your app can tolerate deleted documents being returned you might gain some performance by passing false.

Returns:
the new IndexReader
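To make point 1 concrete, here is a minimal, self-contained sketch. It is not from the original post: the class name, the RAMDirectory, and the field values are mine, but the calls are the standard Lucene 3.6 API. A document added without commit() is already visible through the NRT reader:

package com.fox.nrt;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class NRTVisibilityDemo {
    public static void main(String[] args) throws Exception {
        Directory d = new RAMDirectory();
        IndexWriter w = new IndexWriter(d, new IndexWriterConfig(
                Version.LUCENE_36, new StandardAnalyzer(Version.LUCENE_36)));
        Document doc = new Document();
        doc.add(new Field("f", "a", Store.YES, Index.NOT_ANALYZED));
        w.addDocument(doc); // note: no w.commit() anywhere
        IndexReader r = IndexReader.open(w, true); // NRT open from the writer
        IndexSearcher s = new IndexSearcher(r);
        TopDocs hits = s.search(new TermQuery(new Term("f", "a")), 10);
        System.out.println(hits.totalHits); // 1 -- the uncommitted doc is visible
        s.close();
        r.close();
        w.close();
        d.close();
    }
}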
2. Obtaining an IndexReader from the IndexWriter makes the IndexWriter perform a flush.
Source code:
IndexReader getReader(int termInfosIndexDivisor, boolean applyAllDeletes) throws IOException {
    ensureOpen();
    final long tStart = System.currentTimeMillis();
    if (infoStream != null) {
        message("flush at getReader");
    }
    // Do this up front before flushing so that the readers
    // obtained during this flush are pooled, the first time
    // this method is called:
    poolReaders = true;
    // Prevent segmentInfos from changing while opening the
    // reader; in theory we could do similar retry logic,
    // just like we do when loading segments_N
    IndexReader r;
    synchronized (this) {
        flush(false, applyAllDeletes);
        r = new ReadOnlyDirectoryReader(this, segmentInfos, termInfosIndexDivisor, applyAllDeletes);
        if (infoStream != null) {
            message("return reader version=" + r.getVersion() + " reader=" + r);
        }
    }
    maybeMerge();
    if (infoStream != null) {
        message("getReader took " + (System.currentTimeMillis() - tStart) + " msec");
    }
    return r;
}
3. Although the IndexReader sees newly added documents, those documents have not yet been committed to disk; if the program terminates abnormally, the uncommitted data is lost. The sketch below illustrates the difference.
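A hedged continuation of the demo above (reusing d, w, and doc) makes the durability gap visible: a reader opened directly on the Directory sees only the last commit point, while the NRT reader also sees flushed-but-uncommitted documents.

w.commit();                                        // establish a commit point on disk
w.addDocument(doc);                                // flushed on NRT open, NOT committed
IndexReader nrtReader = IndexReader.open(w, true); // sees the uncommitted doc
IndexReader dirReader = IndexReader.open(d);       // sees only the commit point
// If the JVM died here, the last doc would be gone; w.commit() makes it durable.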
The following experimental code takes a rough look at NRT performance.
package com.fox.nrt;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

/**
 * @author huangfox
 * @date 2012-8-21
 * @email huangfox009@126.com
 * @desc Rough NRT benchmark based on IndexReader.openIfChanged.
 */
public class OpenIfChangedTest {

    IndexWriter w;
    IndexReader r;

    public OpenIfChangedTest() {
        try {
            Directory d = FSDirectory.open(new File("d:/test"));
            Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_36);
            IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_36, analyzer);
            w = new IndexWriter(d, conf);
            r = IndexReader.open(w, true); // initial NRT reader
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void commit() {
        try {
            w.commit();
            w.forceMerge(2);
        } catch (CorruptIndexException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void search() {
        try {
            long bt = System.currentTimeMillis();
            // Always reopens from the initial reader; old readers are never
            // closed or swapped in, which is part of why this approach scales badly.
            IndexReader reader = IndexReader.openIfChanged(r);
            if (reader == null)
                reader = this.r;
            IndexSearcher searcher = new IndexSearcher(reader);
            Query query = new TermQuery(new Term("f", "a"));
            TopDocs topdocs = searcher.search(query, 10);
            // ScoreDoc[] docs = topdocs.scoreDocs;
            // for (ScoreDoc doc : docs) {
            //     System.out.println(reader.document(doc.doc));
            // }
            long et = System.currentTimeMillis();
            System.out.println(topdocs.totalHits + ":" + (et - bt) + "ms");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void addDoc() {
        Document doc = new Document();
        doc.add(new Field("f", "a", Store.YES, Index.NOT_ANALYZED));
        doc.add(new Field("f1", "a", Store.YES, Index.NOT_ANALYZED));
        doc.add(new Field("f2", "a", Store.YES, Index.NOT_ANALYZED));
        try {
            w.addDocument(doc);
        } catch (CorruptIndexException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        OpenIfChangedTest oic = new OpenIfChangedTest();
        for (int i = 0; i < 1; i++) {
            Thread wt = new Thread(new WThread(oic));
            wt.start();
        }
        for (int i = 0; i < 10; i++) {
            Thread rt = new Thread(new RThread(oic));
            rt.start();
        }
        Thread ct = new Thread(new CommitThread(oic));
        ct.setDaemon(true);
        ct.start();
    }
}

/**
 * Writer thread: adds one document, then sleeps 100 ms.
 */
class WThread implements Runnable {

    OpenIfChangedTest oic;

    public WThread(OpenIfChangedTest oic) {
        this.oic = oic;
    }

    @Override
    public void run() {
        while (true) {
            oic.addDoc();
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Search thread: runs one query, then sleeps 10 ms.
 */
class RThread implements Runnable {

    OpenIfChangedTest oic;

    public RThread(OpenIfChangedTest oic) {
        this.oic = oic;
    }

    @Override
    public void run() {
        while (true) {
            oic.search();
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Commit thread: commits every 3 seconds.
 */
class CommitThread implements Runnable {

    OpenIfChangedTest oic;

    public CommitThread(OpenIfChangedTest oic) {
        this.oic = oic;
    }

    @Override
    public void run() {
        while (true) {
            oic.commit();
            System.out.println("commit");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
By adjusting the number of writer and searcher threads, and how often each thread adds or searches, you can simulate different read/write ratios. With the code above, once writes become frequent, search performance suffers badly. Let's set that problem aside for now!
Now let's look at the "standard" NRT implementation in Lucene. The code is as follows:
package com.fox.nrt;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.NRTManager;
import org.apache.lucene.search.NRTManagerReopenThread;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

/**
 * @author huangfox
 * @date 2012-8-21
 * @email huangfox009@126.com
 * @desc NRT benchmark using NRTManager and NRTManagerReopenThread.
 */
public class NRTTest {

    IndexWriter w;
    NRTManager nrtM;
    NRTManager.TrackingIndexWriter tw;
    IndexReader r;
    NRTManagerReopenThread nmrT = null;

    public NRTTest() {
        try {
            Directory d = FSDirectory.open(new File("d:/test1"));
            Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_36);
            IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_36, analyzer);
            w = new IndexWriter(d, conf);
            SearcherFactory searcherFactory = new SearcherFactory();
            tw = new NRTManager.TrackingIndexWriter(w);
            nrtM = new NRTManager(tw, searcherFactory, true);
            r = IndexReader.open(w, true);
            // Reopen thread: targetMaxStaleSec=0.50, targetMinStaleSec=0.05
            nmrT = new NRTManagerReopenThread(nrtM, 0.50, 0.05);
            nmrT.setName("nrt reopen thread");
            nmrT.setDaemon(true);
            nmrT.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void commit() {
        try {
            w.commit();
            // w.forceMerge(2);
        } catch (CorruptIndexException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void search() {
        IndexSearcher searcher = null;
        try {
            long bt = System.currentTimeMillis();
            searcher = nrtM.acquire();
            Query query = new TermQuery(new Term("f", "a"));
            TopDocs topdocs = searcher.search(query, 10);
            // ScoreDoc[] docs = topdocs.scoreDocs;
            // for (ScoreDoc doc : docs) {
            //     System.out.println(reader.document(doc.doc));
            // }
            long et = System.currentTimeMillis();
            System.out.println(topdocs.totalHits + ":" + (et - bt) + "ms");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (searcher != null)
                    nrtM.release(searcher);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void addDoc() {
        Document doc = new Document();
        doc.add(new Field("f", "a", Store.YES, Index.NOT_ANALYZED));
        doc.add(new Field("f1", "a", Store.YES, Index.NOT_ANALYZED));
        doc.add(new Field("f2", "a", Store.YES, Index.NOT_ANALYZED));
        try {
            tw.addDocument(doc); // note: goes through the TrackingIndexWriter
        } catch (CorruptIndexException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        NRTTest nrt = new NRTTest();
        for (int i = 0; i < 1; i++) {
            Thread wt = new Thread(new NRTWThread(nrt));
            wt.start();
        }
        for (int i = 0; i < 10; i++) {
            Thread rt = new Thread(new NRTRThread(nrt));
            rt.start();
        }
        Thread ct = new Thread(new NRTCommitThread(nrt));
        ct.setDaemon(true);
        ct.start();
    }
}

/**
 * Writer thread: adds one document, then sleeps 100 ms.
 */
class NRTWThread implements Runnable {

    NRTTest nrt;

    public NRTWThread(NRTTest nrt) {
        this.nrt = nrt;
    }

    @Override
    public void run() {
        while (true) {
            nrt.addDoc();
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Search thread: runs one query, then sleeps 10 ms.
 */
class NRTRThread implements Runnable {

    NRTTest nrt;

    public NRTRThread(NRTTest nrt) {
        this.nrt = nrt;
    }

    @Override
    public void run() {
        while (true) {
            nrt.search();
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Commit thread: commits every 3 seconds.
 */
class NRTCommitThread implements Runnable {

    NRTTest nrt;

    public NRTCommitThread(NRTTest nrt) {
        this.nrt = nrt;
    }

    @Override
    public void run() {
        while (true) {
            nrt.commit();
            System.out.println("commit");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Under the same read/write load, however, this code shows no search performance problems.
Let's analyze the key pieces of this code:
1. NRTManager

NRTManager extends ReferenceManager. The following is excerpted from the API docs:
Utility class to manage sharing near-real-time searchers across multiple searching threads. The difference vs SearcherManager is that this class enables individual requests to wait until specific indexing changes are visible.

You must create an IndexWriter, then create a NRTManager.TrackingIndexWriter from it, and pass that to the NRTManager. You may want to create two NRTManagers, one that always applies deletes on refresh and one that does not. In this case you should use a single NRTManager.TrackingIndexWriter instance for both.

Then, use ReferenceManager.acquire() to obtain the IndexSearcher, and ReferenceManager.release(G) (ideally, from within a finally clause) to release it.

NOTE: to use this class, you must call ReferenceManager.maybeRefresh() periodically. The NRTManagerReopenThread is a simple class to do this on a periodic basis, and reopens more quickly if a request is waiting. If you implement your own reopener, be sure to call addWaitingListener(org.apache.lucene.search.NRTManager.WaitingListener) so your reopener is notified when a caller is waiting for a specific generation searcher.
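One feature the excerpt highlights, waiting until a specific indexing change is visible, is not exercised by NRTTest above. Here is a hedged sketch of how it would look with the Lucene 3.6 API, reusing tw, nrtM, and a Document doc from NRTTest (the fragment assumes its enclosing method handles IOException):

long gen = tw.addDocument(doc);   // TrackingIndexWriter returns an indexing generation
nrtM.waitForGeneration(gen);      // block until a searcher covering that generation exists
IndexSearcher s = nrtM.acquire(); // this searcher is guaranteed to see the new doc
try {
    // run queries that must observe the just-added document
} finally {
    nrtM.release(s);
}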
2. NRTManagerReopenThread

Utility class that runs a reopen thread to periodically reopen the NRT searchers in the provided NRTManager.
That is, it periodically reopens the IndexSearcher through the NRTManager. At its core, NRTManagerReopenThread simply calls manager.maybeRefresh() on a schedule; a simplified sketch follows.
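A stripped-down paraphrase of that idea (my sketch, not the actual NRTManagerReopenThread source, which additionally wakes up sooner when a caller is waiting on a generation; assumes the same imports as NRTTest):

class SimpleReopenThread extends Thread {

    private final NRTManager manager;
    private final long intervalMs;
    private volatile boolean stop;

    SimpleReopenThread(NRTManager manager, long intervalMs) {
        this.manager = manager;
        this.intervalMs = intervalMs;
        setDaemon(true); // do not keep the JVM alive just for reopens
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                manager.maybeRefresh(); // reopen the NRT searcher if the index changed
                Thread.sleep(intervalMs);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    void close() {
        stop = true;
        interrupt();
    }
}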
3. NRTManager's acquire and release

From ReferenceManager.acquire():

Obtain the current reference. You must match every call to acquire with one call to release(G); it's best to do so in a finally clause, and set the reference to null to prevent accidental usage after it has been released.

From ReferenceManager.maybeRefresh():

You must call this, periodically, if you want acquire() to return refreshed instances.

Threads: it's fine for more than one thread to call this at once. Only the first thread will attempt the refresh; subsequent threads will see that another thread is already handling refresh and will return immediately. Note that this means if another thread is already refreshing then subsequent threads will return right away without waiting for the refresh to complete.

If this method returns true it means the calling thread either refreshed or that there were no changes to refresh. If it returns false it means another thread is currently refreshing.
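In code, the prescribed pattern looks like this (a sketch against the nrtM field from NRTTest; the fragment assumes its enclosing method handles IOException):

IndexSearcher s = nrtM.acquire(); // must be paired with exactly one release
try {
    TopDocs hits = s.search(new TermQuery(new Term("f", "a")), 10);
    // consume hits while the reference is still held
} finally {
    nrtM.release(s);
    s = null; // per the javadoc: prevent accidental use after release
}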
4. How does NRT decide whether a "refresh" is needed?

In the 3.6 IndexWriter source, the NRT reader counts as current when:

infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any()

That is: no new segments have appeared, no documents are buffered in RAM, and no deletes are buffered. Only when one of these conditions fails does a refresh actually reopen the searcher, as the sketch below shows.
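A hedged way to observe this check from application code (reusing w and doc from the earlier demo; as far as I can tell, IndexReader.isCurrent() delegates to exactly this test for NRT readers in 3.6):

IndexReader nrt = IndexReader.open(w, true);
System.out.println(nrt.isCurrent()); // true: same segments, nothing buffered
w.addDocument(doc);                  // buffer one more document in RAM
System.out.println(nrt.isCurrent()); // false: docWriter.anyChanges() is now true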
A quick test. Conditions:
5 writer threads, each sleeping 100 ms after adding a document;
50 search threads, each sleeping 10 ms after a search;
one background commit thread, committing every 3 seconds.
The results:
25565:0ms 25565:1ms 25565:3ms 25565:1ms 25565:20ms 25565:0ms 25565:0ms 25565:22ms 25565:2ms 25565:82ms 25565:26ms 25565:50ms 25565:0ms 25565:29ms 25565:1ms 25565:31ms 25565:37ms 25565:34ms 25565:0ms 25565:0ms 25565:2ms 25565:41ms 25565:42ms 25565:0ms 25565:56ms 25565:44ms
The number before each colon is the total hit count; the number after it is the search time.
Increasing the document count, we test again:
170378:5ms 170378:184ms 170378:8ms 170378:8ms 170378:84ms 170378:6ms 170378:5ms 170378:192ms 170378:6ms 170378:190ms 170378:189ms 170378:5ms 170378:140ms 170378:179ms 170378:6ms 170378:7ms 170378:152ms 170378:157ms 170378:190ms 170378:176ms 170378:159ms 170378:183ms 170378:12ms 170378:18ms 170378:5ms 170378:6ms
Performance still looks somewhat problematic at this scale, though this was only a rough test on a 32-bit PC.
But no matter how powerful the machine, once data volume and concurrency grow large enough, a single node can no longer keep up. This is where distributed search comes in!