Solr4.8.0源碼分析(16)之SolrCloud索引深入(3)


Solr4.8.0源碼分析(16)之SolrCloud索引深入(3) 

      前面兩節學習了SolrCloud索引過程以及索引鏈的前兩步,LogUpdateProcessorFactory和DistributedUpdateProcessor。本節將詳細介紹了索引鏈的第三步DirectUpdateHandler2和UpdateLog。

 1. DirectUpdateHandler2.ADD

      DirectUpdateHandler2過程包含了Solr到Lucene的索引過程,在整個索引鏈中是最復雜也最重要的過程。首先,我們來查看在Solrconfig.xml中關於DirectUpdateHandler2的配置。

 1   40   <updateHandler class="solr.DirectUpdateHandler2">
 2   41 
 3   42 
 4   43      <autoCommit>
 5   44          <maxTime>${solr.autoCommit.maxTime:15000}</maxTime>
 6   45          <maxDocs>${solr.autoCommit.maxDocs:25000}</maxDocs>
 7   46        <openSearcher>false</openSearcher> 
 8   47      </autoCommit>
 9   48 
10   49      <autoSoftCommit> 
11   50          <maxTime>${solr.autoSoftCommit.maxTime:-1}</maxTime> 
12   51          <maxDocs>${solr.autoSoftCommit.maxDocs:1000}</maxDocs>
13   52      </autoSoftCommit>
14   53   
15   54   </updateHandler>

從上面中可以看出幾個主要的參數:autoCommit和autoSoftCommit

  • autoCommit,硬提交,Solr和Lucene原本存在的commit方式,負責把索引內容刷入磁盤,需要重新打開searcher,所以比較費性能。刷入磁盤后,Solr/Lucene對這部分內容可見可查。
  • autoSoftCommit,軟提交,這是Solr新增的commit方式,Lucene沒有。軟提交負責將索引內容在內存中生成segment,並使得索引內容對Solr可見可查,該提交方式是autoCommit的改善方式,保證了Solr的實時性同時又兼顧了性能。在進行softcommit過程中需要進行預熱(即將現在狀態的searcher復制到新的searcher中,保證了舊的softcommit數據不丟失),雖然沒有重新打開searcher那么費性能,但是預熱頻率過快還是會影響solr的性能。
  • 以上兩種是Solr自動觸發的commit方式,他們都有兩個參數maxTime和maxDocs分別表示兩個參數的極限,當距離前一次commit maxTime時間后或者內存中的document數量到達maxDocs時候就會觸發commit(autoCommit和autoSoftCommit)。相比於前兩種,還有另外一種方式即客戶端主動commit,該方式由客戶端控制。
  • 最后openSearcher配置表示進行autocommit時候是否重新打開searcher,如果autocommit頻繁又將openSearcher設置為true,那么solr的性能壓力會非常大。一般將autocommit的maxTime和maxDocs設的相對大點,對應的softcommit的設置小點,這樣即保證了性能又保證了實時性,當然具體的值需要根據索引的頻率以及document的大小綜合考慮。

      前面簡要介紹了autoCommit和autoSoftCommit,這部分內容網上較多,本文就不具體介紹了。接下來着重介紹DirectUpdateHandler2的流程。

      上一節講到DirectUpdateHandler2是在DistributedUpdateProcessor過程中的versionadd中進行調用。以add過程為例,RunUpdateProcessorFactory.processAdd()

 1   public void processAdd(AddUpdateCommand cmd) throws IOException {
 2     
 3     if (DistributedUpdateProcessor.isAtomicUpdate(cmd)) {
 4       throw new SolrException
 5         (SolrException.ErrorCode.BAD_REQUEST,
 6          "RunUpdateProcessor has recieved an AddUpdateCommand containing a document that appears to still contain Atomic document update operations, most likely because DistributedUpdateProcessorFactory was explicitly disabled from this updateRequestProcessorChain");
 7     }
 8 
 9     updateHandler.addDoc(cmd);
10     super.processAdd(cmd);
11     changesSinceCommit = true;
12   }

       接着來查看下addDoc0(),該函數包括了DirectUpdateHandler2的add全過程。代碼邏輯比較簡單,只需要注意以下幾點即可:

  • cmd.overwrite,是否會覆蓋原先記錄。如果傳入的cmd中沒有unique_id域,那么說明Solr索引中沒有采用自定義的unique_id,因此就不會進行覆蓋相同unique_id域的記錄了。可以在schema.xml中進行設置unique_id域,如果設了該域,一旦新記錄的該域值與舊的記錄相同,它就會刪除舊的記錄。經過本人測試,沒有unique_id的建索引速度是有unique_id的兩到三倍,但是沒有unique_id時候需要考慮數據的冗余性,查詢時有可能會出現多條相同結果。
  • deletesAfter = ulog.getDBQNewer(cmd.version);獲取ulog中delete by query的日志,並對這些數據進行刪除。

  • add的先后順序是先進行writer.updateDocument()將數據寫入Lucene的索引中,后將記錄寫入uLog中(ulog.add(cmd))。這樣更好的保證了數據一致性。
  • 關於writer.updateDocument()由於涉及到Lucene的索引建立過程了,在后面單獨進行學習。
  1 private int addDoc0(AddUpdateCommand cmd) throws IOException {
  2     int rc = -1;
  3     RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
  4     try {
  5       IndexWriter writer = iw.get();
  6       addCommands.incrementAndGet();
  7       addCommandsCumulative.incrementAndGet();
  8       
  9       // if there is no ID field, don't overwrite
 10       if (idField == null) {
 11         cmd.overwrite = false;
 12       }
 13       
 14       try {
 15         IndexSchema schema = cmd.getReq().getSchema();
 16         
 17         if (cmd.overwrite) {
 18           
 19           // Check for delete by query commands newer (i.e. reordered). This
 20           // should always be null on a leader
 21           List<UpdateLog.DBQ> deletesAfter = null;
 22           if (ulog != null && cmd.version > 0) {
 23             deletesAfter = ulog.getDBQNewer(cmd.version);
 24           }
 25           
 26           if (deletesAfter != null) {
 27             log.info("Reordered DBQs detected.  Update=" + cmd + " DBQs="
 28                 + deletesAfter);
 29             List<Query> dbqList = new ArrayList<>(deletesAfter.size());
 30             for (UpdateLog.DBQ dbq : deletesAfter) {
 31               try {
 32                 DeleteUpdateCommand tmpDel = new DeleteUpdateCommand(cmd.req);
 33                 tmpDel.query = dbq.q;
 34                 tmpDel.version = -dbq.version;
 35                 dbqList.add(getQuery(tmpDel));
 36               } catch (Exception e) {
 37                 log.error("Exception parsing reordered query : " + dbq, e);
 38               }
 39             }
 40             
 41             addAndDelete(cmd, dbqList);
 42           } else {
 43             // normal update
 44             
 45             Term updateTerm;
 46             Term idTerm = new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId());
 47             boolean del = false;
 48             if (cmd.updateTerm == null) {
 49               updateTerm = idTerm;
 50             } else {
 51               // this is only used by the dedup update processor
 52               del = true;
 53               updateTerm = cmd.updateTerm;
 54             }
 55 
 56             if (cmd.isBlock()) {
 57               writer.updateDocuments(updateTerm, cmd, schema.getAnalyzer());
 58             } else {
 59               Document luceneDocument = cmd.getLuceneDocument();
 60               // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
 61               writer.updateDocument(updateTerm, luceneDocument, schema.getAnalyzer());
 62             }
 63             // SolrCore.verbose("updateDocument",updateTerm,"DONE");
 64             
 65             if (del) { // ensure id remains unique
 66               BooleanQuery bq = new BooleanQuery();
 67               bq.add(new BooleanClause(new TermQuery(updateTerm),
 68                   Occur.MUST_NOT));
 69               bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
 70               writer.deleteDocuments(bq);
 71             }
 72             
 73             // Add to the transaction log *after* successfully adding to the
 74             // index, if there was no error.
 75             // This ordering ensures that if we log it, it's definitely been
 76             // added to the the index.
 77             // This also ensures that if a commit sneaks in-between, that we
 78             // know everything in a particular
 79             // log version was definitely committed.
 80             if (ulog != null) ulog.add(cmd);
 81           }
 82           
 83         } else {
 84           // allow duplicates
 85           if (cmd.isBlock()) {
 86             writer.addDocuments(cmd, schema.getAnalyzer());
 87           } else {
 88             writer.addDocument(cmd.getLuceneDocument(), schema.getAnalyzer());
 89           }
 90 
 91           if (ulog != null) ulog.add(cmd);
 92         }
 93         
 94         if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
 95           if (commitWithinSoftCommit) {
 96             commitTracker.addedDocument(-1);
 97             softCommitTracker.addedDocument(cmd.commitWithin);
 98           } else {
 99             softCommitTracker.addedDocument(-1);
100             commitTracker.addedDocument(cmd.commitWithin);
101           }
102         }
103         
104         rc = 1;
105       } finally {
106         if (rc != 1) {
107           numErrors.incrementAndGet();
108           numErrorsCumulative.incrementAndGet();
109         } else {
110           numDocsPending.incrementAndGet();
111         }
112       }
113       
114     } finally {
115       iw.decref();
116     }
117     
118     return rc;
119   }

 2. UpdateLog.ADD

      UpdateLog的add也比較簡單,主要分為三步:

  • 檢查update log有沒有生成。同樣需要說明,Updatelog是Solr的概念,在Lucene並沒有出現,它在solrconfig.xml中進行配置,設置索引庫更新日志,默認路徑為solr home下面的data/tlog。如果沒有ulog文件,那么就會重新生成一個.
1 <updateLog>
2  <str name="dir">${solr.ulog.dir:}</str>
3  </updateLog>
  • 開始寫入ulog日志文件中,pos = tlog.write(cmd, operationFlags);該過程調用了TransactionLog的write接口,這在下一小節具體介紹。
  • 將update的內容再寫入map結構中,存放於內存。

3. TransactionLog

    咋一看會覺得DirectUpdateHandler2的add過程比較簡單,但是當add與commit以及updatelog recovering合並在一起,這個過程就變得比較復雜。本節先介紹updatelog的最小單位transactionLog.

  • TransactionLog是一個tlog文件,UpdateLog是多個tlog文件的集合,它更多的指的時tLog目錄。
  • TransactionLog的文件命名格式如下:列入tlog.00000000000000000001
1 public static String LOG_FILENAME_PATTERN = "%s.%019d";
2 String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id);
  • TransactionLog的文件格式可以通過寫文件的過程查看,注意這里的strings存放的是域,比如titile,author,content,那么后續存放document的值也是按這個順序存放的,具有一一對應的關系。文件結構比較簡單,可以從以下代碼中了解。
 1   protected void writeLogHeader(LogCodec codec) throws IOException {
 2     long pos = fos.size();
 3     assert pos == 0;
 4 
 5     Map header = new LinkedHashMap<String,Object>();
 6     header.put("SOLR_TLOG",1); // a magic string + version number
 7     header.put("strings",globalStringList);
 8     codec.marshal(header, fos);
 9 
10     endRecord(pos);
11   }
 1 public long write(AddUpdateCommand cmd, int flags) {
 2     LogCodec codec = new LogCodec(resolver);
 3     SolrInputDocument sdoc = cmd.getSolrInputDocument();
 4 
 5     try {
 6       //寫header信息
 7       checkWriteHeader(codec, sdoc);
 8 
 9       // adaptive buffer sizing
10       int bufSize = lastAddSize;    // unsynchronized access of lastAddSize should be fine
11       bufSize = Math.min(1024*1024, bufSize+(bufSize>>3)+256);
12 
13       MemOutputStream out = new MemOutputStream(new byte[bufSize]);
14       codec.init(out);
15       //寫tag
16       codec.writeTag(JavaBinCodec.ARR, 3);
17       //寫update類型
18       codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
19       //寫version信息
20       codec.writeLong(cmd.getVersion());
21       //寫document
22       codec.writeSolrInputDocument(cmd.getSolrInputDocument());
23       lastAddSize = (int)out.size();
24 
25       synchronized (this) {
26         long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
27         assert pos != 0;
28 
29         /***
30          System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
31          if (pos != fos.size()) {
32          throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
33          }
34          ***/
35 
36         out.writeAll(fos);
37         endRecord(pos);
38         // fos.flushBuffer();  // flush later
39         return pos;
40       }
41 
42     } catch (IOException e) {
43       // TODO: reset our file pointer back to "pos", the start of this record.
44       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
45     }
46   }
  • TransactionLog的創建是在每次update操作(add,delete或者deletebyquery)開始時,每當接收到update操作時候,Solr會去判斷是否已有當前id的tlog文件,如果沒有則新建新的當前id的tlog文件。
1   protected void ensureLog() {
2     if (tlog == null) {
3       String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id);
4       tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
5     }
6   }
  • Solr如果只進行soft commit,那么TransactionLog文件只會增大不會增多,它只會往最近的(即id最大的)TransactionLog文件中寫入ulog日志。如果進行的是hard commit,則會生成新的TransactionLog文件,並且根據存放的總的日志數(record)以及TransactionLog文件的個數進行判斷是否需要刪除舊的日志文件,默認情況下日志數(record)為100,TransactionLog個數為10個。代碼中numRecordsToKeep為100。但是當我們進行快速建索引的時候,一開始並不會滿足上述的條件,即會存在多個日志數(record)多余100的情況,這是為什么呢?快速建索引的時候,當soft commit一次進去大量record到TransactionLog中,並不會生成新的id的TransactionLog文件,也就不會取處理舊的TransactionLog文件。當soft commit頻率大於hard commit時候,每個TransactionLog文件都會存放大量record,但是hard commit只會刪除最舊的那個文件,剩余的TransactionLog的record數量仍然大於100, 因此這種現象是正常的。當你停止建索引,或者調整hard commit頻率,這種現象會慢慢改變,直至符合正常的范圍。
 1   protected void addOldLog(TransactionLog oldLog, boolean removeOld) {
 2     if (oldLog == null) return;
 3 
 4     numOldRecords += oldLog.numRecords();
 5 
 6     int currRecords = numOldRecords;
 7 
 8     if (oldLog != tlog &&  tlog != null) {
 9       currRecords += tlog.numRecords();
10     }
11 
12     while (removeOld && logs.size() > 0) {
13       TransactionLog log = logs.peekLast();
14       int nrec = log.numRecords();
15       // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
16       // we already have the limit of 10 log files.
17       if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) {
18         currRecords -= nrec;
19         numOldRecords -= nrec;
20         logs.removeLast().decref();  // dereference so it will be deleted when no longer in use
21         continue;
22       }
23 
24       break;
25     }
26 
27     // don't incref... we are taking ownership from the caller.
28     logs.addFirst(oldLog);
29   }
  • UpateLog會始終保存最新的兩個TransactionLog文件,以及log的信息。每當進行soft commit或者hard commit操作時候進行更新。
1   protected void newMap() {
2     prevMap2 = prevMap;
3     prevMapLog2 = prevMapLog;
4 
5     prevMap = map;
6     prevMapLog = tlog;
7 
8     map = new HashMap<>();
9   }

 

總結:本節主要講了update 索引鏈的第三步DirectUpdateHandler2中的add過程,add過程主要包含了兩步,第一步調用lucene indexwriter 進行updatedocument以及將索引寫入updatelog。lucene indexwriter涉及到lucene的建索引了,將在后續文章中再研究。updatelog的難點主要在recovery上,所以本節又簡要的介紹了updatelog的基本內容以及具體的日志文件TransactionLog。下一節將介紹update的commit操作,它也主要涉及了updatelog的更新操作。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM