Elasticsearch:使用_update_by_query更新文檔


轉載自:
https://blog.csdn.net/UbuntuTouch/article/details/105564270

在很多的情況下,我們我們想更新我們所有的文檔:

  • 添加一個新的field或者是一個字段變成一個multi-field
  • 用一個值更新所有的文檔,或者更新復合查詢條件的所有文檔

在今天的文章中,我們來講一下_update_by_query的這幾個用法。

准備數據

我們來創建一個叫做twitter的索引:

PUT twitter
{
  "mappings": {
    "properties": {
      "DOB": {
        "type": "date"
      },
      "address": {
        "type": "keyword"
      },
      "city": {
        "type": "text"
      },
      "country": {
        "type": "keyword"
      },
      "uid": {
        "type": "long"
      },
      "user": {
        "type": "keyword"
      },
      "province": {
        "type": "keyword"
      },
      "message": {
        "type": "text"
      },
      "location": {
        "type": "geo_point"
      }
    }
  }
}

我們使用如下的bulk API來把數據導入:

POST _bulk
{ "index" : { "_index" : "twitter", "_id": 1} }
{"user":"張三","message":"今兒天氣不錯啊,出去轉轉去","uid":2,"city":"北京","province":"北京","country":"中國","address":"中國北京市海淀區","location":{"lat":"39.970718","lon":"116.325747"}, "DOB":"1980-12-01"}
{ "index" : { "_index" : "twitter", "_id": 2 }}
{"user":"老劉","message":"出發,下一站雲南!","uid":3,"city":"北京","province":"北京","country":"中國","address":"中國北京市東城區台基廠三條3號","location":{"lat":"39.904313","lon":"116.412754"}, "DOB":"1981-12-01"}
{ "index" : { "_index" : "twitter", "_id": 3} }
{"user":"李四","message":"happy birthday!","uid":4,"city":"北京","province":"北京","country":"中國","address":"中國北京市東城區","location":{"lat":"39.893801","lon":"116.408986"}, "DOB":"1982-12-01"}
{ "index" : { "_index" : "twitter", "_id": 4} }
{"user":"老賈","message":"123,gogogo","uid":5,"city":"北京","province":"北京","country":"中國","address":"中國北京市朝陽區建國門","location":{"lat":"39.718256","lon":"116.367910"}, "DOB":"1983-12-01"}
{ "index" : { "_index" : "twitter", "_id": 5} }
{"user":"老王","message":"Happy BirthDay My Friend!","uid":6,"city":"北京","province":"北京","country":"中國","address":"中國北京市朝陽區國貿","location":{"lat":"39.918256","lon":"116.467910"}, "DOB":"1984-12-01"}
{ "index" : { "_index" : "twitter", "_id": 6} }
{"user":"老吳","message":"好友來了都今天我生日,好友來了,什么 birthday happy 就成!","uid":7,"city":"上海","province":"上海","country":"中國","address":"中國上海市閔行區","location":{"lat":"31.175927","lon":"121.383328"}, "DOB":"1985-12-01"}

把一個字段變為multi-field

在上面,我們有意識地把city字段設置為text,但是在實際的應用中city一般來說是keyword類型。比如我們想對city這個字段來進行aggregation。那么我們該如何糾正這個錯誤呢?我們需要把我們之前的index刪除,並使用新的mapping再次重建嗎?這在我們的實際的是使用中可能並不現實。這是因為你的數據可能是非常大的,而且這種改動可能會造成很多的問題。那么我們該如何解決這個問題呢?

一種辦法是在不刪除之前索引的情況下,我們把city變成為一個mulit-field的字段,這樣它既可以是一個keyword的類型,也可以同樣是一個text類型的字段。為此,我們來修改twitter的mapping:

PUT twitter/_mapping
{
  "properties": {
    "DOB": {
      "type": "date"
    },
    "address": {
      "type": "keyword"
    },
    "city": {
      "type": "text",
      "fields": {
        "keyword": {
          "type": "keyword",
          "ignore_above": 256
        }
      }
    },
    "country": {
      "type": "keyword"
    },
    "uid": {
      "type": "long"
    },
    "user": {
      "type": "keyword"
    },
    "province": {
      "type": "keyword"
    },
    "message": {
      "type": "text"
    },
    "location": {
      "type": "geo_point"
    }
  }
}

請注意在上面,我們把message的字段變為一個mult-field的字段。即便我們已經把mapping修改了,但是我們的索引並沒有把我們的message字段進行分詞。為了達到這個目的,我們可以進行如下的操作:

POST twitter/_update_by_query

經過上面的操作后,message字段將會被重新被索引,並可以被我們搜索。

GET twitter/_search
{
  "query": {
    "match": {
      "city.keyword": "北京"
    }
  }
}

上面顯示的結果為:

  "hits" : {
    "total" : {
      "value" : 5,
      "relation" : "eq"
    },
    "max_score" : 0.21357408,
    "hits" : [
      {
        "_index" : "twitter",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 0.21357408,
        "_source" : {
          "user" : "張三",
          "message" : "今兒天氣不錯啊,出去轉轉去",
          "uid" : 2,
          "city" : "北京",
          "province" : "北京",
          "country" : "中國",
          "address" : "中國北京市海淀區",
          "location" : {
            "lat" : "39.970718",
            "lon" : "116.325747"
          },
          "DOB" : "1980-12-01"
        }
      },
   ...
}

當然由於這個字段變為multi-field的字段,它含有city.keyword,我們可以對它進行聚合搜索:

GET twitter/_search
{
  "size": 0,
  "aggs": {
    "city_distribution": {
      "terms": {
        "field": "city.keyword",
        "size": 5
      }
    }
  }
}

上面我們對city進行統計,上面顯示結果為:

  "aggregations" : {
    "city_distribution" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "北京",
          "doc_count" : 5
        },
        {
          "key" : "上海",
          "doc_count" : 1
        }
      ]
    }
  }

如果我們不修改city為multi-field,我們將不能對這個字段進行統計了。

增加一個新的字段

同樣我們可以通過script的方法來為我們的twitter增加一個新的字段,比如:

POST twitter/_update_by_query
{
  "script": {
    "source": "ctx._source['contact'] = \"139111111111\""
  }
}

通過上面的方法,我們把所有的文檔都添加一個新的字段contact,並賦予它一個同樣的值:

GET twitter/_search
{
  "query": {
    "match_all": {}
  }
}

上面的命令顯示結果:

  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "twitter",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "uid" : 2,
          "country" : "中國",
          "address" : "中國北京市海淀區",
          "province" : "北京",
          "city" : "北京",
          "DOB" : "1980-12-01",
          "contact" : "139111111111",
          "location" : {
            "lon" : "116.325747",
            "lat" : "39.970718"
          },
          "message" : "今兒天氣不錯啊,出去轉轉去",
          "user" : "張三"
        }
      },
  ...
}

從上面我們可以看出來,有增加一個新的字段contact。

修改已有的字段

假如我們想對所有在北京的文檔里的uid都加1,那么我么有通過如下的方法:

POST twitter/_update_by_query
{
  "query": {
    "match": {
      "city.keyword": "北京"
    }
  },
  "script": {
    "source": "ctx._source['uid'] += params['one']",
    "params": {
      "one": 1
    }
  }
}

在執行上面的命令后,我們進行查詢:

GET twitter/_search
{
  "query": {
    "match": {
      "city.keyword": "北京"
    }
  }
}

顯示結果:

  "hits" : {
    "total" : {
      "value" : 5,
      "relation" : "eq"
    },
    "max_score" : 0.24116206,
    "hits" : [
      {
        "_index" : "twitter",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 0.24116206,
        "_source" : {
          "uid" : 3,
          "country" : "中國",
          "address" : "中國北京市海淀區",
          "province" : "北京",
          "city" : "北京",
          "DOB" : "1980-12-01",
          "contact" : "139111111111",
          "location" : {
            "lon" : "116.325747",
            "lat" : "39.970718"
          },
          "message" : "今兒天氣不錯啊,出去轉轉去",
          "user" : "張三"
        }
      },
   ...
}

上面顯示city為北京的所有的文檔的uid的數值都被加1了。上面_id為1的原來的uid值為2,現在變為3。

沒有動態mapping時,reindex索引

假設您創建了一個沒有動態mapping的索引,將其填充了數據,然后添加了一個mapping值以從數據中獲取更多字段:

PUT test
{
  "mappings": {
    "dynamic": false,   
    "properties": {
      "text": {"type": "text"}
    }
  }
}
 
POST test/_doc?refresh
{
  "text": "words words",
  "flag": "bar"
}
 
POST test/_doc?refresh
{
  "text": "words words",
  "flag": "foo"
}
 
PUT test/_mapping   
{
  "properties": {
    "text": {"type": "text"},
    "flag": {"type": "text", "analyzer": "keyword"}
  }
}

在上面我們創建一個叫做test的索引。首先它的動態mapping被禁止了,也就是在索引時凡是不在mapping定義的字段將被自動識別,它們僅僅存在於source里,我們不能對它進行搜索。為了糾正這個錯誤,我們在上面的最后一步嘗試來修改它的mapping來解決這個問題。那么在新的mapping下,我們之前導入的文檔能進行搜索嗎?我們嘗試如下的命令:

POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}

我們嘗試搜索所有flag中含有foo的文檔,但是上面的返回結果是:

{
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    }
  }
}

那么問題出現在哪里呢?其實在我們修改完mapping以后,我們沒有更新我們之前已經導入的文檔。我們需要使用_update_by_query來做類似reindex的工作。我們使用如下的命令:

POST test/_update_by_query?refresh&conflicts=proceed

我們重新來搜索我們的文檔:

POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}

上面的查詢顯示的結果是:

{
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    }
  }
}

顯然,在運行完_update_by_query后,我們可以找到我們的文檔了。

針對大量數據的reindex

上面所有的_update_by_query針對少量的數據還是很不錯的。但是在我們的實際應用中,我們可能遇到很大的數據量,那么萬一在reindex的過程中發生意外,那我們還需要從頭開始嗎?或者我們已經處理過的數據還需要再做一遍嗎?一種通用的解決辦法就是在我們的mapping中定義一個字段,比如叫做reindexBatch,那么我們可以通過添加這個字段來跟蹤我們的進度:

POST blogs_fixed/_update_by_query
{
  "query": {
    "range": {
      "flag": {
        "lt": 1
      }
    }
  },
  "script": {
    "source": "ctx._source['flag']=1"
  }
}

即使在reindex的過程已經失敗了,我們再次運行上面的_update_by_query時,之前已經處理過的文件將不再被處理了。

_update_by_query 除了上面的用法之外,我們也可以結合pipepline來對我們的索引數據進行加工。詳細的用法請參閱我之前的文章“運用Elastic Stack分析COVID-19數據並進行可視化分析”。

更多閱讀Elasticsearch: Reindex接口


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM