elasticsearch reIndex


elasticsearch 基礎 —— ReIndex
Reindex會將一個索引的數據復制到另一個已存在的索引,但是並不會復制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。

一、reindex的常用操作
1、reindex基礎實現
    _reindex會將一個索引的快照數據copy到另一個索引,默認情況下存在相同的_id會進行覆蓋(一般不會發生,除非是將兩個索引的數據copy到一個索引中),可以使用以下命令將索引快照進行copy:

POST _reindex
{
  "source": {
    "index": "my_index_name"
  },
  "dest": {
    "index": "my_index_name_new"
  }
} 
2、version_type(沖突的解決)
    version_type屬性默認值為internal,即當發生沖突后會覆蓋之前的document,而當設置為external則會新生成一個另外的document,設置方式如下:

POST _reindex
{
  "source": {
    "index": "my_index_name"
  },
  "dest": {
    "index": "my_index_name_new",
    "version_type": "external"
  }
}
3、op_type和conflicts
  將op_type設置為create時,只會對發生不同的document進行reindex,(若定時機制的reindex則可以使用該方式只對最新的不存在的document進行reindex)。並且可以將conflicts屬性設置為proceed,將沖突進行類似於continue的操作,設置方式如下:

POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "my_index_name"
  },
  "dest": {
    "index": "my_index_name_new",
    "op_type": "create"
  }
}
4、query的reindex
  對滿足query條件的數據進行reindex操作,查詢方式如下:

POST _reindex
{
  "source": {
    "index": "my_index_name",
    "type": "my_type_name",
    "query": {  // query的條件
      "term": {
        "user": "user_value"
      }
    }
  },
  "dest": {
    "index": "my_index_name_new"
  }
} 
5、多Index、Type數據的reindex
  可以將多個索引或類型的數據reindex到一個新的索引中,當然還可以使用query查詢條件只對其中滿足條件的部分數據進行reindx,若不設置沖突則還是默認會進行覆蓋,只是不能保證相同ID的數據那個索引的數據會被先索引而被覆蓋,設置方式如下:

POST _reindex
{
    "source": {
        "index": [
            "index_name_1",
            "index_name_1"
        ],
        "type": [
            "type_name_1",
            "type_name_2"
        ],
        "query": {//query的條件
            "term": {
                "user": "kimchy"
            }
        }
    },
    "dest": {
        "index": "all_together_index_name"
    }
}
6、size、sort(reindex的條數和排序控制)
POST _reindex
{
  "size": 10000,   // 值reindex按照sort排序后的size條數據
  "source": {
    "index": "my_index_name",
    "sort": { "date": "desc" } 
  },
  "dest": {
    "index": "my_index_name_new"
  }
}
7、source條件的reindex
  滿足_source中包含數組field(字段)的數據才會被reindex,設置方式如下:

POST _reindex
{
  "source": {
    "index": "my_index_name",
    "_source": ["field_name_1", "field_name_2"]
  },
  "dest": {
    "index": "my_index_name_new"
  }
}
8、script類型的reindex
  與_update_by_query相同的是reindex也可以使用script,但是不同的是reindex可以修改源索引的數據信息,比如:

POST _reindex
{
  "source": {
    "index": "my_index_name"
  },
  "dest": {
    "index": "my_index_name_new",
    "version_type": "external"
  },
  "script": {
    "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
    "lang": "painless"
  }
}
  修改字段名稱

  以下會在copy后將新索引中的flag字段名稱修改為tag:

POST _reindex
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "test2"
  },
  "script": {
    "source": "ctx._source.tag = ctx._source.remove(\"flag\")"
  }
}
  ctx.op 

    ctx.op只能等於noop或delete,等於其他值將報錯,並且設置ctx的其他字段也會報錯。設置為noop后不會有任何操作發生,設置為delete后會從目標索引中刪除滿足條件的數據,並且都會在response body中返回總條數。

謹慎操作字段

  _id

  _type

  _index

  _version

  _routing

  _parent

  這些字段都可以在reindex的操作中自行定義,但是需要謹慎操作。

  _version 字段可以設置為null或者在ctx的map中清除該字段,則reindex時效果都是沒有copy其值,會引起數據的覆蓋。

  而routing值可以設置為以下值:

        keep 默認值,會copy對應的路由值到新的index中。

        discard 將值設置為null

        =<some text> 將值設置為指定值,設置方式如下:

POST _reindex
{
    "source": {
        "index": "source",
        "query": {//設置查詢條件
            "match": {
                "company": "cat"
            },
            "size": 100//滿足條件的100條
        }
    },
    "dest": {
        "index": "dest",
        "routing": "=cat"//reindex到新的索引中使用該路由值
    }
}
二、遠程reindex
  可以將遠程(其他集群)的數據reindex到當前的集群環境中,但是需要設置當前集群的elsticsearch.yml配置中設置遠程白名單列表,配置reindex.remote.whitelist屬性,如otherhost:9200, another:9200, 127.0.10.*:9200, localhost:* 。只要環境可訪問,則可以在任何版本之間對數據進行reindex,那么這也是版本es升級的數據遷移不錯的選擇。為了使發送到舊版本的彈性搜索的查詢,查詢參數被直接發送到遠程主機,而不需要進行驗證或修改。

    但是manual 和 automatic slicing.不能使用遠程reindex,設置方式如下:

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200", // 遠程es的ip和port列表
      "socket_timeout": "1m",
      "connect_timeout": "10s"  // 超時時間設置
    },
    "index": "my_index_name", // 源索引名稱
    "query": {         // 滿足條件的數據
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest_index_name"  // 目標索引名稱
  }
}
三、URL Parameters(reindex參數設置)
    Url可選參數有pretty,refresh, wait_for_completion, wait_for_active_shards, timeout, requests_per_second.

  1、refresh
        Index API的refresh只會讓接收新數據的碎片被刷新,而reindex的refresh則會刷新所有索引。

  2、wait_for_completion
   將參數設置為false則會執行一些預執行檢查,啟動請求,然后返回一個任務,該任務可以用於任務api來取消或獲得任務的狀態。Es會在.tasks/task/${taskId}中創建記錄ID。

  3、wait_for_active_shards
        在Bulk API的情況下,requests_per_second可以設置在繼續索引之前,控制多少個碎片的拷貝數必須是活躍的。而timeout 超時控制每個寫請求等待不可用的碎片等待的時間。

  4、requests_per_second
   每秒的請求數據,顯然是節流控制參數,運行設置一個正整數,設置為-1表示不進行控制。

 

四、返回參數說明
{
  "took" : 639,    // 執行全過程使用的毫秒數
  "updated": 0,    // 成功修改的條數
  "created": 123,  // 成功創建的條數
  "batches": 1,    // 批處理的個數
  "version_conflicts": 2, // 版本沖突個數
  "retries": {     // 重試機制
    "bulk": 0,     // 重試的批個數
    "search": 0    // 重試的查詢個數
  }
  "throttled_millis": 0, // 由於設置requests_per_second參數而sleep的毫秒數
  "failures" : [ ]  // 失敗的數據
}
五 、Task API 操作
  1、使用Task API查看reindex的情況
GET _tasks?detailed=true&actions=*reindex
狀態如下:可知道當前的taskId = 9620804

2、使用TaskId查看執行的狀態 
{
  "nodes": {
    "_b5PSdInTVWaji9TUrWANg": {
      "name": "node-2",
      "transport_address": "192.168.10.15:9300",
      "host": "192.168.10.15",
      "ip": "192.168.10.15:9300",
      "roles": [
        "master",
        "data",
        "ingest"
      ],
      "attributes": {
        "ml.max_open_jobs": "10",
        "ml.enabled": "true"
      },
      "tasks": {
        "_b5PSdInTVWaji9TUrWANg:9620804": {
          "node": "_b5PSdInTVWaji9TUrWANg",
          "id": 9620804,
          "type": "transport",
          "action": "indices:data/write/reindex",
          "status": {
            "total": 216361,
            "updated": 0,
            "created": 30000,
            "deleted": 0,
            "batches": 31,
            "version_conflicts": 0,
            "noops": 0,
            "retries": {
              "bulk": 0,
              "search": 0
            },
            "throttled_millis": 0,
            "requests_per_second": -1,
            "throttled_until_millis": 0
          },
          "description": "reindex from [geleevr] to [geleevr_new]",
          "start_time_in_millis": 1511316869170,
          "running_time_in_nanos": 12077416434,
          "cancellable": true
        }
      }
    }
  }
}
GET /_tasks/taskId:9620804
可以查看total,updated,created,deleted等狀態

3、使用Cancel Task API取消正在執行的reindex操作
   取消操作可能需要幾秒鍾的時間,取消方式如下:

POST _tasks/task_id:9620804/_cancel
4、使用Task API 重置reindex的節流限制
POST _reindex/task_id:9620804/_rethrottle?requests_per_second=-1
 六、並行化執行reindex操作
1、手動並行化
如下是兩個slices的手動並行化reindex:

POST _reindex
{
  "source": {
    "index": "my_index_name",
    "slice": {   // 第一slice執行操作
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "my_index_name_new"
  }
}
POST _reindex
{
  "source": {
    "index": "my_index_name",
    "slice": {   // 第二slice執行操作
      "id": 1,
      "max": 2
    }
  },
  "dest": {
    "index": "my_index_name_new"
  }
}
可以通過以下命令查看執行的結果:

GET _refresh
POST my_index_name/_search?size=0&filter_path=hits.total
結果如下:

{
  "hits": {
    "total": 120
  }
}
2、自動並行化
如下是自動划分的5個slices,只是將需要手動划分的過程自動化處理,將一個操作拆分為多個子操作並行化處理,其他查詢方式等都一樣,如下:

POST _reindex?slices=5&refresh
{
  "source": {
    "index": "my_index_name"
  },
  "dest": {
    "index": "my_index_name_new"
  }
} 
3、並行化處理的特性
        同樣可以使用Task API查看每個slices的子請求(child)的task狀態;

        獲取每個slices請求的任務狀態,只返回已完成的狀態;

        這些子請求單獨可尋址,比如取消操作和重新配置節流操作;

        對每個slices進行重新配置節流時,會將所有未完成的操作進行比例分配;

        對每個slices進行取消操作其他所有slices都會生效;

        每個請求只擁有全部數據的部分,並且每個文檔的大小會不同,大文件基本分配均勻;

        並行化處理是使用requests_per_second 或size等,可能或導致分布不均勻;

        每個子請求可能獲取到不同版本或快照的源索引數據。

4、slices數量設置要求
        數量不能過大,比如500可能出現CPU問題;

        查詢性能角度看,設置slices為源索引的分片的倍數是比較合適的,一倍是最有效的;

        索引性能角度看,應該隨着可用資源的數量線性地擴展;

        然而索引或查詢性能是否在此過程中占據主導,取決於許多因素,比如重新索引的文檔和重新索引的集群。

 

七、使用索引名稱,reindex每天的數據
    如存在如下數據:

PUT metricbeat-2016.05.30/beat/1?refresh
{"system.cpu.idle.pct": 0.908}
 
PUT metricbeat-2016.05.31/beat/1?refresh
{"system.cpu.idle.pct": 0.105}
  可以執行如下reindex腳本:

POST _reindex
{
  "source": {
    "index": "metricbeat-*"
  },
  "dest": {
    "index": "metricbeat"
  },
  "script": {
    "lang": "painless",
    "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
  }
}
    再使用以下命令進行查看:

GET metricbeat-2016.05.30-1/_doc/1
GET metricbeat-2016.05.31-1/_doc/1
八、隨機對源索引的一個子集合進行reindex
  下面是源索引的一個子集合進行索引的例子,說明:默認會按照_doc進行排序,而score不會起到任何的作用,除非如下提別對score排序進行指定,如下:

POST _reindex
{
  "size": 10,
  "source": {
    "index": "my_index_name",
    "query": {
      "function_score" : {
        "query" : { "match_all": {} },
        "random_score" : {}
      }
    },
    "sort": "_score"    
  },
  "dest": {
    "index": "random_my_index_name"
  }
}
九、reindex在項目中的使用
     以上是對reindex的基本概念和特性的學習,自己在項目中的使用場景:第一是在集群的es版本升級的情況下

1、es集群版本升級的數據遷移 或 將現有生產數據copy的dev等集群環境
        該部分可以在kibana的dev Tools下面直接使用remote reindex的腳本即可,但是需要設置當前集群的elsticsearch.yml配置中設置遠程白名單列表,配reindex.remote.whitelist屬性,如otherhost:9200, another:9200, 127.0.10.*:9200, localhost:* 。但是千萬注意remote reindex不能使用並行化處理,即不能使用slices參數,這一點官方文檔上沒有明確指出,但是在使用的時候會報錯,去掉即可。

POST _reindex?refresh
{
  "source": {
    "remote": {
      "host": "http://192.168.10.20:9200", 
      "socket_timeout": "1m",
      "connect_timeout": "10s"  
    },
    "index": "source_index_name",
    "query": {         
      "match_all": {}
    }
  },
  "dest": {
    "index": "destination_index_name",
    "version_type": "external",
    "op_type": "create"
  }
}

---------------------------------------------------------------------------------------------------------------------------
干貨 | Elasticsearch Reindex性能提升10倍+實戰
1、reindex的速率極慢,是否有辦法改善?
以下問題來自社區:https://elasticsearch.cn/question/3782

問題1:reindex和snapshot的速率極慢,是否有辦法改善?
reindex和snapshot的速率比用filebeat或者kafka到es的寫入速率慢好幾個數量級(集群寫入性能不存在瓶頸),reindex/snapshot的時候CPU還是IO使用率都很低,是不是集群受什么參數限制了reindex和snapshot的速率?
reindex不管是跨集群還是同集群上都很慢,大約3~5M/s的索引速率,會是什么原因導致的?

問題2:數據量幾十個G的場景下,elasticsearch reindex速度太慢,從舊索引導數據到新索引,當前最佳方案是什么?
2、Reindex簡介
5.X版本后新增Reindex。Reindex可以直接在Elasticsearch集群里面對數據進行重建,如果你的mapping因為修改而需要重建,又或者索引設置修改需要重建的時候,借助Reindex可以很方便的異步進行重建,並且支持跨集群間的數據遷移。比如按天創建的索引可以定期重建合並到以月為單位的索引里面去。當然索引里面要啟用_source。

POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

3、原因分析
reindex的核心做跨索引、跨集群的數據遷移。
慢的原因及優化思路無非包括:

1)批量大小值可能太小。
需要結合堆內存、線程池調整大小;
2)reindex的底層是scroll實現,借助scroll並行優化方式,提升效率;
3)跨索引、跨集群的核心是寫入數據,考慮寫入優化角度提升效率。
4、Reindex提升遷移效率的方案
4.1 提升批量寫入大小值
默認情況下,_reindex使用1000進行批量操作,您可以在source中調整batch_size。

POST _reindex
{
  "source": {
    "index": "source",
    "size": 5000
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

批量大小設置的依據:

(1)使用批量索引請求以獲得最佳性能。
批量大小取決於數據、分析和集群配置,但一個好的起點是每批處理5-15 MB。
注意,這是物理大小。文檔數量不是度量批量大小的好指標。例如,如果每批索引1000個文檔,:
1)每個1kb的1000個文檔是1mb。
2)每個100kb的1000個文檔是100 MB。
這些是完全不同的體積大小。
(2)逐步遞增文檔容量大小的方式調優。
1)從大約5-15 MB的大容量開始,慢慢增加,直到你看不到性能的提升。然后開始增加批量寫入的並發性(多線程等等)。
2)使用kibana、cerebro或iostat、top和ps等工具監視節點,以查看資源何時開始出現瓶頸。如果您開始接收EsRejectedExecutionException,您的集群就不能再跟上了:至少有一個資源達到了容量。要么減少並發性,或者提供更多有限的資源(例如從機械硬盤切換到ssd固態硬盤),要么添加更多節點。
4.2 借助scroll的sliced提升寫入效率
Reindex支持Sliced Scroll以並行化重建索引過程。 這種並行化可以提高效率,並提供一種方便的方法將請求分解為更小的部分。

sliced原理(from medcl)
1)用過Scroll接口吧,很慢?如果你數據量很大,用Scroll遍歷數據那確實是接受不了,現在Scroll接口可以並發來進行數據遍歷了。
2)每個Scroll請求,可以分成多個Slice請求,可以理解為切片,各Slice獨立並行,利用Scroll重建或者遍歷要快很多倍。

slicing使用舉例
slicing的設定分為兩種方式:手動設置分片、自動設置分片。
手動設置分片參見官網。
自動設置分片如下:

POST _reindex?slices=5&refresh
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

slices大小設置注意事項:
1)slices大小的設置可以手動指定,或者設置slices設置為auto,auto的含義是:針對單索引,slices大小=分片數;針對多索引,slices=分片的最小值。
2)當slices的數量等於索引中的分片數量時,查詢性能最高效。slices大小大於分片數,非但不會提升效率,反而會增加開銷。
3)如果這個slices數字很大(例如500),建議選擇一個較低的數字,因為過大的slices 會影響性能。

4.3 ES副本數設置為0
如果要進行大量批量導入,請考慮通過設置index.number_of_replicas來禁用副本:0。
主要原因在於:復制文檔時,將整個文檔發送到副本節點,並逐字重復索引過程。 這意味着每個副本都將執行分析,索引和潛在合並過程。
相反,如果您使用零副本進行索引,然后在提取完成時啟用副本,則恢復過程本質上是逐字節的網絡傳輸。 這比復制索引過程更有效。

PUT /my_logs/_settings
{
    "number_of_replicas": 1
}

4.4 增加refresh間隔
如果你的搜索結果不需要接近實時的准確性,考慮先不要急於索引刷新refresh。可以將每個索引的refresh_interval到30s。
如果正在進行大量數據導入,可以通過在導入期間將此值設置為-1來禁用刷新。完成后不要忘記重新啟用它!
設置方法:

PUT /my_logs/_settings
{ "refresh_interval": -1 }

5、小結
實踐證明,比默認設置reindex速度能提升10倍+。
遇到類似問題,多從官網、原理甚至源碼的角度思考,逐步拆解分析。
只要思維不滑坡,辦法總比問題多!

參考:
[1] Jest Reindex參考:http://t.cn/RDOyIc8
[2] 官網性能優化:http://t.cn/RDOyJqr
[3] 論壇討論:http://t.cn/RDOya3a

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM