Solr4.8.0源碼分析(17)之SolrCloud索引深入(4)


Solr4.8.0源碼分析(17)之SolrCloud索引深入(4)

      前面幾節以add為例已經介紹了solrcloud索引鏈建索引的三步過程,delete以及deletebyquery跟add過程大同小異,這里暫時就不介紹了。由於commit流程較為特殊,那么本節主要簡要介紹下commit的流程。

1. SolrCloud的commit流程

     SolrCloud的commit流程同樣分為三步,本節主要簡單介紹下三步過程。

1.1 LogUpdateProcessor

     LogUpdateProcessor的commit比較簡單,主要包含兩個步驟,調用DistributedUpdateProcessor的commit以及將commit信息寫入日志。

1   public void processCommit( CommitUpdateCommand cmd ) throws IOException {
2     if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString() + " " + req); }
3     if (next != null) next.processCommit(cmd);
4 
5 
6     final String msg = cmd.optimize ? "optimize" : "commit";
7     toLog.add(msg, "");
8   }

1.2 DistributedUpdateProcessor

    DistributedUpdateProcessor的commit過程較前者稍微復雜點,主要有一個判斷,如果本節點滿足以下幾點之一,不是集群,只有一個node且是leader,是被轉發過來的,就會進行dolocalcommit,否則的就會進行commit請求的轉發。其中dolocalcommit會調用DirectUpdateHandler2的commit。

 1   @Override
 2   public void processCommit(CommitUpdateCommand cmd) throws IOException {
 3     updateCommand = cmd;
 4     List<Node> nodes = null;
 5     boolean singleLeader = false;
 6     if (zkEnabled) {
 7       zkCheck();
 8       
 9       nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
10           .getCloudDescriptor().getCollectionName());
11       if (isLeader && nodes.size() == 1) {
12         singleLeader = true;
13       }
14     }
15     
16     if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
17       doLocalCommit(cmd);
18     } else if (zkEnabled) {
19       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
20       if (!req.getParams().getBool(COMMIT_END_POINT, false)) {
21         params.set(COMMIT_END_POINT, true);
22         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
23         params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
24             zkController.getBaseUrl(), req.getCore().getName()));
25         if (nodes != null) {
26           cmdDistrib.distribCommit(cmd, nodes, params);
27           finish();
28         }
29       }
30     }
31   }
 1   private void doLocalCommit(CommitUpdateCommand cmd) throws IOException {
 2     if (vinfo != null) {
 3       vinfo.lockForUpdate();
 4     }
 5     try {
 6 
 7       if (ulog == null || ulog.getState() == UpdateLog.State.ACTIVE || (cmd.getFlags() & UpdateCommand.REPLAY) != 0) {
 8         super.processCommit(cmd);
 9       } else {
10         log.info("Ignoring commit while not ACTIVE - state: " + ulog.getState() + " replay:" + (cmd.getFlags() & UpdateCommand.REPLAY));
11       }
12 
13     } finally {
14       if (vinfo != null) {
15         vinfo.unlockForUpdate();
16       }
17     }
18   }

1.3 DirectUpdateHandler2

     現在才是commit最關鍵的流程,DirectUpdateHandler2的commit流程。本步驟的commit包含了對softcommit和hardcommit的處理。

  • commit過程包含prepareCommit,Commit,以及postCommit,我們主要關注的是commit
  • 當進行commit時,會首先取消等待的softcommit和hardcommit。因為commit的效果是對整個solr的,所以多個commit只會影響性能而不會影響效果。
  • 其次solr還會判斷是否需要進行索引優化,即optimize。optimize的本質是合並策略中的forcemerge,forcemerge比較暴力,它不管你的合並策略是怎么限制segemnt的大小以及個數,它會一股腦的把所有的segment擠成一個,所以他是很費性能的。關於forcemerge的具體內容將在后續的介紹merge中展開。如果不需要優化optimize,Solr會進行forceMergeDeletes來刪除已標記刪除的document,它相當於一個小型的forcemerge,對性能的影響較少。當然,forcemerge也會對標記刪除的document進行真正的刪除。
  • Solr存在一種情況,沒有進行commit但是索引發生變化了,Solr會進行檢查這種情況,如果發生了就會進行一次commit。
  • 如果Solr進行的softcommit,首先會對ulog進行一次commit操作,將ulog進行一次清理。同時會調用getSearcher()來重新打開一個SolrIndexSearch滿足實時性的要求。SolrIndexSearch是本節的重點,將在第2節重點介紹。
  • 如果Solr進行的是hardcommit,那么Solr會刪除ulog中最舊的日志(前文中講到的addOldLog),生成新的日志文件TransactionLog編號。Solr會根據是否需要打開Searcher來調用getSearcher還是openNewSearcher。
  • 最后waitSearcher[0].get()會等待新的Searcher打開。
  • 以上就是DirectUpdateHandler2 commit的主要步驟,重點是在getSearcher和openNewSearcher上,下一節將重點介紹。
  1  public void commit(CommitUpdateCommand cmd) throws IOException {
  2     if (cmd.prepareCommit) {
  3       prepareCommit(cmd);
  4       return;
  5     }
  6 
  7     if (cmd.optimize) {
  8       optimizeCommands.incrementAndGet();
  9     } else {
 10       commitCommands.incrementAndGet();
 11       if (cmd.expungeDeletes) expungeDeleteCommands.incrementAndGet();
 12     }
 13 
 14     Future[] waitSearcher = null;
 15     if (cmd.waitSearcher) {
 16       waitSearcher = new Future[1];
 17     }
 18 
 19     boolean error=true;
 20     try {
 21       // only allow one hard commit to proceed at once
 22       if (!cmd.softCommit) {
 23         solrCoreState.getCommitLock().lock();
 24       }
 25 
 26       log.info("start "+cmd);
 27 
 28       // We must cancel pending commits *before* we actually execute the commit.
 29 
 30       if (cmd.openSearcher) {
 31         // we can cancel any pending soft commits if this commit will open a new searcher
 32         softCommitTracker.cancelPendingCommit();
 33       }
 34       if (!cmd.softCommit && (cmd.openSearcher || !commitTracker.getOpenSearcher())) {
 35         // cancel a pending hard commit if this commit is of equal or greater "strength"...
 36         // If the autoCommit has openSearcher=true, then this commit must have openSearcher=true
 37         // to cancel.
 38          commitTracker.cancelPendingCommit();
 39       }
 40 
 41       RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
 42       try {
 43         IndexWriter writer = iw.get();
 44         if (cmd.optimize) {
 45           writer.forceMerge(cmd.maxOptimizeSegments);
 46         } else if (cmd.expungeDeletes) {
 47           writer.forceMergeDeletes();
 48         }
 49         
 50         if (!cmd.softCommit) {
 51           synchronized (solrCoreState.getUpdateLock()) { // sync is currently needed to prevent preCommit
 52                                 // from being called between preSoft and
 53                                 // postSoft... see postSoft comments.
 54             if (ulog != null) ulog.preCommit(cmd);
 55           }
 56           
 57           // SolrCore.verbose("writer.commit() start writer=",writer);
 58 
 59           if (writer.hasUncommittedChanges()) {
 60             final Map<String,String> commitData = new HashMap<>();
 61             commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,
 62                 String.valueOf(System.currentTimeMillis()));
 63             writer.setCommitData(commitData);
 64             writer.commit();
 65           } else {
 66             log.info("No uncommitted changes. Skipping IW.commit.");
 67           }
 68 
 69           // SolrCore.verbose("writer.commit() end");
 70           numDocsPending.set(0);
 71           callPostCommitCallbacks();
 72         } else {
 73           callPostSoftCommitCallbacks();
 74         }
 75       } finally {
 76         iw.decref();
 77       }
 78 
 79 
 80       if (cmd.optimize) {
 81         callPostOptimizeCallbacks();
 82       }
 83 
 84 
 85       if (cmd.softCommit) {
 86         // ulog.preSoftCommit();
 87         synchronized (solrCoreState.getUpdateLock()) {
 88           if (ulog != null) ulog.preSoftCommit(cmd);
 89           core.getSearcher(true, false, waitSearcher, true);
 90           if (ulog != null) ulog.postSoftCommit(cmd);
 91         }
 92         // ulog.postSoftCommit();
 93       } else {
 94         synchronized (solrCoreState.getUpdateLock()) {
 95           if (ulog != null) ulog.preSoftCommit(cmd);
 96           if (cmd.openSearcher) {
 97             core.getSearcher(true, false, waitSearcher);
 98           } else {
 99             // force open a new realtime searcher so realtime-get and versioning code can see the latest
100             RefCounted<SolrIndexSearcher> searchHolder = core.openNewSearcher(true, true);
101             searchHolder.decref();
102           }
103           if (ulog != null) ulog.postSoftCommit(cmd);
104         }
105         if (ulog != null) ulog.postCommit(cmd); // postCommit currently means new searcher has
106                               // also been opened
107       }
108 
109       // reset commit tracking
110 
111       if (cmd.softCommit) {
112         softCommitTracker.didCommit();
113       } else {
114         commitTracker.didCommit();
115       }
116       
117       log.info("end_commit_flush");
118 
119       error=false;
120     }
121     finally {
122       if (!cmd.softCommit) {
123         solrCoreState.getCommitLock().unlock();
124       }
125 
126       addCommands.set(0);
127       deleteByIdCommands.set(0);
128       deleteByQueryCommands.set(0);
129       if (error) numErrors.incrementAndGet();
130     }
131 
132     // if we are supposed to wait for the searcher to be registered, then we should do it
133     // outside any synchronized block so that other update operations can proceed.
134     if (waitSearcher!=null && waitSearcher[0] != null) {
135        try {
136         waitSearcher[0].get();
137       } catch (InterruptedException e) {
138         SolrException.log(log,e);
139       } catch (ExecutionException e) {
140         SolrException.log(log,e);
141       }
142     }
143   }

2. getSearcher

      getSearcher 獲取一個現有的SolrIndexSearcher或者創建新的SolrIndexSearcher。每當進行SoftCommit的時候,重新創建一個新的SolrIndexSearcher是實現近實時索引的基礎。在重新打開SolrIndexSearcher的時候,Solr不但會進行預熱(warn),而且還會新建SolrEventListener。

1 public RefCounted<SolrIndexSearcher> getSearcher(boolean forceNew, boolean returnSearcher, final Future[] waitSearcher, boolean updateHandlerReopens) {
2 }

       getSearcher主要包含以下幾個參數:

  • 如果已有IndexSearcher打開,是否需要強制打開新的IndexSearcher。如果設置為ture,那么每次都會打開新的IndexSearcher,那么剛add的document就是可查訊的,近實時查詢需要該值為true。
  • waitSearcher 如果該值不為空,那么Solr會得到新的Searcher注冊后才會返回新的Searcher。如果在重新打開Searcher的過程中需要進行預熱(關於預熱下節重點介紹),那么這個waitSearcher就會等到預熱完成才返回,而預熱的過程往往會占用大量的時間,比較影響索引的性能。
  • returnSearcher 如果設置為ture,則返回SolrIndexSearcher,並引用加1.

     接下來通過源碼,來學習下Solr是如何獲取到一個Seacher。

需要補充幾點:

  • onDeckSearchers 表示正在准備新建的Searcher。該值在SolrConfig.xml可以進行配置,該值很大程度上制約了多線程建索引的線程數。如果同時用10個線程在建索引,且commit比較頻繁,而maxWarmingSearchers設置為8,那么很容出現以下這種錯誤:

1 Error opening new searcher. exceeded limit of maxWarmingSearchers

        而且當多個線程建索引的時候,且commit比較頻繁,一直會有warm:

1 PERFORMANCE WARNING: Overlapping onDeckSearchers=2
1 <maxWarmingSearchers>10</maxWarmingSearchers>
 1       if (onDeckSearchers < 1) {
 2         // should never happen... just a sanity check
 3         log.error(logid+"ERROR!!! onDeckSearchers is " + onDeckSearchers);
 4         onDeckSearchers=1;  // reset
 5       } else if (onDeckSearchers > maxWarmingSearchers) {
 6         onDeckSearchers--;
 7         String msg="Error opening new searcher. exceeded limit of maxWarmingSearchers="+maxWarmingSearchers + ", try again later.";
 8         log.warn(logid+""+ msg);
 9         // HTTP 503==service unavailable, or 409==Conflict
10         throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,msg);
11       } else if (onDeckSearchers > 1) {
12         log.warn(logid+"PERFORMANCE WARNING: Overlapping onDeckSearchers=" + onDeckSearchers);
13       }
  • 預熱即是提升查詢性能的一種方式,但是它是以消耗索引性能的,具體的介紹將會在下一節Solr的緩存機制中詳細介紹。
  • 在getSearcher時候,Solr同樣會對一些listener進行預熱.在solrconfig.xml上可以配置在newSearcher和firstSearcher的監聽器,在事件觸發時,可以做某些熱身搜索,讓Searcher做好准備提供服務,特別是服務重啟的時候,如果沒有做好熱身,開始提供服務搜索時都很勉強。但是通過配置的方式進行listener的預熱只對固定的一些查詢進行,對於查詢比較自由的環境效果可能並不明顯。
 1 <listener event="newSearcher" class="solr.QuerySenderListener">
 2       <arr name="queries">
 3           <lst><str name="q">美女</str><str name="qt">standard</str><str name="sort">rtsTime desc</str></lst>
 4            <lst><str name="q">hadoop</str><str name="qt">standard</str><str name="sort">rtsTime desc</str></lst>
 5            <lst><str name="q">zoie</str><str name="qt">standard</str><str name="sort">rts desc</str></lst>
 6            <lst><str name="q">lucene</str><str name="qt">standard</str><str name="sort">pubdate desc</str></lst>        
 7      <lst><str name="q">new searcher</str><str name="qt">standard</str><str name="sort">sourceId desc</str></lst>        
 8            <lst><str name="q">solr</str><str name="qt">standard</str><str name="sort">price desc</str></lst>               
 9       </arr>
10     </listener>
  • 相比於預熱,Solr還提供了另外一種打開Searcher方式即cold Searcher,該方式會直接注冊Searcher,並不需要進行預熱,因此它會非常迅速,但是由於打開的是完成干凈的Searcher,所以一點緩存信息也沒有,比較影響一開始的查詢性能。
1  <useColdSearcher>false</useColdSearcher>
  • 最后講下注冊,注冊其實是將新建的Searcher寫到一個map結構的變量中private final Map<String, SolrInfoMBean> infoRegistry的過程
 1   public void register() {
 2     // register self
 3     core.getInfoRegistry().put("searcher", this);
 4     core.getInfoRegistry().put(name, this);
 5     for (SolrCache cache : cacheList) {
 6       cache.setState(SolrCache.State.LIVE);
 7       core.getInfoRegistry().put(cache.name(), cache);
 8     }
 9     registerTime=System.currentTimeMillis();
10   }
  • 最后再簡單介紹下openNewSearcher,顧名思義該函數就是重新打開新的Searcher。主要代碼如下,本質上就是new一個SolrIndexSearcher,只不過會根據是否是近實時模式(nrtmode),是否已有打開的Searcher(判斷是否是啟動時候打開的searcher),以及是否需要快速打開Searcher(若快速打開Searcher則過濾掉預熱過程,在前文中講到DirectUpdateHandler2的commit過程中也調用了RefCounted<SolrIndexSearcher> searchHolder = core.openNewSearcher(true, true);這里由於設置了true表示需要快速打開,所以是cold模式的searcher。)
1 tmp = new SolrIndexSearcher(this, newIndexDir, getLatestSchema(), getSolrConfig().indexConfig, 
2               (realtime ? "realtime":"main"), newReader, true, !realtime, true, directoryFactory);

 

總結:

      本節學習了SolrCloud的commit三步過程,重點介紹了DirectUpdateHandler2的commit和getSearcher的過程,篇幅有限並未深入學習Lucene的commit原理。同時本節提到了Warn預熱的內容,那么下節開始將學習下SolrCloud的緩存機制。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM