Elasticsearch大規模時序索引如何治理和規划



 

什么是時序索引?

其主要特點體現在兩個方面,

一存,以時間為軸,數據只有增加,沒有變更,並且必須包含timestamp(日期時間,名稱隨意)字段,其作用和意義要大於數據的id字段,常見的數據比如我們通常要記錄的操作日志、用戶行為日志、或股市行情數據、服務器CPU、內存、網絡的使用率等;

二取,一定是以時間范圍為第一過濾條件,然后是其它查詢條件,比如近一天、一周、本月等等,然后在這個范圍內進行二次過濾,比如性別或地域等,查詢結果中比較關注的是每條數據和timestamp字段具體發生的時間點,而非id。

此類數據一般用於OLAP、監控分析等場景。

最近的一個項目是風控過程數據實時統計分析和聚合的一個OLAP分析監控平台,日流量峰值在10到12億上下,每年數據約4000億條,占用空間大概200T,面對這樣一個數據量級的需求,我們的數據如何存儲和實現實時查詢將是一個嚴峻的挑戰,經過對Elasticsearch多方調研和超過幾百億條數據的插入和聚合查詢的驗證之后,總結出以下幾種能夠有效提升性能和解決這一問題的方案,包括從集群規划、存儲策略、索引拆分、壓縮、冷熱分區等幾個維度的優化方案,在本文中逐一介紹,希望對你有所幫助和啟發。

本文所使用的Elasticsearch版本為5.3.3。

一,集群部署規划

 

 

我們都知道在Elasticsearch(下稱ES)集群中有兩個主要角色,Master Node和Data Node,其它如Tribe Node等節點可根據業務需要另行設立。為了讓集群有更好的性能表現,我們應該對這兩個角色有一個更好的規划,在Nodes之間做讀取分離,保證集群的穩定性和快速響應,在大規模的數據存儲和查詢的壓力之下能夠坦然面對,各自愉快的協作:)

Master Nodes

Master Node,整個集群的管理者,負有對index的管理、shards的分配,以及整個集群拓撲信息的管理等功能,眾所周知,Master Node可以通過Data Node兼任,但是,如果對群集規模和穩定要求很高的話,就要職責分離,Master Node推薦獨立,它狀態關乎整個集群的存活,Master的配置

node.master: true node.data: false node.ingest: false 

這樣Master不參與I、O,從數據的搜索和索引操作中解脫出來,專門負責集群的管理工作,因此Master Node的節點配置可以相對低一些。

另外防止ES集群 split brain(腦裂),合理配置discovery.zen.minimum_master_nodes參數,官方推薦master-eligible nodes / 2 + 1向下取整的個數,這個參數決定選舉Master的Node個數,太小容易發生“腦裂”,可能會出現多個Master,太大Master將無法選舉。

更多Master選舉相關內容請參考:modules-discovery-zen#master-election

Data Nodes

Data Node是數據的承載者,對索引的數據存儲、查詢、聚合等操作提供支持,這些操作嚴重消耗系統的CPU、內存、IO等資源,因此,應該把最好的資源分配給Data Node,因為它們是真正干累活的角色,同樣Data Node也不兼任Master的功能,配置:

node.master: false node.data: true node.ingest: false 

Coordinating Only Nodes

 

 

ES本身是一個分布式的計算集群,每個Node都可以響應用戶的請求,包括Master Node、Data Node,它們都有完整的cluster state信息,正如我們知道的一樣,在某個Node收到用戶請求的時候,會將請求轉發到集群中所有索引相關的Node上,之后將每個Node的計算結果合並后返回給請求方,我們暫且將這個Node稱為查詢節點,整個過程跟分布式數據庫原理類似。那問題來了,這個查詢節點如果在並發和數據量比較大的情況下,由於數據的聚合可能會讓內存和網絡出現瓶頸,因此,在職責分離指導思想的前提下,這些操作我們也應該從這些角色中剝離出來,官方稱它是 coordinating nodes,只負責路由用戶的請求,包括讀、寫等操作,對內存、網絡和CPU要求比較高,本質上,coordinating only nodes可以籠統的理解為是一個負載均衡器,或者反向代理,只負責讀,本身不寫數據,它的配置是:

node.master: false node.data: false node.ingest: false search.remote.connect: false 

增加coordinating nodes的數量可以提高API請求響應的性能,我們也可以針對不同量級的index分配獨立的coordinating nodes來滿足請求性能,那是不是越多越好呢,在一定范圍內是肯定的,但凡事有個度,過了負作用就會突顯,太多的話會給集群增加負擔,在做Master選舉的時候會先確保所有Node的cluster state是一致的,同步的時候會等待每個Node的acknowledgement確認,所以適量分配可以讓集群暢快的工作。

search.remote.connect是禁用跨集群查詢,防止在進行集群之間查詢時發生二次路由,modules-cross-cluster-search

二,Routing

類似於分布式數據庫中的分片原則,將符合規則的數據存儲到同一分片。ES通過哈希算法來決定數據存儲於哪個shard,

shard_num = hash(_routing) % num_primary_shards

其中hash(_routing)得出一個數字,然后除以主shards的數量得到一個余數,余數的范圍是0到number_of_primary_shards - 1,這個數字就是文檔所在的shard。

Routing默認是id值,當然可以自定義,合理指定Routing能夠大幅提升查詢效率,Routing支持GET、Delete、Update、Post、Put等操作。

如:

PUT my_index/my_type/1?routing=user1
{
  "title": "This is a document" } GET my_index/my_type/1?routing=user1 

不指定Routing的查詢過程:

 

簡單的來說,一個查詢請求過來以后會查詢每個shard,然后做結果聚合,總的時間大概就是所有shard查詢所消耗的時間之和。

指定Routing以后:

 

 

會根據Routing查詢特定的一個或多個shard,這樣就大大減少了查詢時間,提高了查詢效率,當然,如何設置Routing是一個難點,需要一點技巧,要根據業務特點合理組合Routing的值,來划分shard的存儲,最終保持數據量相對均衡。

可以組合幾個維度做為Routing ,有點類似於hbase key,例如不同的業務線加不同的類別,不同的城市和不同的類型等等,如

  1. _search?routing=beijing按城市
  2. _search?routing=beijing_user123按城市和用戶
  3. _search?routing=beijing_android,shanghai_android按城市和手機類型等。

數據不均衡?

假如你的業務在北京、上海的數據遠遠大於其它二三線城市的數據,再例如我們的業務場景,A業務線的數據量級遠遠大於B業務線,有時候很難通過Routing指定個一個值保證數據在所有shards上均勻分布,會讓部分shard變的越來越大,影響查詢性能,怎么辦?

一種解決辦法是單獨為這些數據量大的渠道創建獨立的index,如http://localhost:9200/shanghai,beijing,other/_search?routing=android,這樣可以根據需要在不同index之間查詢,然而每個index中shards的數據可以做到相對均衡。

另一種辦法是指定index參數index.routing_partition_size,來解決最終可能產生群集不均衡的問題,指定這個參數后新的算法如下:

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

index.routing_partition_size 應具有大於1且小於 index.number_of_shards 的值,最終數據會在routing_partition_size幾個shard上均勻存儲,是哪個shard取決於hash(_id) % routing_partition_size的計算結果。

指定參數index.routing_partition_size后,索引中的mappings必須指定_routing為"required": true,另外mappings不支持parent-child父子關系。

很多情況下,指定Routing后會大幅提升查詢性能,畢竟查詢的shard只有那么幾個,但是如何設置Routing是個難題,可根據業務特性巧妙組合。

三,索引拆分

Index通過橫向擴展shards實現分布式存儲,這樣可以解決index大數據存儲的問題,但在一個index變的越來越大,單個shard也越來越大,查詢和存儲的速度也越來越慢,更重要的是一個index其實是有存儲上限的(除非你設置足夠多的shards和機器),如官方聲明單個shard的文檔數不能超過20億(受限於Lucene index,每個shard是一個Lucene index),考慮到I、O,針對index每個node的shards數最好不超過3個,那面對這樣一個龐大的index,我們是采用更多的shards,還是更多的index我們如何選擇,index的shards總量也不宜太多,更多的shards會帶來更多的I、O開銷,其實答案就已經很明確,除非你能接受長時間的查詢等待。

Index拆分的思路很簡單,時序索引有一個好處就是只有增加,沒有變更,按時間累積,天然對索引的拆分友好支持,可以按照時間和數據量做任意時間段的拆分,ES提供的Rollover Api + Index Template可以非常便捷和友好的實現index的拆分工作,把單個index docs數量控制在百億內,也就是一個index默認5個shards左右即可,保證查詢的即時響應。

簡單介紹一下Rollover Api 和 Index Template這兩個東西,如何實現index的拆分

Index template

我們知道ES可以為同一目的或同一類索引創建一個Index template,之后創建的索引只要符合匹配規則就會套用這個template,不必每次指定settings和mappings等屬性。

一個index可以被多個template匹配,那settings和mappings就是多個template合並后的結果,有沖突通過template的屬性"order" : 0從低到高覆蓋(這部分據說會在ES6中會做調整,更好的解決template匹配沖突問題)。

示例:

PUT _template/template_1
{
    "index_patterns" : ["log-*"], "order" : 0, "settings" : { "number_of_shards" : 5 }, "aliases" : { "alias1" : {} } } 

Rollover Index

 

 

Rollover Index可以將現有的索引通過一定的規則,如數據量和時間,索引的命名必須是logs-000001這種格式,並指定aliases,示例:

PUT /logs-000001 
{
  "aliases": { "logs_write": {} } } # Add > 1000 documents to logs-000001 POST /logs_write/_rollover { "conditions": { "max_age": "7d", "max_docs": 1000 } } 

先創建索引並指定別名logs_write,插入1000條數據,然后請求_rollover api並指定拆分規則,如果索引中的數據大於規則中指定的數據量或者時間過時,新的索引將被創建,索引名稱為logs-000002,並根據規則套用index template,同時別名logs_write也將被變更到logs-000002。

注意事項:

  1. 索引命名規則必須如同:logs-000001
  2. 索引必須指定aliases
  3. Rollover Index Api調用時才去檢查索引是否超出指定規則,不會自動觸發,需要手動調用,可以通過Curator實現自動化
  4. 如果符合條件會創建新的索引,老索引的數據不會發生變化,如果你已經插入2000條,拆分后還是2000條
  5. 插入數據時一定要用別名,否則你可能一直在往一個索引里追加數據

技巧:

按日期滾動索引

PUT /<logs-{now/d}-1>
{
  "aliases": { "logs_write": {} } } 

假如生成的索引名為logs-2017.04.13-1,如果你在當天執行Rollover會生成logs-2017.04.13-000001,次日的話是logs-2017.04.14-000001。

這樣就會按日期進行切割索引,那如果你想查詢3天內的數據可以通過日期規則來匹配索引名,如:

GET /<logs-{now/d}-*>,<logs-{now/d-1d}-*>,<logs-{now/d-2d}-*>/_search

到此,我們就可以通過Index Template和Rollover Api實現對index的任意拆分,並按照需要進行任意時間段的合並查詢,這樣只要你的時間跨度不是很大,查詢速度一般可以控制在毫秒級,存儲性能也不會遇到瓶頸。

四,Hot-Warm架構

 

 

冷熱架構,為了保證大規模時序索引實時數據分析的時效性,可以根據資源配置不同將Data Nodes進行分類形成分層或分組架構,一部分支持新數據的讀寫,另一部分僅支持歷史數據的存儲,存放一些查詢發生機率較低的數據,即Hot-Warm架構,對CPU,磁盤、內存等硬件資源合理的規划和利用,達到性能和效率的最大化。

我們可以通過ES的Shard Allocation Filtering來實現Hot-Warm的架構。

實現思路:

  1. 將Data Node根據不同的資源配比打上標簽,如:host、warm
  2. 定義2個時序索引的index template,包括hot template和warm template,hot template可以多分配一些shard和擁有更好資源的Hot Node
  3. 用hot template創建一個active index名為active-logs-1,別名active-logs,支持索引切割,插入一定數據后,通過roller over api將active-logs切割,並將切割前的index移動到warm nodes上,如active-logs-1,並阻止寫入
  4. 通過Shrinking API收縮索引active-logs-1為inactive-logs-1,原shard為5,適當收縮到2或3,可以在warm template中指定,減少檢索的shard,使查詢更快
  5. 通過force-merging api合並inactive-logs-1索引每個shard的segment,節省存儲空間
  6. 刪除active-logs-1

Hot,Warm Nodes

Hot Nodes

擁有最好資源的Data Nodes,如更高性能的CPU、SSD磁盤、內存等資源,這些特殊的Nodes支持索引所有的讀、寫操作,如果你計划以100億為單位來切割index,那至少需要三個這樣的Data Nodes,index的shard數為5,每個shard支持20億documents數據的存儲

為這類Data Nodes打上標簽,以便我們在template中識別和指定

啟動參數:

./bin/elasticsearch -Enode.attr.box_type=hot

或者配置文件:

node.attr.box_type: hot

Warm Nodes

存儲只讀數據,並且查詢量較少,但用於存儲長多時間歷史數據的Data Nodes,這類Nodes相對Hot Nodes較差的硬件配置,根據需求配置稍差的CPU、機械磁盤和其它硬件資源,至於數量根據需要保留多長時間的數據來配比,同樣需要打上標簽,方法跟hot nodes一樣,指定為warm,box_type: warm。

Hot,Warm Template

Hot Template

我們可以通過指定參數"routing.allocation.include.box_type": "hot",讓所有符合命名規則索引的shard都將被分配到hot nodes上。

PUT _template/active-logs
{
  "template": "active-logs-*", "settings": { "number_of_shards": 5, "number_of_replicas": 1, "routing.allocation.include.box_type": "hot", "routing.allocation.total_shards_per_node": 2 }, "aliases": { "active-logs": {} } } 

Warm Template

同樣符合命名規則索引的shard會被分配到warm nodes上,我們指定了更少的shards數量和復本數,注意,這里的復本數為0,和best_compression級別的壓縮,方便做遷移等操作,以及進行一些數據的壓縮。

PUT _template/inactive-logs
{
  "template": "inactive-logs-*", "settings": { "number_of_shards": 1, "number_of_replicas": 0, "routing.allocation.include.box_type": "warm", "codec": "best_compression" } } 

假如我們已經創建了索引active-logs-1 ,當然你可以通過_bulk API快速寫入測試的數據,然后參考上文中介紹的rollover api進行切割。

Shrink Index

Rollover api切割以后,active-logs-1將變成一個冷索引,我們將它移動到warm nodes上,先將索引置為只讀狀態,拒絕任何寫入操作,然后修改index.routing.allocation.include.box_type屬性,ES會自動移動所有shards到目標Data Nodes上。

PUT active-logs-1/_settings
{
  "index.blocks.write": true, "index.routing.allocation.include.box_type": "warm" } 

cluster health API可以查看遷移狀態,完成后進行收縮操作,其實相當於復制出來一個新的索引,舊的索引還存在。

POST active-logs-1/_shrink/inactive-logs-1

我們可以通過Head插件查看整個集群索引的變化情況。

關於shard的分配請參考Shard Allocation Filtering

Forcemerge

到目前為止我們已經實現了索引的冷熱分離,和索引的收縮,我們知道每個shard下面由多個segment組成,那inactive-logs-1的shard數是1,但segment還是多個。

這類索引不會在接受寫入操作,為了節約空間和改善查詢性能,通過Forcemerge Api將segment適量合並,

PUT inactive-logs-1/_settings
{ "number_of_replicas": 1 } 

ES的Forcemerge過程是先創建新的segment刪除舊的,所以舊segment的壓縮方式best_compression不會在新的segment中生效,新的segment還是默認的壓縮方式。

現在inactive-logs-1的復本是還是0,如果有需要的話,可以分配新的復本數

PUT inactive-logs-1/_settings
{ "number_of_replicas": 1 } 

最后刪除active-logs-1,因為我們已經為它做了一個查詢復本inactive-logs-1,

DELETE active-logs-1

走到這里,我們就已經實現了index的Hot-Warm架構,根據業務特點將一段時間的數據放在Hot Nodes,更多的歷史數據存儲於Warm Nodes。

五,其它優化方案

索引隔離

 

 

在多條業務線的索引共用一個ES集群時會發生流量被獨吃獨占的情況,因為大家都共用相同的集群資源,流量大的索引會占用大部分計算資源而流量小的也會被拖慢,得不到即時響應,或者說業務流量大的索引可以按天拆分,幾個流量小的索引可以按周或月拆分。

這種情況下我們可以將規模大的索引和其它相對小規模的索引獨立存儲,分開查詢或合並查詢,除了Master Nodes以外,Data Nodes和Coordinating Nodes都可以獨立使用(其實如果每個索引的量都特別大也應該采用這種架構),還有一個好處是對方的某個Node掛掉,自己不受影響。

同樣利用ES支持Shard Allocation Filtering功能來實現索引的資源獨立分配,先將nodes進行打標簽,划分區域,類似於Hot-Warm架構node.attr.zone=zone_a、node.attr.zone=zone_b

或者

node.attr.zone =zone_hot_a、node.attr.zone=zone_hot_b

等打標簽的方式來區別對應不同的索引,然后在各自的index template中指定不同的node.attr.zone即可,如"index.routing.allocation.include.zone" : "zone_a,zone_hot_a",

或者排除法"index.routing.allocation.exclude.size": "zone_hot_b"

分配到zone_hot_b以外的所有Data Nodes上。

更多用法可以參考Hot-Warm架構,或shard-allocation-filtering

跨數據中心

 

 

如果你的業務遍布全國各地,四海八荒,如果你數據要存儲到多個機房,如果你的index有幾萬個甚至更多( index特別多,集群龐大會導致cluster state信息量特變大,因為cluster state包含了所有shard、index、node等所有相關信息,它存儲在每個node上,這些數據發生變化都會實時同步到所有node上,當這個數據很大的時候會對集群的性能造成影響),這些情況下我們會考慮部署多個es cluster,那我們將如何解決跨集群查詢的問題呢?目前es針對跨集群操作提供了兩種方案Tribe node和Cross Cluster Search。

Tribe node

 

 

需要一個獨立的node節點,加入其它es cluster,用法有點類似於Coordinating Only Node,所不同的是tribe是針對多個es集群之間的所有節點,tribe node收到請求廣播到相關集群中所有節點,將結果合並處理后返回,表面上看起來tribe node將多個集群串連成一個了一個整體,遇到同名index發生沖突,會選擇其中一個index,也可以指定。

tribe: on_conflict: prefer_t1 t1: cluster.name: us-cluster discovery.zen.ping.multicast.enabled: false discovery.zen.ping.unicast.hosts: ['usm1','usm2','usm3'] t2: cluster.name: eu-cluster discovery.zen.ping.multicast.enabled: false discovery.zen.ping.unicast.hosts: ['eum1','eum2','eum3'] 

Cross Cluster Search

 

 

Cross Cluster Search可以讓集群中的任意一個節點聯合查詢其它集群中的數據, 通過配置elasticsearch.yml或者API來啟用這個功能,API示例

PUT _cluster/settings
{
  "persistent": { "search": { "remote": { "cluster_one": { "seeds": [ "127.0.0.1:9300" ] ... } } } } } 

提交以后整個集群所有節點都會生效,都可以做為代理去做跨集群聯合查詢,不過我們最好還是通過Coordinating Only Nodes去發起請求。

POST /cluster_one:decision,decision/_search
{
    "match_all": {} } 

對集群cluster_one和本集群中名為decision的索引聯合查詢。

目前這個功能還在測試階段,但未來可能會取代tribe node,之間的最大的差異是tribe node需要設置獨立的節點,而Cross Cluster Search不需要,集群中的任意一個節點都可以兼任,比如可以用我們的Coordinating Only Nodes做為聯合查詢節點,另一個優點是配置是動態的,不需要重啟節點,實際上可以理解為是一個ES集群之間特定的動態代理工具,支持所有操作,包括index的創建和修改,並且通過namespace對index進行隔離,也解決了tribe node之index名稱沖突的問題。

小結

我們在文中介紹了幾種方案用來解決時序索引的海量數據存儲和查詢的問題,根據業務特點和使用場景來單獨或組合使用能夠發揮出意想不到的效果,特別是nodes之間的讀寫分離、索引拆分、Hot-Warm等方案的組合應用對索引的查詢和存儲性能有顯著的提升,另外routing在新版本中增加了routing_partition_size解決了shard難以均衡的問題,如果你的索引mapping中沒有parent-child關聯關系可以考慮使用,對查詢的性能提升非常有效。

本文參考內容:

  1. hot-warm-architecture-in-elasticsearch-5-x
  2. managing-time-based-indices-efficiently
  3. what-is-evolving-in-elasticsearch
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM