在我們開發的過程中,我們有很多時候需要用到Reindex接口。它可以幫我們把數據從一個index到另外一個index進行重新reindex。這個對於特別適用於我們在修改我們數據的mapping后,需要重新把數據從現有的index轉到新的index建立新的索引,這是因為我們不能修改現有的index的mapping一旦已經定下來了。在接下來的介紹中,我們將學習如何使用reindex接口。
為了能夠使用reindex接口,我們必須滿足一下的條件:
_source
選項對所有的源index文檔是啟動的,也即源index的source是被存儲的- reindex不是幫我們嘗試設置好目的地index。它不拷貝源index的設置到目的地的index里去。你應該在做reindex之前把目的地的源的index設置好,這其中包括mapping, shard數目,replica等
下面,我們來一個具體的例子,比如建立一個blogs的index。
PUT twitter2/_doc/1
{
"user" : "雙榆樹-張三",
"message" : "今兒天氣不錯啊,出去轉轉去",
"uid" : 2,
"age" : 20,
"city" : "北京",
"province" : "北京",
"country" : "中國",
"address" : "中國北京市海淀區",
"location" : {
"lat" : "39.970718",
"lon" : "116.325747"
}
}
上面的命令讓我們建立了一個叫做twitter2的index,並同時幫我們生產了一個如下的mapping:
GET /twitter2/_mapping
顯示的結果是:
{
"twitter2" : {
"mappings" : {
"properties" : {
"address" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"age" : {
"type" : "long"
},
"city" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"country" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"location" : {
"properties" : {
"lat" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"lon" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"message" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"province" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"uid" : {
"type" : "long"
},
"user" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
顯然系統幫我們生產的location數據類型是不對的,我們必須進行修改。一種辦法是刪除現有的twitter2索引,讓后修改它的mapping,再重新索引所有的數據。這對於一個兩個文檔還是可以的,但是如果已經有很多的數據了,這個方法並不可取。另外一種方式,是建立一個完全新的index,使用新的mapping進行reindex。下面我們展示如何使用這種方法。
創建一個新的twitter3的index,使用如下的mapping:
PUT twitter3
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"address": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"age": {
"type": "long"
},
"city": {
"type": "text"
},
"country": {
"type": "text"
},
"location": {
"type": "geo_point"
},
"message": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"province": {
"type": "text"
},
"uid": {
"type": "long"
},
"user": {
"type": "text"
}
}
}
}
這里我們我們修改了location及其它的一些數據項的數據類型。運行上面的指令,我們就可以創建一個完全新的twitter3的index。我們可以通過如下的命令來進行reindex:
POST _reindex
{
"source": {
"index": "twitter2"
},
"dest": {
"index": "twitter3"
}
}
顯示的結果是:
{
"took" : 52,
"timed_out" : false,
"total" : 1,
"updated" : 0,
"created" : 1,
"deleted" : 0,
"batches" : 1,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : -1.0,
"throttled_until_millis" : 0,
"failures" : [ ]
}
我們可以通過如下的命令來檢查我們的twitter3是否已經有新的數據:
GET /twitter3/_search
顯示的結果是:
{
"took" : 100,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "twitter3",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"user" : "雙榆樹-張三",
"message" : "今兒天氣不錯啊,出去轉轉去",
"uid" : 2,
"age" : 20,
"city" : "北京",
"province" : "北京",
"country" : "中國",
"address" : "中國北京市海淀區",
"location" : {
"lat" : "39.970718",
"lon" : "116.325747"
}
}
}
]
}
}
顯然我們的數據已經從twitter2到twitter3,並且它的數據類型已經是完全符合我們要求的數據類型。
Reindex執行
- Reindex是一個時間點的副本
- 就像上面返回的結果顯示的那樣,它是以batch(批量)的方式來執行的。默認的批量大小為1000
- 你也可以只拷貝源index其中的一部分數據
- 通過加入query到source中
- 通過定義max_docs參數
比如:
POST _reindex
{
"max_docs": 100,
"source": {
"index": "twitter2",
"query": {
"match": {
"city": "北京"
}
}
},
"dest": {
"index": "twitter3"
}
}
這里,我們定義最多不超過100個文檔,同時,我們只拷貝來自“北京”的twitter記錄。
設置op_type to create將導致_reindex僅在目標索引中創建缺少的文檔。 所有現有文檔都會導致版本沖突,比如:
POST _reindex
{
"source": {
"index": "twitter2"
},
"dest": {
"index": "twitter3",
"op_type": "create"
}
}
如果我們之前已經做過reindex,那么我們可以看到如下的結果:
{
"took": 2,
"timed_out": false,
"total": 1,
"updated": 0,
"created": 0,
"deleted": 0,
"batches": 1,
"version_conflicts": 1,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": -1,
"throttled_until_millis": 0,
"failures": [
{
"index": "twitter3",
"type": "_doc",
"id": "1",
"cause": {
"type": "version_conflict_engine_exception",
"reason": "[1]: version conflict, document already exists (current version [5])",
"index_uuid": "ffz2LNIIQqqDx211R5f4fQ",
"shard": "0",
"index": "twitter3"
},
"status": 409
}
]
}
它表明我們之前的文檔id為1的有版本上的沖突。
默認情況下,版本沖突會中止_reindex進程。 “conflict”請求body參數可用於指示_reindex繼續處理版本沖突的下一個文檔。 請務必注意,其他錯誤類型的處理不受“conflict”參數的影響。 當“conflict”:在請求正文中設置“proceed”時,_reindex進程將繼續發生版本沖突並返回遇到的版本沖突計數:
POST _reindex
{
"conflicts": "proceed",
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"op_type": "create"
}
}
Throttling
重新索引大量文檔可能會使您的群集泛濫甚至崩潰。requests_per_second限制索引操作速率。
POST _reindex?requests_per_second=500
{
"source": {
"index": "blogs",
"size": 500
},
"dest": {
"index": "blogs_fixed"
}
}
運用index別名來進行reindex
我們可以通過如下的方法來實現從一個index到另外一個index的數據轉移:
PUT test
PUT test_2
POST /_aliases
{
"actions" : [
{ "add": { "index": "test_2", "alias": "test" } },
{ "remove_index": { "index": "test" } }
]
}
在上面的例子中,假如我們地添加了一個叫做test的index,而test_2是我們想要的。我們直接可以通過上面的方法吧test中的數據交換到test_2中,並同時把test索引刪除。
從遠處進行reindex
_reindex
也支持從一個遠處的Elasticsearch的服務器進行reindex,它的語法為:
POST _reindex
{
"source": {
"index": "blogs",
"remote": {
"host": "http://remote_cluster_node1:9200",
"username": "USERNAME",
"password": "PASSWORD"
}
},
"dest": {
"index": "blogs"
}
}
這里顯示它從一個在http://remote_cluster_node1:9200的服務器來拷貝文件從一個index到另外一個index。
參考:
【1】https://www.elastic.co/guide/en/elasticsearch/reference/7.3/docs-reindex.html