Solr In Action 筆記(4) 之 SolrCloud分布式索引基礎


Solr In Action 筆記(4) 之 SolrCloud Index 基礎

     SolrCloud Index流程研究了兩天,還是沒有完全搞懂,先簡單記下基礎的知識,過幾天再寫個深入點的。先補充上前文來不及寫的內容。

1. Solr.xml的重要配置

     Solr.xml的內容如下:

 1 <solr>
 2   <solrcloud>
 3     <str name="host">${host:}</str>
 4     <int name="hostPort">${jetty.port:8983}</int>
 5     <str name="hostContext">${hostContext:solr}</str>
 6     <int name="zkClientTimeout">${zkClientTimeout:15000}</int>
 7     <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
 8   </solrcloud>
 9   <shardHandlerFactory name="shardHandlerFactory"
10     class="HttpShardHandlerFactory">
11     <int name="socketTimeout">${socketTimeout:0}</int>
12     <int name="connTimeout">${connTimeout:0}</int>
13   </shardHandlerFactory>
14 </solr>
  • host , host 指的是Solr節點的IP地址,當Solr節點上線時候,它會向Zookeeper進行注冊,注冊信息如IP地址就會存儲在/clusterstate.json中。這里不但可以直接使用host IP地址如192.168.1.0,也可以使用機器的hostname比如bigdata01。
  • port , port 指的時Solr用來監聽的端口,默認是8983,同樣它會存儲在/clusterstate.json中。
  • Solr Host Context, 指的是Solr.war部署的環境路徑,多數情況下不用修改。
  • zookeeper client timeout,上一節講到過,zookeeper Znode節點變化最大反應時間。
  • core node name, 該節點控制Solr core的命名策略,如果genericCoreNodeNames為true,那么Solr會給core取普通的名字比如,core_node1 ;如果設為true,則會給core取容易辨別的名字,比如帶上host信息,比如10.0.1.7:8983_solr_logmill
  • Leader Vote Wait Period:

     該參數並未直接在solr.xml中列出來,SolrCloud的leader和其他replica下線只剩最后一個replica的時候,這個Replica並不會立馬選舉leader,他會等待一段時間,查看leader是否上線,如果上線了,那么leader仍然還是leader,replica仍然還是replica,如果在這個時間段外leader沒有上線,那么replica就變為leader了。這個時間就是Leader Vote Wait Period,它的存在防止了當leader和其他replica下線時候,具有舊的數據的node選為leader。

     比如以下一個例子,一個shard有兩個node,X為leader,Y為replica,如果X在線,Y下線,那么X仍然可以接受update請求,SolrCloud仍然繼續正常運行,只不過leader X不需要再把數據分發給Y了,Y上線后X只需要簡單將數據同步給Y就行(Peer sync 策略)。如果X下線,Y在線,那么這個時候因為沒有leader接受update請求以及沒有leader轉發數據,Y是不會接收到update請求的,所以這個時候的SolrCloud的所以建立是無法進行的,所以一旦X掛了SolrCloud就會進行leader選舉,但是我們不能立馬讓Y變為leader,因為Y的數據相比較X來說是舊的數據。如果Y選舉為Leader了,那么后續的update他就會接受,過段時間X上線了,由於Y已經是leader了所以X只能是replica,數據的流向變成了Y轉發到X,這個時候就發現了奇怪的現象就是X中有部分數據新於Y(Y當選為leader前的數據),Y中有部分數據也新於X(Y當選為leader后的數據),這個時候就需要啟動Snapshot replication 策略進行數據復原了,比較麻煩。如果設置了leaderVoteWait 那么X下線后,Y會等待leaderVoteWait時間,這個時間內update操作都是失敗的,如果在這時間內X上線了,那么X立馬恢復leader狀態繼續工作,否則就會Y就會變成leader。

     要改善這種情況,可以增加shard和replica的數量,較少leader和replica同時掛掉的可能性。

  • zkHost,同樣沒有出現在上面的solr.xml上,它可以在solr.xml的zkHost配置中設置zookeepr集群信息比如192.168.0.1:2181,192.168.0.2:2181表示兩個zookeeper組成一個zookeeper集群。

2. SolrCloud的分布式建索引

2.1 Document的Hash

      建好的SolrCloud集群每一個shard都會有一個Hash區間,當Document進行update的時候,SolrCloud就會計算這個Document的Hash值,然后根據該值和shard的hash區間來判斷這個document應該發往哪個shard,所以首先讓我們先來學習下SolrCloud的hash算法。Solr使用document route組件來進行document的分發。目前Solr有兩個DocRouter類的子類CompositeIdRouter(Solr默認采用的)類和ImplicitDocRouter類,當然我們也可以通過繼承DocRouter來定制化我們的document route組件。

     之前我們學習過,當Solr Shard建立時候,Solr會給每一個shard分配32bit的hash值的區間,比如SolrCloud有兩個shard分別為A,B,那么A的hash值區間就為 80000000-ffffffff ,B的hash值區間為0-7fffffff  。默認的CompositeIdRouter hash策略會根據document ID計算出唯一的Hash值,並判斷該值在那個shard的hash區間內。

     SolrCloud對於Hash值的獲取提出了以下幾個要求:

  • hash計算速度必須快,因為hash計算是分布式建索引的第一步,SolrCloud不可能在這一不上花很多時間。
  • hash值必須能均勻的分布於每一個shard,如果有一個shard的document數量大於另一個shard,那么在查詢的時候前一個shard所花的時間就會大於后一個,SolrCloud的查詢是先分后匯總的過程,也就是說最后每一個shard查詢完畢才算完畢,所以SolrCloud的查詢速度是由最慢的shard的查詢速度決定的。我們有理由讓SolrCloud做好充分的負載均衡。

     基於以上兩點,SolrCloud采用了MurmurHash 算法,那么讓我們先來看下該算法的代碼,說實話這個代碼我真沒看懂,等下次獨立寫個章節學習下MurmurHash算法吧。

  1 /** Returns the MurmurHash3_x86_32 hash of the UTF-8 bytes of the String without actually encoding
  2    * the string to a temporary buffer.  This is more than 2x faster than hashing the result
  3    * of String.getBytes().
  4    */
  5   public static int murmurhash3_x86_32(CharSequence data, int offset, int len, int seed) {
  6 
  7     final int c1 = 0xcc9e2d51;
  8     final int c2 = 0x1b873593;
  9 
 10     int h1 = seed;
 11 
 12     int pos = offset;
 13     int end = offset + len;
 14     int k1 = 0;
 15     int k2 = 0;
 16     int shift = 0;
 17     int bits = 0;
 18     int nBytes = 0;   // length in UTF8 bytes
 19 
 20 
 21     while (pos < end) {
 22       int code = data.charAt(pos++);
 23       if (code < 0x80) {
 24         k2 = code;
 25         bits = 8;
 26 
 27         /***
 28          // optimized ascii implementation (currently slower!!! code size?)
 29          if (shift == 24) {
 30          k1 = k1 | (code << 24);
 31 
 32          k1 *= c1;
 33          k1 = (k1 << 15) | (k1 >>> 17);  // ROTL32(k1,15);
 34          k1 *= c2;
 35 
 36          h1 ^= k1;
 37          h1 = (h1 << 13) | (h1 >>> 19);  // ROTL32(h1,13);
 38          h1 = h1*5+0xe6546b64;
 39 
 40          shift = 0;
 41          nBytes += 4;
 42          k1 = 0;
 43          } else {
 44          k1 |= code << shift;
 45          shift += 8;
 46          }
 47          continue;
 48          ***/
 49 
 50       }
 51       else if (code < 0x800) {
 52         k2 = (0xC0 | (code >> 6))
 53             | ((0x80 | (code & 0x3F)) << 8);
 54         bits = 16;
 55       }
 56       else if (code < 0xD800 || code > 0xDFFF || pos>=end) {
 57         // we check for pos>=end to encode an unpaired surrogate as 3 bytes.
 58         k2 = (0xE0 | (code >> 12))
 59             | ((0x80 | ((code >> 6) & 0x3F)) << 8)
 60             | ((0x80 | (code & 0x3F)) << 16);
 61         bits = 24;
 62       } else {
 63         // surrogate pair
 64         // int utf32 = pos < end ? (int) data.charAt(pos++) : 0;
 65         int utf32 = (int) data.charAt(pos++);
 66         utf32 = ((code - 0xD7C0) << 10) + (utf32 & 0x3FF);
 67         k2 = (0xff & (0xF0 | (utf32 >> 18)))
 68             | ((0x80 | ((utf32 >> 12) & 0x3F))) << 8
 69             | ((0x80 | ((utf32 >> 6) & 0x3F))) << 16
 70             |  (0x80 | (utf32 & 0x3F)) << 24;
 71         bits = 32;
 72       }
 73 
 74 
 75       k1 |= k2 << shift;
 76 
 77       // int used_bits = 32 - shift;  // how many bits of k2 were used in k1.
 78       // int unused_bits = bits - used_bits; //  (bits-(32-shift)) == bits+shift-32  == bits-newshift
 79 
 80       shift += bits;
 81       if (shift >= 32) {
 82         // mix after we have a complete word
 83 
 84         k1 *= c1;
 85         k1 = (k1 << 15) | (k1 >>> 17);  // ROTL32(k1,15);
 86         k1 *= c2;
 87 
 88         h1 ^= k1;
 89         h1 = (h1 << 13) | (h1 >>> 19);  // ROTL32(h1,13);
 90         h1 = h1*5+0xe6546b64;
 91 
 92         shift -= 32;
 93         // unfortunately, java won't let you shift 32 bits off, so we need to check for 0
 94         if (shift != 0) {
 95           k1 = k2 >>> (bits-shift);   // bits used == bits - newshift
 96         } else {
 97           k1 = 0;
 98         }
 99         nBytes += 4;
100       }
101 
102     } // inner
103 
104     // handle tail
105     if (shift > 0) {
106       nBytes += shift >> 3;
107       k1 *= c1;
108       k1 = (k1 << 15) | (k1 >>> 17);  // ROTL32(k1,15);
109       k1 *= c2;
110       h1 ^= k1;
111     }
112 
113     // finalization
114     h1 ^= nBytes;
115 
116     // fmix(h1);
117     h1 ^= h1 >>> 16;
118     h1 *= 0x85ebca6b;
119     h1 ^= h1 >>> 13;
120     h1 *= 0xc2b2ae35;
121     h1 ^= h1 >>> 16;
122 
123     return h1;
124   }

      最后我們再簡單地學習下hash計算的源碼吧:

  • SolrCloud 利用CompositeIdRouter.sliceHash來計算document的hash
 1   public int sliceHash(String id, SolrInputDocument doc, SolrParams params, DocCollection collection) {
 2     String shardFieldName = getRouteField(collection);
 3     if (shardFieldName != null && doc != null) {
 4       Object o = doc.getFieldValue(shardFieldName);
 5       if (o == null)
 6         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No value for :" + shardFieldName + ". Unable to identify shard");
 7       id = o.toString();
 8     }
 9     if (id.indexOf(SEPARATOR) < 0) {
10       return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
11     }
12 
13     return new KeyParser(id).getHash();
14   }
  • 根據計算出來的hash值計算應該將document發往哪些節點
 1 public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
 2     if (shardKey == null) {
 3       // search across whole collection
 4       // TODO: this may need modification in the future when shard splitting could cause an overlap
 5       return collection.getActiveSlices();
 6     }
 7     String id = shardKey;
 8 
 9     if (shardKey.indexOf(SEPARATOR) < 0) {
10       // shardKey is a simple id, so don't do a range
11       return Collections.singletonList(hashToSlice(Hash.murmurhash3_x86_32(id, 0, id.length(), 0), collection));
12     }
13 
14     Range completeRange = new KeyParser(id).getRange();
15 
16     List<Slice> targetSlices = new ArrayList<>(1);
17     for (Slice slice : collection.getActiveSlices()) {
18       Range range = slice.getRange();
19       if (range != null && range.overlaps(completeRange)) {
20         targetSlices.add(slice);
21       }
22     }
23 
24     return targetSlices;
25   }
  • 最后,看下SolrCloud是怎么划分shard的hash值區間的。以下代碼需要注意幾點,
    • boolean round = rangeStep >= (1 << bits) * 16  判斷shard個數是否小於4096個,如果round為true,肯定小於4096個,也就是每個shard的區間長度大於 (1 << bits) * 16

    • int mask = 0x0000ffffend & mask != mask 表示判斷shard個數不是2的指數次,如果shard個數是2的指數次那么shard的區間肯定是mask的整數倍,也就是說end & mask后最后的16位全為1即0xffff

    • end - roundDown < roundUp - end ;當shard個數不是2的指數次時,end離哪個邊界近就設置為哪個邊界(這里的邊界是0x0000ffff的整數倍)。

    • 從上面得知,shard的區間得滿足0x0000ffff的整數倍
 1 public List<Range> partitionRange(int partitions, Range range) {
 2     int min = range.min;
 3     int max = range.max;
 4 
 5     assert max >= min;
 6     if (partitions == 0) return Collections.EMPTY_LIST;
 7     long rangeSize = (long) max - (long) min;
 8     long rangeStep = Math.max(1, rangeSize / partitions);
 9 
10     List<Range> ranges = new ArrayList<>(partitions);
11 
12     long start = min;
13     long end = start;
14 
15     // keep track of the idealized target to avoid accumulating rounding errors
16     long targetStart = min;
17     long targetEnd = targetStart;
18 
19     // Round to avoid splitting hash domains across ranges if such rounding is not significant.
20     // With default bits==16, one would need to create more than 4000 shards before this
21     // becomes false by default.
22     int mask = 0x0000ffff;
23     boolean round = rangeStep >= (1 << bits) * 16;
24 
25     while (end < max) {
26       targetEnd = targetStart + rangeStep;
27       end = targetEnd;
28 
29       if (round && ((end & mask) != mask)) {
30         // round up or down?
31         int increment = 1 << bits;  // 0x00010000
32         long roundDown = (end | mask) - increment;
33         long roundUp = (end | mask) + increment;
34         if (end - roundDown < roundUp - end && roundDown > start) {
35           end = roundDown;
36         } else {
37           end = roundUp;
38         }
39       }
40 
41       // make last range always end exactly on MAX_VALUE
42       if (ranges.size() == partitions - 1) {
43         end = max;
44       }
45       ranges.add(new Range((int) start, (int) end));
46       start = end + 1L;
47       targetStart = targetEnd + 1L;
48     }
49 
50     return ranges;
51   }

2.2 ADD Document過程

        SolrCloud進行update/add document的過程是采用的索引鏈的方式,暫時我還沒看懂,所以本節先不講代碼,大致學習原理以及過程,下節再開一章講述下Add document的過程。整個過程我們從Solrj客戶端講起。

  • 當SolrJ 發送update請求給CloudSolrServer ,CloudSolrServer會連接至Zookeeper獲取當前SolrCloud的集群狀態,並會在/clusterstate.json 和/live_nodes 注冊watcher,便於監視Zookeeper和SolrCloud,這樣做的好處有以下幾點:
    • CloudSolrServer獲取到SolrCloud的狀態后,它能跟直接將document發往SolrCloud的leader,降低網絡轉發消耗。
    • 注冊watcher有利於建索引時候的負載均衡,比如如果有個節點leader下線了,那么CloudSolrServer會立馬得知,那它就會停止往下線leader發送document。
  • 路由document至正確的shard。CloudSolrServer 在發送document時候需要知道發往哪個shard,這就是上一小節2.1講過的內容,但是這里需要注意,單個document的路由非常簡單,但是SolrCloud支持批量add,也就是說正常情況下N個document同時進行路由。這個時候SolrCloud就會根據document路由的去向分開存放document即進行分類,然后進行並發發送至相應的shard,這就需要較高的並發能力。
  • Leader接受到update請求后,先將update信息存放到本地的update log,同時Leader還會給documrnt分配新的version,對於已存在的document,Leader就會驗證分配的新version與已有的version,如果新的版本高就會拋棄舊版本,最后發送至replica。
  • 一旦document經過驗證以及加入version后,就會並行的被轉發至所有上線的replica。SolrCloud並不會關注那些已經下線的replica,因為當他們上線時候會有recovery進程對他們進行恢復。如果轉發的replica處於recovering狀態,那么這個replica就會把update放入update transaction 日志。
  • 當leader接受到所有的replica的反饋成功后,它才會反饋客戶端成功。只要shard中又一個replica是active的,Solr就會繼續接受update請求。這一策略其實是犧牲了一致性換取了寫入的有效性。之前我們講到leaderVoteWait參數,它表示當只有一個replica時候,這個replica會進入recovering狀態並持續一段時間等待leader的重新上線。那么如果在這段時間內leader沒有上線,那么他就會轉成leader會有一些document丟失。(這里我有點不明白,既然leader掛了,難道update 請求還會發送成功?如果成功是發往哪的?) 當然后續會有方法來避免這個情況,比如使用majority quorum 策略,跟Zookeeper的leader選舉策略一樣,比如當多數的replica下線了,那么客戶端的write就會失敗。
  • 最后的步驟就是commit了,commit有兩種,一種是softcommit,即在內存中生成segment,document是可見的(可查詢到)但是沒有寫入磁盤,斷電后數據丟失。另一種是hardcommit,直接將數據寫入磁盤且數據可見。前一種消耗較少,后一種消耗較大。

   

    大致講了下SolrCloud的index流程,不是很細致。最后總結以下幾點:

  • leader轉發的規則
    • 請求來自leader轉發:FROMLEADER,那么就只需要寫到本地ulog,不需要轉發給leader,也不需要轉發給其它replicas。如果replica處於非actibe狀態中,就會講update請求接受並寫入ulog,但不會寫入索引。如果發現重復的更新就會丟棄舊版本的更新。
    • 請求不是來自leader,但自己就是leader,那么就需要將請求寫到本地,順便分發給其他的replicas.
    • 請求不是來自leader,但自己又不是leader,也就是該更新請求是最原始的更新請求,那么需要將請求寫到本地ulog,順便轉發給leader,再由leader分發
    • 每commit一次,就會重新生成一個ulog更新日志,當服務器掛掉,內存數據丟失,就可以從ulog中恢復
  • 建索引時候最好使用CloudSolrServer,直接向leader發送update請求避免網絡開銷
  • 批量add document時候,建議在客戶端提前做好document的路由,在SolrCloud內進行document開銷較大

 

 

2.3 NRT 近實時搜索

     SolrCloud支持近實時搜索,所謂的近實時搜索即在較短的時間內使得add的document可見可查,這主要基於softcommit機制(Lucene是沒有softcommit的,只有hardcommit)。

     當進行SoftCommit時候,Solr會打開新的Searcher從而使得新的document可見,同時Solr還會進行預熱緩存以及查詢以使得緩存的數據也是可見的。所以必須保證預熱緩存以及預熱查詢的執行時間必須短於commit的頻率,否則就會由於打開太多的searcher而造成commit失敗。

     最后說說在工作中近實時搜索的感受吧,近實時搜索是相對的,對於有些客戶1分鍾就是近實時了,有些3分鍾就是近實時了。而對於Solr來說,softcommit越頻繁實時性更高,而softcommit越頻繁則Solr的負荷越大(commit越頻率越會生成小且多的segment,於是merge出現的更頻繁)。目前我們公司的softcommit頻率是3分鍾,之前設置過1分鍾而使得Solr在Index所占資源過多大大影響了查詢。所以近實時蠻困擾着我們的,因為客戶會不停的要求你更加實時,目前公司采用加入緩存機制來彌補這個實時性。

 

2.4 Node recovery process 

    Node recovery process 是SolrCloud容災能力的重要體現,也是我最近研究的重點之一,目前還沒得及深入研究,所以一樣先看下概念吧。

    SolrCloud支持兩種recovery策略,Peer sync 和 Snapshot replication ,分別對應類PeerSync和Snapshot ,他們兩者是根據節點下線時丟失的update 請求的數量進行區分的。

  • Peer sync, 如果中斷的時間較短,recovering node只是丟失少量update請求,那么它可以從leader的update log中獲取。這個臨界值是100個update請求,如果大於100,就會從leader進行完整的索引快照恢復。
  • Snapshot replication, 如果節點下線太久以至於不能從leader那進行同步,它就會使用solr的基於http進行索引的快照恢復
  • 當你加入新的replica到shard中,它就會進行一個完整的index Snapshot。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM