es lucene寫入流程,segment產生機制源碼分析


本文主要分析es lucene寫入流程,lucene segment的產生,flush, commit與es的refresh,flush。

1 segment的產生

當索引一個文檔時,如果存在空閑的segment(未被其他線程鎖定),則取出空閑segment list中的最后一個segment(LIFO),並鎖定,將文檔索引至該segment,

找達到flush條件的segment,然后解鎖,歸還至空閑segment list,如果有達到flush條件的segment,flush該segment(同步執行)。

如果不存在,則創建新的segment,重復上述步驟。

總結1:如果並行的執行向一個索引,索引文檔,則需要不同的segment。

相關代碼:

//索引一個文檔。
IndexWriter.updateDocument
//索引一個文檔。
DocumentsWriter.updateDocument
//一個線程索引時鎖定一個ThreadState對象,索引后歸還至free list。
ThreadState
//ThreadState的屬性,一個DocumentsWriterPerThread對應一個segment,flush后,該ThreadState的dwpt為null,
//下次使用該ThreadState,創建新的dwpt,新的segment。
DocumentsWriterPerThread

 

2 flush條件

索引一個文檔后,找出是否有達到flush條件的segment。

1:如果maxBufferedDocs(默認-1,es未設置)不等於-1,且當前segment在內存中的doc數量大於等於maxBufferedDocs,則標記該segment的flushPending。

2:如果不滿足1,且ramBufferSizeMB(默認16.0,es設置為es.index.memory.max_index_buffer_size)不等於-1,當內存中當前IndexWriter所有segment之和(包括deleted docs)大於ramBufferSizeMB時,找出內存中最大的且未標記flushPending的segment,標記該segment的flushPending。

3:如果當前1,2之后,當前segment還未標記flushPending,則當前segment大於perThreadHardLimitMB(默認1945,es未設置),標記該segment的flushPending。

123之后,如果當前segment被標記,則flush當前segment。否則從flushQueue中poll一個segment,如果flushQueue(調用flush時,將所有segment加入queue)為空,則遍歷segment取第一個標記flushPending的segment進行flush。

相關代碼:

//查找符合flush的segment。
DocumentsWriterFlushControl.doAfterDocument
//flush當前segment前,reset當前dwpt,下次使用當前ThreadState需要新的dwpt,新的segment。
DocumentsWriterFlushControl.internalTryCheckOutForFlush
//flush當前segment,或者其他segment。
DocumentsWriter.postUpdate

注意:除了達到flush條件的自動flush,還可以通過調用api flush,如:

1:es refresh

2:es flush

3:es syncedFlush

 

3 flush

flush將內存中的segment寫到文件(在調用線程中同步執行),但不執行fileChannel.force(nio,bio則fileOutputStream.flush),一部分數據可能在buffer中。

相關代碼:

//flush一個segment。
DocumentsWriter.doFlush
DocumentsWriterPerThread.flush
DefaultIndexingChain.flush
//寫nvd, nvm文件。
writeNorms
//寫dvd, dvm文件。
writeDocValues
//針對numberic, 寫dii, dim文件。
writePoints
//寫fdt, fdx文件(該文件在首次indexing時創建,flush時寫入值)。
storedFieldsConsumer.flush
//寫doc, pos, tim, tip文件。
termsHash.flush
//寫fnm文件。
docWriter.codec.fieldInfosFormat().write
//寫cfs, cfe, si, liv(如果有刪除)文件。
DocumentsWriterPerThread.sealFlushedSegment
//刪除cfs, cfe, si, liv(如果有刪除)之外的文件。
IndexWriter.doAfterFlush

 

4 commit

commit執行fileChannel.force,將buffer中的數據寫到磁盤。具體步驟為:

1:flush all segments 將內存中所有的segments寫到文件。

2:依次sync pending_segments_n,segment files(fileChannel.force)將這寫文件同步到磁盤。

3:將pending_segments_n重命名為segments_n,刪除舊的segments_n-1。

4:如果步驟1 flush了segment,執行maybeMerge,如果達到merge條件,將會merge。

相關代碼:

//commit。
IndexWriter.commit
IndexWriter.commitInternal
IndexWriter.prepareCommitInternal
//flush segments。
DocumentsWriter.flushAllThreads
//sync file。
IndexWriter.startCommit
Directory.sync
IOUtils.fsync
FileChannel.force
FileChannelImpl.force
//更新commit信息segments_n,刪除舊的segments_n-1。
IndexWriter.finishCommit
//如果達到merge條件,將會merge。
IndexWriter.maybeMerge

 

5 maybeMerge

flush或者commit后,如果flush了segment,執行maybeMerge,如果達到merge條件,將執行merge(異步執行)。具體步驟為:

1:將segments按size降序排列。

2:計算total segments size 和 minimum segment size。

3:total segments size過濾掉tooBigSegment(大於max_merged_segment/2.0)的segment,並記錄tooBigCount;minSegmentBytes如果小於floor_segment(默認2mb),取2mb。

4:計算allowedSegCountInt,當segments(不包含tooBigSegment)數量大於此數,將觸發merge。

5:從大到小(之前的降序排列),貪心找出不大於maxMergeAtOnce個, 且size總和不大於maxMergedSegmentBytes個segments進行merge。

相關代碼:

//maybeMerge。
IndexWriter.maybeMerge
IndexWriter.updatePendingMerges
//查找可merge的segments。
TieredMergePolicy.findMerges
//執行merge。
ConcurrentMergeScheduler.merge
//控制merge線程數量
ConcurrentMergeScheduler.maybeStall

//用來異步執行merge的線程。

MergeThread

 

6 es refresh

主要執行lucene的flushAllThreads和maybeMerge。refresh的兩個條件:

1:達到refresh_interval設置的時間間隔。

2:節點所有shard的segments占用內存(調用lucene api獲取)之和達到indices.memory.index_buffer_size,找出占用最大的shard執行refresh。

相關代碼:

 //refresh_interval refresh。
 IndexService.AsyncRefreshTask
 //indices.memory.index_buffer_size refresh。
 IndexingMemoryController.runUnlocked
 IndexingMemoryController.writeIndexingBufferAsync

//es refresh。
InternalEngine.refresh
//lucene refresh。
ReferenceManager.maybeRefreshBlocking
DirectoryReader.openIfChanged
StandardDirectoryReader.doOpenIfChanged
IndexWriter.getReader
//flush segments。
DocumentsWriter.flushAllThreads
//如果flush了segment,則執行maybeMerge。
IndexWriter.maybeMerge

 

7 es flush

主要執行步驟為:

1:prepareCommit translog:

1.1 備份 translog.ckp到translog-1.ckp。

1.2 fsync translog-1.ckp以及translog 文件夾。

1.3 創建新的translog數據文件translog-n.tlog,更新translog.ckp(寫入checkPoint)。

2:commit indexWriter(見4 commit)。

3:refresh(見6 es refresh)。

4:commit translog:刪除備份的translog-1.ckp以及舊的translog數據文件translog-n-1.tlog。

相關代碼:

//es flush。
InternalEngine.flush
//prepareCommit translog。
Translog.prepareCommit
//es commit index writer。
InternalEngine.commitIndexWriter
//lucene commit。
IndexWriter.commit
//es refresh。
InternalEngine.refresh
//commit translog。
Translog.commit

總結2:lucene的flush是指將內存中的segment,寫到磁盤但不執行fileChannel.force,一部分數據會在buffer中;commit會調用force,將buffer中的數據寫到磁盤。

es的refresh調用lucene的flush;flush調用lucene的commit。

 

參考:

elasticsearch5.6.12,lucene6.6.1 源碼

https://www.outcoldman.com/en/archive/2017/07/13/elasticsearch-explaining-merge-settings/

http://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM