Introduction to Elasticsearch
Elasticsearch is a real-time, distributed search engine built on Apache Lucene. It has the following strengths:
1. Real-time processing of large-scale data. 2. Full-text search, with support for structured search and aggregation/analytics. 3. A distributed architecture.
These strengths translate into the following typical use cases:
1. Site search. 2. A NoSQL JSON document store, with read and write performance often reported to exceed MongoDB's (though this depends heavily on the workload). 3. Log platforms for statistics, monitoring, and analysis.
Basic Elasticsearch Concepts
- Node: a physical concept; a running Elasticsearch instance, usually one process on one machine.
- Index: a logical concept, comprising the mapping configuration and the inverted-index data files. An index's data files may live on a single machine or be spread across many machines.
- Shard: to support larger data volumes, an index is usually split along some dimension into multiple parts, and each part is a shard. Shards are managed by nodes. A node typically manages several shards, which may belong to the same index or to different indices; for reliability and availability, shards of the same index are placed on different nodes whenever possible. There are two kinds of shards: primary shards and replica shards.
- Replica Shard: a backup copy of a shard's data. A shard may have zero or more replicas, and the data across these copies is kept strongly or eventually consistent.
The figure below shows how shards are distributed:

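As a concrete illustration, shard and replica counts are set per index at creation time. A minimal sketch using the official elasticsearch-py client (the index name, the counts, and a local cluster are assumptions for illustration; the 7.x-style body= form is assumed):

from elasticsearch import Elasticsearch

# Assumes a cluster reachable on localhost; names and counts are illustrative.
es = Elasticsearch("http://localhost:9200")
es.indices.create(
    index="my_index",
    body={
        "settings": {
            "number_of_shards": 3,     # primary shards; fixed at creation time
            "number_of_replicas": 1,   # replicas per primary; can be changed later
        }
    },
)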
Node Types
An Elasticsearch instance is a node, and a group of nodes forms a cluster. Nodes in an Elasticsearch cluster can be configured in 3 different roles:
- Master node: controls the Elasticsearch cluster and is responsible for cluster-wide operations such as creating or deleting an index, tracking which nodes belong to the cluster, and allocating shards to nodes. The master maintains the cluster state, broadcasts it to the other nodes, and collects their acknowledgements.
Any node can become master-eligible by setting the node.master property in its elasticsearch.yml configuration file to true (the default).
For large production clusters, a dedicated master node that handles no user requests is recommended; there is only ever one active master, which controls and coordinates the cluster.
- Data Node: holds the data and the inverted indexes. By default, every node is data-eligible: the node.data property in elasticsearch.yml is set to true. If you run a dedicated master node, set its node.data property to false.
- Client node (Transport Node): if both node.master and node.data are set to false, the node becomes a client node. It acts as a load balancer, routing incoming requests to the appropriate nodes in the cluster.
The node through which a client connects to an Elasticsearch cluster is called the coordinating node. The coordinating node routes client requests to the appropriate shards in the cluster. For read requests, it selects a different shard copy on each request to spread the load.
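Putting the three roles side by side, the legacy elasticsearch.yml settings for each dedicated role might look like the following sketch (values are illustrative; newer Elasticsearch versions configure roles through node.roles instead):

# Dedicated master node (elasticsearch.yml)
node.master: true
node.data: false

# Dedicated data node
node.master: false
node.data: true

# Client (coordinating-only) node
node.master: false
node.data: false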
Node Deployment Options

Elasticsearch supports the two deployment modes shown above:
Option 1: mixed deployment (left figure). Leaving the master nodes aside, there are two other roles, Data Node and Transport Node. In this mode both roles live on every node, so a single node performs both functions: data and transport.
When an index or query request arrives, it is sent to an arbitrary (or custom-chosen) node. That node holds a global routing table, uses it to select the appropriate nodes, forwards the request to them, waits for all of them to respond, merges the results, and returns them to the user. One node plays both roles.
The benefit is that this mode is extremely simple and easy to pick up, which matters when promoting adoption of the system. In the simplest case, starting a single node provides all of the functionality.
The drawbacks are: 1. Different request types interfere with each other: in a large cluster, a hotspot on one Data Node affects every cross-node request that passes through it, and the blast radius of a failure grows considerably.
2. Every node must keep a connection to every other node, and since the number of connections a system can hold is bounded, the connection count limits the size of the cluster.
3. Hot (rolling) upgrades of the cluster are not supported.
Option 2: tiered deployment (right figure). The roles are isolated through configuration: some nodes are set up as Transport Nodes, dedicated to request forwarding and result merging, while the rest are set up as Data Nodes, dedicated to data processing.
The drawback is that it is harder to get started: the number of Transport Nodes must be planned in advance and matched to the number of Data Nodes, the expected traffic, and so on; otherwise resources either sit idle or machines get overwhelmed.
The benefits are: 1. The roles are independent and do not affect each other. Traffic is usually spread evenly across Transport Nodes, so a single machine's CPU or bandwidth is rarely saturated, whereas Data Nodes, which process the data, can easily exhaust single-machine resources such as CPU, network, or disk. With the roles separated, a Data Node failure affects only that node's data processing and not requests on other nodes, confining the impact to the smallest possible scope.
2. With the roles separated, only the Transport Nodes need to connect to all Data Nodes, and Data Nodes need no connections to one another. Since a cluster usually has far more Data Nodes than Transport Nodes, the cluster can scale much larger. Moreover, by grouping Data Nodes so that each Transport Node connects only to a fixed group, the connection-count problem is solved completely.
3. Hot upgrades become possible: upgrade the Data Nodes one at a time, and once they are done upgrade the Transport Nodes; the entire process can be invisible to users.
Elasticsearch Data Layer Architecture
Data Storage
Elasticsearch currently stores its index data and metadata on the local filesystem and supports several store types, such as niofs, mmapfs, simplefs, and smb; the best-performing is mmapfs, which maps the index directly into memory. By default Elasticsearch chooses the store type automatically, and it can also be set explicitly in the configuration file. There are several subtleties here; see the official documentation for details.
Keeping the index and metadata only on local disk creates a problem: when a machine crashes or a disk fails, that data is lost. The Replica feature exists to solve this.
Replica
Every index has a configurable setting: the replica count. If the replica count is set to 2, each shard has 3 copies: one primary shard and two replica shards. The master schedules these three copies onto different machines, and ideally different racks, whenever possible; they hold the same data and offer the same serving capacity. Replicas serve three purposes:
- Service availability: with multiple replicas, if one copy becomes unavailable, traffic can keep flowing to the other copies and service recovers quickly.
- Data durability: with only a primary and no replicas, a disk failure on the primary's machine loses the data of every shard on that node, and the only remedy is to reindex.
- Greater query capacity: when a shard's query capacity no longer meets the business need, adding N replicas raises query capacity roughly N-fold, an easy way to increase the system's concurrency. (A sketch of changing the replica count follows below.)
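Unlike the primary-shard count, which is fixed at index creation, the replica count can be changed on a live index. A minimal sketch with the official elasticsearch-py client (the index name and a local cluster are assumptions for illustration):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
# Raise the replica count on a live index (names and values illustrative).
es.indices.put_settings(
    index="my_index",
    body={"index": {"number_of_replicas": 2}},
)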
Storage Model
Elasticsearch is built on Apache Lucene, the full-text search library that Doug Cutting (the father of Apache Hadoop) wrote in Java. Internally, Lucene uses a data structure called an inverted index, designed to serve full-text search results with low latency. The document is Elasticsearch's unit of data; tokenizing the terms in each document, building a sorted list of the unique terms, and associating each term with the list of positions where it occurs in the documents produces an inverted index.
This closely resembles the index at the back of a book, which associates each word in the book with the list of pages on which it appears. When we say a document has been indexed, we are referring to the inverted index. Let's look at how the following 2 documents are inverted-indexed:
Document 1 (Doc 1): Insight Data Engineering Fellows Program
Document 2 (Doc 2): Insight Data Science Fellows Program

If we want to find the documents containing the term "insight", we can scan this (alphabetically sorted) inverted index, locate "insight", and return the IDs of the documents that contain it, here Doc 1 and Doc 2.
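The structure is simple to reproduce. A toy, self-contained Python sketch (not Elasticsearch code) that builds an inverted index over the two documents above:

from collections import defaultdict

docs = {
    "Doc 1": "Insight Data Engineering Fellows Program",
    "Doc 2": "Insight Data Science Fellows Program",
}

inverted = defaultdict(set)
for doc_id, text in docs.items():
    for term in text.lower().split():   # tokenize and lowercase (see analysis below)
        inverted[term].add(doc_id)

print(sorted(inverted["insight"]))      # ['Doc 1', 'Doc 2']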
To improve searchability (for example, so that both lowercase and uppercase forms of a word match), documents should be analyzed first and then indexed. Analysis consists of 2 parts:
- Tokenizing sentences into individual words
- Normalizing the words into a standard form
By default, Elasticsearch uses the standard analyzer, which applies:
- The standard tokenizer, which splits text on word boundaries
- The lowercase token filter, which lowercases each token
Many other analyzers are available and are not listed here; see the relevant documentation. Similarity is computed with TF-IDF.
To get the expected matches at query time, queries should be analyzed with the same analyzer that was used at index time.
Note: the standard analyzer includes a stop-word filter, but it is disabled by default.
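The standard analyzer's tokenizer and lowercase filter can be observed directly through the _analyze API. A minimal sketch with the official elasticsearch-py client (a local cluster and the 7.x-style body= form are assumed):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
resp = es.indices.analyze(
    body={"analyzer": "standard", "text": "Insight Data Engineering Fellows Program"}
)
print([t["token"] for t in resp["tokens"]])
# ['insight', 'data', 'engineering', 'fellows', 'program']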
Anatomy of a Write Operation
When we send a request to index a new document to the coordinating node, the following sequence of operations takes place:
- Every node in the Elasticsearch cluster holds metadata about which shards reside on that node. The coordinating node (by default) uses the document ID to pick the right shard for routing: Elasticsearch hashes the document ID with the MurmurHash3 function and takes the result modulo the number of primary shards; the result is the shard on which the document will be indexed. (A small Python sketch of this routing appears after the figure below.)
shard = hash(document_id) % (num_of_primary_shards)
- When the node holding the shard receives the request from the coordinating node, it writes the request to the translog (covered later in this series) and adds the document to the in-memory buffer. If the request succeeds on the primary shard, it is sent in parallel to the shard's replicas. The client receives its acknowledgement only after the translog has been fsync'ed on the primary shard and all of its replicas.
- The in-memory buffer is refreshed periodically (every 1 second by default), and its contents are written to a new segment in the filesystem cache. Although the segment has not yet been fsync'ed, it is open and its contents are searchable.
- Every 30 minutes, or when the translog grows too large, the translog is cleared and the filesystem cache is fsync'ed. In Elasticsearch this process is called a flush. During a flush, the in-memory buffer is cleared and its contents are written to a new segment. Fsync'ing the segments creates a new commit point and flushes the contents to disk. The old translog is deleted and a new translog is started.
The figure below shows a write request and its data flow.

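The routing formula above can be sketched in Python. The third-party mmh3 package stands in for MurmurHash3 here; Elasticsearch's actual hashing of the routing value differs in detail, so treat this as an approximation of the idea, not the exact implementation:

import mmh3  # pip install mmh3; stand-in for Elasticsearch's Murmur3 routing hash

def route(document_id: str, num_primary_shards: int) -> int:
    # Python's % always returns a nonnegative result for a positive modulus,
    # so the signed 32-bit hash still maps to a valid shard number.
    return mmh3.hash(document_id) % num_primary_shards

print(route("my-doc-42", 3))  # a shard number in [0, 3)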
Updates ((U)pdate) and Deletes ((D)elete)
Deletes and updates are also write operations. However, documents in Elasticsearch are immutable, so they cannot be removed or modified in place to reflect changes. How, then, are documents deleted and updated?
Every segment on disk has a corresponding .del file. When a delete request is issued, the document is not actually removed: it is marked as deleted in the .del file. It can still match queries, but it is filtered out of the results. When segments are merged (covered later in this series), documents marked as deleted in the .del files are not written into the new segment.
Now let's look at how updates work. When a new document is created, Elasticsearch assigns it a version number. On each update, the old version of the document is marked as deleted in the .del file and the new version is indexed into a new segment. The old version can still match queries, but it is filtered out of the results.
Once documents have been indexed or updated, we can run queries. Let's see how Elasticsearch handles a query request.
Anatomy of a Read Operation ((R)ead)
A read operation has 2 parts:
- The query phase
- The fetch phase
Let's see how each phase works.
Query Phase
In this phase, the coordinating node routes the query to every shard of the index (either a primary or one of its replicas). Each shard executes the query independently and builds a priority queue of results sorted by relevance score (covered later in this series). All shards return the IDs of their matching documents along with the relevance scores to the coordinating node, which builds its own priority queue and sorts the results globally. There may be a great many matching documents but, by default, each shard sends only its top 10 results to the coordinating node, which builds a priority queue over these per-shard results and returns the top 10 as the hits. (A toy sketch of this merge appears after the figure below.)
Fetch Phase
Once the coordinating node has put the global list of documents in order, it issues requests to the shards holding the original documents. Those shards fill in the document contents and return them to the coordinating node.
The figure below shows a read request and its data flow.

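The coordinating node's global sort can be pictured as a merge of the per-shard top-10 lists. A toy, self-contained Python sketch (scores and document IDs are invented for illustration):

import heapq

# Each shard returns (score, doc_id) pairs, already sorted by descending score.
shard_results = [
    [(3.2, "d7"), (2.9, "d1"), (1.4, "d9")],
    [(4.1, "d3"), (2.2, "d5"), (0.8, "d2")],
]

# Merge all per-shard results and keep the global top 10 by score.
top_hits = heapq.nlargest(10, (hit for shard in shard_results for hit in shard))
print(top_hits)  # globally sorted by descending relevance score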
As noted above, query results are sorted by relevance. Next, let's look at how relevance is defined.
Search Relevance
Relevance is determined by the score that Elasticsearch gives each document in the search results. The default scoring algorithm is tf/idf (term frequency/inverse document frequency). Term frequency measures how often a term appears in a document (higher frequency == higher relevance); inverse document frequency measures how often the term appears across the whole index, as a percentage of the total number of documents in the index (higher frequency == lower relevance). The final score combines the tf-idf score with other factors, such as term proximity (for phrase queries) and term similarity (for fuzzy queries).
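To make the intuition concrete, here is a toy, self-contained Python computation of tf-idf over the two example documents from earlier. This is the classic textbook formula, not Lucene's exact scoring function:

import math

docs = [
    "insight data engineering fellows program",
    "insight data science fellows program",
]

def tf_idf(term: str, doc: str, corpus: list) -> float:
    tokens = doc.split()
    tf = tokens.count(term) / len(tokens)             # term frequency in this doc
    df = sum(1 for d in corpus if term in d.split())  # docs containing the term
    idf = math.log(len(corpus) / df)                  # inverse document frequency
    return tf * idf

print(tf_idf("insight", docs[0], docs))      # 0.0 -- appears in every doc, no signal
print(tf_idf("engineering", docs[0], docs))  # > 0 -- rarer term scores higher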
Elasticsearch's three Cs (Consensus, Concurrency, Consistency)
Consensus
Consensus is one of the fundamental challenges of a distributed system. It requires all the processes/nodes in the system to agree on a given data value/status. There are a lot of consensus algorithms like Raft, Paxos, etc., which are mathematically proven to work; however, Elasticsearch has implemented its own consensus system (zen discovery) for reasons described here by Shay Banon (the creator of Elasticsearch). The zen discovery module has two parts:
- Ping: The process nodes use to discover each other.
- Unicast: The module that contains a list of hostnames to control which nodes to ping.
Elasticsearch is a peer-to-peer system where all nodes communicate with each other and there is one active master which updates and controls the cluster wide state and operations. A new Elasticsearch cluster undergoes an election as part of the ping process where a node, out of all master eligible nodes, is elected as the master and other nodes join the master. The default ping_interval is 1 sec and ping_timeout is 3 sec. As nodes join, they send a join request to the master with a default join_timeout which is 20 times the ping_timeout. If the master fails, the nodes in the cluster start pinging again to start another election. This ping process also helps if a node accidentally thinks that the master has failed and discovers the master through other nodes.
For fault detection, the master node pings all the other nodes to check if they are alive and all the nodes ping the master back to report that they are alive.
If used with the default settings, Elasticsearch suffers from the split-brain problem: in case of a network partition, a node can decide that the master is dead and elect itself as master, resulting in a cluster with multiple masters. This may result in data loss, and it may not be possible to merge the data correctly. This can be avoided by setting the following property to a quorum of master-eligible nodes.
discovery.zen.minimum_master_nodes = int(# of master eligible nodes/2)+1

NOTE: For a production cluster, it is recommended to have 3 dedicated master nodes, which do not serve any client requests, out of which 1 is active at any given time.
As we have learned about consensus in Elasticsearch, let’s now see how it deals with concurrency.
Concurrency
Elasticsearch is a distributed system and supports concurrent requests. When a create/update/delete request hits the primary shard, it is sent in parallel to the replica shard(s) as well; however, it is possible for these requests to arrive out of order. In such cases, Elasticsearch uses optimistic concurrency control to make sure that newer versions of the document are not overwritten by older versions.
Every document indexed has a version number which is incremented with every change applied to that document. These version numbers are used to make sure that the changes are applied in order. To make sure that updates in our application don’t result in data loss, Elasticsearch’s API allows you to specify the current version number of a document to which the changes should be applied. If the version specified in the request is older than the one present in the shard, the request fails, which means that the document has been updated by another process. How failed requests are handled can be controlled at the application level. There are also other locking options available and you can read about them here.
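A minimal sketch of version-based optimistic locking with the Python client. This assumes a pre-7.x cluster and client, where the version parameter performs the check; 7.x and later replace it with if_seq_no/if_primary_term:

from elasticsearch import Elasticsearch, ConflictError

es = Elasticsearch("http://localhost:9200")
doc = es.get(index="my_index", id="1")        # read the current version
try:
    es.index(
        index="my_index",
        id="1",
        body={"title": "updated"},
        version=doc["_version"],              # apply only if still this version
    )
except ConflictError:
    # 409: another process updated the document first; re-read and retry.
    pass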
As we send concurrent requests to Elasticsearch, the next concern is: how can we make these requests consistent? Which side of the CAP triangle Elasticsearch falls on is still unclear, and it is a debate that we are not going to settle in this post.

Consistency — Ensuring consistent writes and reads
For writes, Elasticsearch supports consistency levels, different from most other databases, to allow a preliminary check to see how many shards are available for the write to be permissible. The available options are quorum, one and all. By default it is set to quorum and that means that a write operation will be permitted only if a majority of the shards are available. With a majority of the shards available, it may still happen that the writes to a replica fail for some reason and in that case, the replica is said to be faulty and the shard would be rebuilt on a different node.
For reads, new documents are not available for search until after the refresh interval. To make sure that the search request returns results from the latest version of the document, replication can be set to sync (default) which returns the write request after the operation has been completed on both primary and replica shards. In this case, search request from any shard will return results from the latest version of the document. Even if your application requires replication=async for higher indexing rate, there is a _preference parameter which can be set to primary for search requests. That way, the primary shard is queried for search requests and it ensures that the results will be from the latest version of the document.
As we understand how Elasticsearch deals with consensus, concurrency and consistency, let’s review some of the important concepts internal to a shard that result in certain characteristics of Elasticsearch as a distributed search engine.
Translog
The concept of a write ahead log (WAL) or a transaction log (translog) has been around in the database world since the development of relational databases. A translog ensures data integrity in the event of failure with the underlying principle that the intended changes must be logged and committed before the actual changes to the data are committed to the disk.
When new documents are indexed or old ones are updated, the Lucene index changes, and these changes are committed to disk for persistence. Doing this after every write request would be very expensive, so it is performed in a way that persists multiple changes to disk at once. As we described in a previous blog, the flush operation (a Lucene commit) is performed by default every 30 min or when the translog gets too big (default is 512MB). In such a scenario, there is a possibility of losing all the changes between two Lucene commits. To avoid this issue, Elasticsearch uses a translog. All the index/delete/update operations are written to the translog, and the translog is fsync'ed after every index/delete/update operation (or every 5 sec by default) to make sure the changes are persistent. The client receives acknowledgement for writes after the translog is fsync'ed on both the primary and replica shards.
In case of a hardware failure or restart between two Lucene commits, the translog is replayed to recover any changes lost since the last Lucene commit, and all the changes are applied to the index.
NOTE: It is recommended to explicitly flush the translog before restarting Elasticsearch instances, as the startup will be faster because the translog to be replayed will be empty. POST /_all/_flush command can be used to flush all indices in the cluster.
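With the Python client, the same flush can be issued as follows (equivalent to the REST command above):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
es.indices.flush(index="_all")   # flush every index before a planned restart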
With the translog flush operation, the segments in the filesystem cache are committed to the disk to make changes in the index persistent. Let’s now see what Lucene segments are.
Lucene Segments
A Lucene index is made up of multiple segments and a segment is a fully functional inverted index in itself. Segments are immutable which allows Lucene to add new documents to the index incrementally without rebuilding the index from scratch. For every search request, all the segments in an index are searched, and each segment consumes CPU cycles, file handles and memory. This means that the higher the number of segments, the lower the search performance will be.
To get around this problem, Elasticsearch merges small segments together into a bigger segment (as shown in the figure below), commits the new merged segment to the disk and deletes the old smaller segments.

This automatically happens in the background without interrupting indexing or searching. Since segment merging can use up resources and affect search performance, Elasticsearch throttles the merging process to have enough resources available for search.
Anatomy of an Elasticsearch Cluster: Part II
Near real-time search
While changes in Elasticsearch are not visible right away, it does offer a near real-time search engine. As mentioned in a previous post, committing Lucene changes to disk is an expensive operation. To avoid committing changes to disk while still making documents available for search, there is a filesystem cache sitting in between the memory buffer and the disk. The memory buffer is refreshed every second (by default) and a new segment, with the inverted index, is created in the filesystem cache. This segment is opened and made available for search.
Segments in the filesystem cache have file handles, and files there can be opened, read, and closed, even though the cache lives in memory. Because the refresh interval is 1 sec by default, changes are not visible right away, which is why search is near real-time. Because the translog is a persistent record of changes not yet persisted to disk, it also helps with the near real-time behavior of CRUD operations: for such requests, the translog is checked for any recent changes before the relevant segments are consulted, so the client has access to all the changes in near real-time.
You can explicitly refresh the index after every Create/Update/Delete operation to make the changes visible right away but it is not recommended as it affects the search performance due to too many small segments being created. For a search request, all Lucene segments in a given shard of an Elasticsearch index are searched, however, fetching all matching documents or documents deep in the resulting pages is dangerous for your Elasticsearch cluster. Let’s see why that is.
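For completeness, an explicit refresh looks like this with the Python client; use it sparingly for the reasons above (index name illustrative):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
es.index(index="my_index", id="1", body={"title": "hello"})
es.indices.refresh(index="my_index")   # make the write searchable immediately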
Why can deep pagination in distributed search be dangerous?
When you make a search request in Elasticsearch which has a lot of matching documents, by default, the first page returned consists of the top 10 results. The search API has from and size parameters to specify how deep into the full set of matching documents the results should go. For instance, if you want to see the matching documents ranked 51 to 60, then from=50 and size=10. When each shard receives the search request, it creates a priority queue of size from+size to be able to satisfy the search result itself, and then returns the results to the coordinating node.

If you want to see results ranked from 50,000 to 50,010, then each shard will create a priority queue with 50,010 results each, and the coordinating node will have to sort number of shards * 50,010 results in memory. This level of pagination may or may not be possible depending upon the hardware resources you have but it suffices to say that you should be very careful with deep pagination as it can easily crash your cluster.
It is possible to get all the documents that match the result using the scroll API, which acts more like a cursor in relational databases. Sorting is disabled with the scroll API and each shard keeps sending results as long as it has documents that match the search. (A minimal scroll loop is sketched below.)
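A minimal scroll loop with the Python client (the index name and query are illustrative):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
resp = es.search(
    index="my_index",
    body={"query": {"match": {"title": "insight"}}, "size": 1000},
    scroll="2m",                       # keep the scroll context alive for 2 minutes
)
while resp["hits"]["hits"]:
    for hit in resp["hits"]["hits"]:
        pass                           # process each batch of matching documents
    resp = es.scroll(scroll_id=resp["_scroll_id"], scroll="2m")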
Sorting scored results is expensive if a large number of documents are to be fetched. And since Elasticsearch is a distributed system, calculating the search relevance score for a document is very expensive. Let’s now see one of the many trade-offs made to calculate search relevance.
Trade-offs in calculating search relevance
Elasticsearch uses tf-idf for search relevance and because of its distributed nature, calculating a global idf (inverse document frequency) is very expensive. Instead, every shard calculates a local idf to assign a relevance score to the resulting documents and returns the result for only the documents on that shard. Similarly, all the shards return the resulting documents with relevant scores calculated using local idf and the coordinating node sorts all the results to return the top ones. This is fine in most cases, unless your index is skewed in terms of keywords or there is not enough data on a single shard to represent the global distribution.
For instance, if you are searching for the word “insight” and a majority of the documents containing the term “insight” reside on one shard, then the documents that match the query won’t be fairly ranked on each shard as the local idf values will vary greatly and the search results might not be very relevant. Similarly, if there is not enough data, then the local idf values may vary greatly for some searches and the results might not be as relevant as expected. In real-world scenarios with enough data, the local idf values tend to even out and search results are relevant as the documents are fairly scored.
There are a couple of ways to get around the local idf score but they are not really recommended for production systems.
- One way is that you can have just one shard for the index and then the local idf is the global idf, but this doesn’t leave room for parallelism/scaling and isn’t practical for huge indices.
- Another way is to use the search type dfs_query_then_fetch (dfs = distributed frequency search) with the search request, which first collects local idf values from all shards, combines them into a global idf for the entire index, and then returns results with relevance scores calculated using that global idf (a minimal sketch follows below). This isn't recommended in production; having enough data would ensure that the term frequencies are well distributed.
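A sketch of that second option with the Python client (the index name and query are illustrative; search_type is a real parameter of the search API):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
resp = es.search(
    index="my_index",
    body={"query": {"match": {"title": "insight"}}},
    search_type="dfs_query_then_fetch",   # compute global idf before scoring
)
print(resp["hits"]["total"])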
In the last few posts, we reviewed some of the fundamental principles of Elasticsearch which are important to understand in order to get started. In a follow up post, I will be going through indexing data in Elasticsearch using Apache Spark.