Elasticsearch: Reindex接口


在我們開發的過程中,我們有很多時候需要用到Reindex接口。它可以幫我們把數據從一個index到另外一個index進行重新reindex。這個對於特別適用於我們在修改我們數據的mapping后,需要重新把數據從現有的index轉到新的index建立新的索引,這是因為我們不能修改現有的index的mapping一旦已經定下來了。在接下來的介紹中,我們將學習如何使用reindex接口。

為了能夠使用reindex接口,我們必須滿足一下的條件:

  • _source選項對所有的源index文檔是啟動的,也即源index的source是被存儲的
  • reindex不是幫我們嘗試設置好目的地index。它不拷貝源index的設置到目的地的index里去。你應該在做reindex之前把目的地的源的index設置好,這其中包括mapping, shard數目,replica等

下面,我們來一個具體的例子,比如建立一個blogs的index。

    PUT twitter2/_doc/1
    {
      "user" : "雙榆樹-張三",
      "message" : "今兒天氣不錯啊,出去轉轉去",
      "uid" : 2,
      "age" : 20,
      "city" : "北京",
      "province" : "北京",
      "country" : "中國",
      "address" : "中國北京市海淀區",
      "location" : {
        "lat" : "39.970718",
        "lon" : "116.325747"
      }
    }

上面的命令讓我們建立了一個叫做twitter2的index,並同時幫我們生產了一個如下的mapping:

GET /twitter2/_mapping

顯示的結果是:

    {
      "twitter2" : {
        "mappings" : {
          "properties" : {
            "address" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "age" : {
              "type" : "long"
            },
            "city" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "country" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "location" : {
              "properties" : {
                "lat" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                },
                "lon" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword",
                      "ignore_above" : 256
                    }
                  }
                }
              }
            },
            "message" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "province" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "uid" : {
              "type" : "long"
            },
            "user" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        }
      }
    }

顯然系統幫我們生產的location數據類型是不對的,我們必須進行修改。一種辦法是刪除現有的twitter2索引,讓后修改它的mapping,再重新索引所有的數據。這對於一個兩個文檔還是可以的,但是如果已經有很多的數據了,這個方法並不可取。另外一種方式,是建立一個完全新的index,使用新的mapping進行reindex。下面我們展示如何使用這種方法。

創建一個新的twitter3的index,使用如下的mapping:

    PUT twitter3
    {
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "address": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "age": {
            "type": "long"
          },
          "city": {
            "type": "text"
          },
          "country": {
            "type": "text"
          },
          "location": {
            "type": "geo_point"
          },
          "message": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "province": {
            "type": "text"
          },
          "uid": {
            "type": "long"
          },
          "user": {
            "type": "text"
          }
        }
      }
    }

這里我們我們修改了location及其它的一些數據項的數據類型。運行上面的指令,我們就可以創建一個完全新的twitter3的index。我們可以通過如下的命令來進行reindex:

    POST _reindex
    {
      "source": {
        "index": "twitter2"
      },
      "dest": {
        "index": "twitter3"
      }
    }

顯示的結果是:

    {
      "took" : 52,
      "timed_out" : false,
      "total" : 1,
      "updated" : 0,
      "created" : 1,
      "deleted" : 0,
      "batches" : 1,
      "version_conflicts" : 0,
      "noops" : 0,
      "retries" : {
        "bulk" : 0,
        "search" : 0
      },
      "throttled_millis" : 0,
      "requests_per_second" : -1.0,
      "throttled_until_millis" : 0,
      "failures" : [ ]
    }

我們可以通過如下的命令來檢查我們的twitter3是否已經有新的數據:

GET /twitter3/_search

顯示的結果是:

    {
      "took" : 100,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "twitter3",
            "_type" : "_doc",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "user" : "雙榆樹-張三",
              "message" : "今兒天氣不錯啊,出去轉轉去",
              "uid" : 2,
              "age" : 20,
              "city" : "北京",
              "province" : "北京",
              "country" : "中國",
              "address" : "中國北京市海淀區",
              "location" : {
                "lat" : "39.970718",
                "lon" : "116.325747"
              }
            }
          }
        ]
      }
    }

顯然我們的數據已經從twitter2到twitter3,並且它的數據類型已經是完全符合我們要求的數據類型。

Reindex執行

  • Reindex是一個時間點的副本
  • 就像上面返回的結果顯示的那樣,它是以batch(批量)的方式來執行的。默認的批量大小為1000
  • 你也可以只拷貝源index其中的一部分數據
    • 通過加入query到source中
    • 通過定義max_docs參數

比如:

    POST _reindex
    {
      "max_docs": 100,
      "source": {
        "index": "twitter2",
        "query": {
          "match": {
            "city": "北京"
          }
        }
      },
      "dest": {
        "index": "twitter3"
      }
    }

這里,我們定義最多不超過100個文檔,同時,我們只拷貝來自“北京”的twitter記錄。

設置op_type to create將導致_reindex僅在目標索引中創建缺少的文檔。 所有現有文檔都會導致版本沖突,比如:

    POST _reindex
    {
      "source": {
        "index": "twitter2"
      },
      "dest": {
        "index": "twitter3",
        "op_type": "create"
      }
    }

如果我們之前已經做過reindex,那么我們可以看到如下的結果:

    {
      "took": 2,
      "timed_out": false,
      "total": 1,
      "updated": 0,
      "created": 0,
      "deleted": 0,
      "batches": 1,
      "version_conflicts": 1,
      "noops": 0,
      "retries": {
        "bulk": 0,
        "search": 0
      },
      "throttled_millis": 0,
      "requests_per_second": -1,
      "throttled_until_millis": 0,
      "failures": [
        {
          "index": "twitter3",
          "type": "_doc",
          "id": "1",
          "cause": {
            "type": "version_conflict_engine_exception",
            "reason": "[1]: version conflict, document already exists (current version [5])",
            "index_uuid": "ffz2LNIIQqqDx211R5f4fQ",
            "shard": "0",
            "index": "twitter3"
          },
          "status": 409
        }
      ]
    }

它表明我們之前的文檔id為1的有版本上的沖突。

默認情況下,版本沖突會中止_reindex進程。 “conflict”請求body參數可用於指示_reindex繼續處理版本沖突的下一個文檔。 請務必注意,其他錯誤類型的處理不受“conflict”參數的影響。 當“conflict”:在請求正文中設置“proceed”時,_reindex進程將繼續發生版本沖突並返回遇到的版本沖突計數:

    POST _reindex
    {
      "conflicts": "proceed",
      "source": {
        "index": "twitter"
      },
      "dest": {
        "index": "new_twitter",
        "op_type": "create"
      }
    }

Throttling

重新索引大量文檔可能會使您的群集泛濫甚至崩潰。requests_per_second限制索引操作速率。

    POST _reindex?requests_per_second=500 
    {
      "source": {
        "index": "blogs",
        "size": 500
      },
      "dest": {
        "index": "blogs_fixed"
      }
    }

運用index別名來進行reindex

我們可以通過如下的方法來實現從一個index到另外一個index的數據轉移:

    PUT test     
    PUT test_2   
    POST /_aliases
    {
        "actions" : [
            { "add":  { "index": "test_2", "alias": "test" } },
            { "remove_index": { "index": "test" } }  
        ]
    }

在上面的例子中,假如我們地添加了一個叫做test的index,而test_2是我們想要的。我們直接可以通過上面的方法吧test中的數據交換到test_2中,並同時把test索引刪除。

從遠處進行reindex

_reindex也支持從一個遠處的Elasticsearch的服務器進行reindex,它的語法為:

    POST _reindex
    {
      "source": {
        "index": "blogs",
        "remote": {
          "host": "http://remote_cluster_node1:9200",
          "username": "USERNAME",
          "password": "PASSWORD"
        }
      },
      "dest": {
        "index": "blogs"
      }
    }

這里顯示它從一個在http://remote_cluster_node1:9200的服務器來拷貝文件從一個index到另外一個index。

參考:
【1】https://www.elastic.co/guide/en/elasticsearch/reference/7.3/docs-reindex.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM