Lucene 的索引文件鎖原理
環境
Lucene 6.0.0
Java “1.8.0_111”
OS Windows 7 Ultimate
線程安全
在Lucene中,打開一個IndexWrite之后,就會自動在索引目錄中生成write.lock文件,這個文件中並不會有內容,不管是在索引打開期間還是在索引關閉之后,其大小都為0KB,並且在IndexWriter關閉之后,並不會刪除該文件。如果同時打開多個IndexWriter的話,后打開的IndexWriter就會拋出LockObtainFailedException
異常。這是個很重要的保護機制,因為若針對同一索引打開兩個writer的話,會導致索引損壞。所以Lucene中的鎖主要針對並發寫的情況,在寫的過程中並不會影響到並發讀操作。
總結如下:
- 任意數量的IndexReader類都可以同時打開一個索引,與其是否同屬於一個JVM無關,在單個JVM中,利用資源和發揮效率的最好辦法是多線程共享單個IndexReader實例。
- 一旦建立起IndexWriter對象,系統即會分配一個鎖給它,該鎖只有當IndexWriter對象被關閉時才會釋放。
- 對於一個索引來說,一次只能打開一個IndexWriter對象,但是對IndexReader並無限制,IndexReader對象甚至可以在IndexWriter對象正在修改索引時打開。每個IndexReader對象將向索引展示自己被打開的時間點。該對象只有在IndexWriter對象提交修改或自己被重新打開后才能獲知索引修改情況。
- 任意多個線程都可以共享同一個IndexReader和IndexWriter,這些類不僅是線程安全的而且是線程有好的,說其是線程有好的意思是,它們能夠很好的擴展到新增線程中,因為這些類中的同步代碼數並不多。
利用遠程文件系統訪問索引
一般地,我們總是希望可以實現分布式搜索,在這方面現成的有Elasticsearch和Solr可以選用。而如果基於Lucene進行開發,如何實現呢?最簡單的方案是用一台專用計算機保存和修改本地索引,然后用其他計算機通過遠程文件訪問來搜索該索引。該方案性能會比搜索本機索引要差得多。
一個改進的策略是通過將遠程文件系統掛載至各計算機會提高一點效果。最好的效果是將索引復制到各台計算機自己的文件系統,然后再進行搜索。一個注意點是在將本地文件系統同步到遠程文件系統的時候,假設遠程文件系統正在被搜索,此時不能立即刪除正在被使用的索引文件,此問題可以通過利用索引文件的代數解決,即搜索端用上一代索引,同步系統同步這一代索引並保留上一代索引至下一次同步時刪除。
另一點是如果在遠程文件系統上存在定時打開的索引線程怎么辦呢?為了獲取最新的索引狀態,通常會由一個后台線程去定時打開索引切換IndexReader,那么此時如果將本地文件系統索引同步到遠程文件系統的時候一樣有問題,比如segments_*
文件同步過去了,定時線程打開segments_*
並開始去查找其它文件,但是其它文件還未同步過去。可以通過先同步其它索引文件(非segments_*
文件),再同步segments_*
文件,然后更改索引代數,最后再去刪除除本次和上次之外的所有無效索引文件解決。
索引鎖機制
首先拋出幾個問題再來詳解。Lucene是如何知道一份索引已被其它IndexWriter打開呢?也許你會說通過write.lock文件,但是該文件里並未存有任何信息,並在IndexWriter關閉之后也不會刪除該文件,如何通過write.lock文件來辨別呢?另外在程序異常退出或是JVM異常關閉都會導致鎖被釋放,再次啟動程序和JVM之后,程序都可以正常地獲取鎖,這又是如何實現的呢?
在Lucene中,提供了一個頂層的抽象類LockFactory,它有三個實現類和一個子抽象類FSLockFactory,而FSLockFactory有兩個實現類,分別是SimpleFSLockFactory和NativeFSLockFactory,總結如下
鎖類名 | 描述 |
---|---|
SingleInstanceLockFactory | RAMDirectory的默認鎖策略,為單個進程內實例准備,意味着所有的加鎖操作都通過這個實例 |
VerifyingLockFactory | 用來包裝其它的LockFactory,以驗證每次獲取鎖/釋放鎖的操作是正確的 |
NoLockFactory | 完全關閉鎖機制,單例模式,你需要使用INSTANCE |
SimpleFSLockFactory | 使用Java的File.createNewFile API,它的主要缺點是在JVM崩潰時會導致遺留一個write.lock文件,在下次使用IndexWriter之前必須手動清除,比較適合NFS使用 |
NativeFSLockFactory | 依賴java.nio.*進行加鎖,FSDirectory的默認鎖,不適合NFS使用,該類的最大好處是JVM異常退出的話,由OS負責移除write.lock,OS並不真的刪除該文件,但是會釋放該文件上的所有引用,確保下次可以重新獲取鎖 |
如果選擇自行實現鎖機制的話,要確認該機制能正確運行。有一個簡單的調試工具LockStressTest,該類可以與LockVerifyServer和VerifyingLockFactory聯合使用,以確認你自己實現的鎖機制能正常運行。
值的說明的是,所有的這些類中都提供了一個獲取鎖的方法,以及一個與之對應的靜態內部類XXXLock,在XXXLock中提供了檢查鎖是否有效的方法ensureValid方法和close方法。close方法主要用來關閉一些流以及釋放與之相關的資源。
加鎖源碼解析
在第一次實例化IndexWriter的過程中,Lucene首先會進行加鎖操作,如果加鎖失敗的話,是不會進行下一步操作的。一個常見的實例化代碼如下
1
2
|
IndexWriter indexWriter = new IndexWriter(FSDirectory.open(Paths.get(indexPath)), new IndexWriterConfig(new
WhitespaceAnalyzer()));
|
打斷點開始逐步調試,程序首先進入FSDirectory的open方法
1
2
3
|
public static FSDirectory open(Path path) throws IOException {
return open(path, FSLockFactory.getDefault());
}
|
在open中調用FSLockFactory的getDefault方法
1
2
3
|
public static final FSLockFactory getDefault() {
return NativeFSLockFactory.INSTANCE;
}
|
由源碼也可以知道,FSDirectory使用NativeFSLockFactory作為其默認鎖策略,接着判斷如果是64位的JRE並且平台支持取消映射文件,則直接返回MMapDirectory
1
2
3
4
5
6
7
8
9
|
public static FSDirectory open(Path path, LockFactory lockFactory) throws IOException {
if (Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) {
return new MMapDirectory(path, lockFactory);
} else if (Constants.WINDOWS) {
return new SimpleFSDirectory(path, lockFactory);
} else {
return new NIOFSDirectory(path, lockFactory);
}
}
|
接着會在IndexWriterConfig的父類LiveIndexWriterConfig中進行一系列的默認初始化操作,包括
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
LiveIndexWriterConfig(Analyzer analyzer) {
this.analyzer = analyzer;
//默認在內存中緩存16.0M大小的文件
ramBufferSizeMB = IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB;
//默認禁用緩存文檔
maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
//禁用禁用緩存刪除的Terms
maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
mergedSegmentWarmer = null;
//索引刪除策略,默認只保留最后一次提交的索引,在最新的提交成功后,刪除所有之前的提交
delPolicy = new KeepOnlyLastCommitDeletionPolicy();
commit = null;
//默認使用復合索引文件,主要是提升性能考慮
useCompoundFile = IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM;
//索引默認打開策略,存在則追加,不存在則創建
openMode = OpenMode.CREATE_OR_APPEND;
//默認的相似度計算,使用BM25Similarity
similarity = IndexSearcher.getDefaultSimilarity();
//並發的合並調度器,可以指定一次最多運行的線程個數和同時合並的最大數目,如果同時合並的數目超過了線程數,那么大的合並將暫停直至小的合並完成
mergeScheduler = new ConcurrentMergeScheduler();
//使用默認的索引鏈
indexingChain = DocumentsWriterPerThread.defaultIndexingChain;
//使用Lucene60作為默認的編解碼器,主要用來對反向索引段進行編碼/解碼
codec = Codec.getDefault();
if (codec == null) {
throw new NullPointerException();
}
//調試用,用於IndexWriter和SegmentInfos等
infoStream = InfoStream.getDefault();
//分層合並策略
mergePolicy = new TieredMergePolicy();
//刷新策略,基於Ram或者數量
flushPolicy = new FlushByRamOrCountsPolicy();
//默認禁用reader池
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
//用來控制如何將線程分配給DocumentsWriterPerThread
indexerThreadPool = new DocumentsWriterPerThreadPool();
//設置單個段的RAM使用的硬上限,之后將強制刷新段
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
}
|
接着會在IndexWriter的構造函數中進行獲取鎖的操作
1
2
3
4
|
// obtain the write.lock. If the user configured a timeout,
// we wrap with a sleeper and this might take some time.
// 忽略上面注釋,obtain(long lockWaitTimeout)已經移除
writeLock = d.obtainLock(WRITE_LOCK_NAME);
|
這里面的d在64位的JRE中就是MMapDirectory的實例,該類從BaseDirectory繼承得到obtainLock方法
1
2
3
4
5
6
|
/** Holds the LockFactory instance (implements locking for this Directory instance). */
protected final LockFactory lockFactory;
@Override
public final Lock obtainLock(String name) throws IOException {
return lockFactory.obtainLock(this, name);
}
|
真正的獲取鎖的操作是在LockFactory的子類中,上面已經介紹過各個子類,所以此處的lockFactory是NativeFSLockFactory的實例,而lockFactory的引用是LockFactory類,這就是常說的父類引用指向子類實例。在LockFactory中,obtainLock是一個抽象方法,在FSLockFactory中得到實現,同時FSLockFactory是NativeFSLockFactory的父類,所以接着調用FSLockFactory中的obtainLock方法
1
2
3
4
5
6
7
|
@Override
public final Lock obtainLock(Directory dir, String lockName) throws IOException {
if (!(dir instanceof FSDirectory)) {
throw new UnsupportedOperationException(getClass().getSimpleName() + " can only be used with FSDirectory subclasses, got: " + dir);
}
return obtainFSLock((FSDirectory) dir, lockName);
}
|
在obtainLock中調用obtainFSLock方法,這也是個抽象方法,由NativeFSLockFactory實現,所以這時候才真的跳轉到了獲取鎖的核心實現方法里
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
@Override
protected Lock obtainFSLock(FSDirectory dir, String lockName) throws IOException {
Path lockDir = dir.getDirectory();
// Ensure that lockDir exists and is a directory.
// note: this will fail if lockDir is a symlink
Files.createDirectories(lockDir);
Path lockFile = lockDir.resolve(lockName);
try {
Files.createFile(lockFile);
} catch (IOException ignore) {
// we must create the file to have a truly canonical path.
// if it's already created, we don't care. if it cant be created, it will fail below.
}
// fails if the lock file does not exist
final Path realPath = lockFile.toRealPath();
// used as a best-effort check, to see if the underlying file has changed
final FileTime creationTime = Files.readAttributes(realPath, BasicFileAttributes.class).creationTime();
if (LOCK_HELD.add(realPath.toString())) {
FileChannel channel = null;
FileLock lock = null;
try {
channel = FileChannel.open(realPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
lock = channel.tryLock();
if (lock != null) {
return new NativeFSLock(lock, channel, realPath, creationTime);
} else {
throw new LockObtainFailedException("Lock held by another program: " + realPath);
}
} finally {
if (lock == null) { // not successful - clear up and move out
IOUtils.closeWhileHandlingException(channel); // TODO: addSuppressed
clearLockHeld(realPath); // clear LOCK_HELD last
}
}
} else {
throw new LockObtainFailedException("Lock held by this virtual machine: " + realPath);
}
}
|
首先會遞歸的創建多層目錄,然后創建write.lock文件,如果已經存在拋java.nio.file.FileAlreadyExistsException異常,捕獲該異常之后不做任何處理,因為一旦索引創建之后,write.lock文件會一直存在,即使IndexWriter已經關閉,所以不能以該文件是否存在來判斷是否有多個IndexWriter被打開了,那么是根據什么來判斷的呢?
接着會獲取該文件創建時候的時間戳,並且將該文件的真實路徑加入LOCK_HELD,LOCK_HELD的聲明如下,是一個同步的HashSet集合,在第一次打開IndexWriter的時候LOCK_HELD.add(realPath.toString())
成功,然后調用FileChannel的API去獲取鎖;當繼續希望打開另外的IndexWriter的時候必然LOCK_HELD.add(realPath.toString())
失敗,拋LockObtainFailedException異常。同時也解釋了如果JVM崩潰,LOCK_HELD在內存中必然也失效,相當於是自動解鎖了,不影響下次的重新加鎖操作。
1
|
private static final Set LOCK_HELD = Collections.synchronizedSet(new HashSet());
|
釋放鎖源碼解析
釋放鎖是在調用indexWriter.close()
之后,默認在關閉時提交,所以會進入shutdown方法
1
2
3
4
5
6
7
8
|
@Override
public void close() throws IOException {
if (config.getCommitOnClose()) {
shutdown();
} else {
rollback();
}
}
|
在shutdown中,Lucene會先判斷一系列預先設置的參數,然后進行刷新操作,將所有在內存中緩存的更新刷新到Directory中,然后靜靜等待合並結束,合並之后會進行內部的提交操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
private void shutdown() throws IOException {
if (pendingCommit != null) {
throw new IllegalStateException("cannot close: prepareCommit was already called with no corresponding call to commit");
}
// Ensure that only one thread actually gets to do the
// closing
if (shouldClose(true)) {
boolean success = false;
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now flush at close");
}
flush(true, true);
waitForMerges();
commitInternal(config.getMergePolicy());
rollbackInternal(); // ie close, since we just committed
success = true;
} finally {
if (success == false) {
// Be certain to close the index on any exception
try {
rollbackInternal();
} catch (Throwable t) {
// Suppress so we keep throwing original exception
}
}
}
}
}
|
在commitInternal(config.getMergePolicy())
方法中會調用finishCommit方法,該方法會更新索引文件的代數,同時會創建段文件信息的備份,接着調用rollbackInternal()
方法,該方法主要是確保沒有提交操作在運行,這樣即使有其它線程在同步文件到硬盤中,依然可以關閉。
1
2
3
4
5
6
|
private void rollbackInternal() throws IOException {
// Make sure no commit is running, else e.g. we can close while another thread is still fsync'ing:
synchronized(commitLock) {
rollbackInternalNoCommit();
}
}
|
在rollbackInternalNoCommit
方法中,進行釋放鎖的操作IOUtils.close(writeLock)
,改方法會將所有需要關閉的資源轉換為一個List鏈表
1
2
3
|
public static void close(Closeable... objects) throws IOException {
close(Arrays.asList(objects));
}
|
然后逐個關閉該鏈表中持有的資源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public static void close(Iterable extends Closeable> objects) throws IOException {
Throwable th = null;
for (Closeable object : objects) {
try {
if (object != null) {
object.close();
}
} catch (Throwable t) {
addSuppressed(th, t);
if (th == null) {
th = t;
}
}
}
reThrow(th);
}
|
這里的object.close()
關閉操作調用的就是NativeFSLock中的close方法,因為object其實就是NativeFSLock的實例,由NativeFSLock的父類Lock持有引用。NativeFSLock中的close方法如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
// NOTE: we don't validate, as unlike SimpleFSLockFactory, we can't break others locks
// first release the lock, then the channel
try (FileChannel channel = this.channel;
FileLock lock = this.lock) {
assert lock != null;
assert channel != null;
} finally {
closed = true;
clearLockHeld(path);
}
}
|
因為NativeFSLock是NativeFSLockFactory的靜態內部類,在finally中會直接調用NativeFSLockFactory的clearLockHeld方法,該方法代碼如下
1
2
3
4
5
6
|
private static final void clearLockHeld(Path path) throws IOException {
boolean remove = LOCK_HELD.remove(path.toString());
if (remove == false) {
throw new AlreadyClosedException("Lock path was cleared but never marked as held: " + path);
}
}
|
至此又看到了熟悉的身影,LOCK_HELD一個同步的HashSet集合會移除write.lock文件的真實路徑,釋放鎖完畢,結束。