elasticsearch 基礎 —— Update By Query API


Update By Query API

最簡單的用法是_update_by_query在不更改源的情況下對索引中的每個文檔執行更新。這對於獲取新屬性或其他一些在線映射更改很有用 。這是API:

POST twitter/_update_by_query?conflicts=proceed

這將返回如下內容:

{
  "took" : 147,
  "timed_out": false,
  "updated": 120,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 120,
  "failures" : [ ]
}

_update_by_query在索引啟動時獲取索引的快照,並使用internal版本控制索引它。這意味着如果文檔在拍攝快照的時間和處理索引請求之間發生更改,則會出現版本沖突。當版本匹配時,文檔會更新,版本號會遞增。

由於internal版本控制不支持將值0作為有效版本號,因此無法使用版本等於零的文檔進行更新, _update_by_query並且將使請求失敗。

所有更新和查詢失敗都會導致_update_by_query中止並failures在響應中返回。已執行的更新仍然存在。換句話說,該過程不會回滾,只會中止。當第一個失敗導致中止時,失敗的批量請求返回的所有失敗都將在failures元素中返回; 因此,可能存在相當多的失敗實體。

如果您只想計算版本沖突,不要導致_update_by_query 中止,您可以conflicts=proceed在URL或"conflicts": "proceed" 請求正文中設置。第一個例子是這樣做的,因為它只是試圖獲取在線映射更改,而版本沖突只是意味着沖突文檔在_update_by_query 嘗試更新文檔的開始和更新之間進行了更新。這很好,因為該更新將獲得在線映射更新。

回到API格式,這將更新twitter索引中的推文:

POST twitter/_doc/_update_by_query?conflicts=proceed

您還可以_update_by_query使用 Query DSL進行限制。這將更新twitter用戶索引中的所有文檔kimchy

POST twitter/_update_by_query?conflicts=proceed
{
  "query": { ①
    "term": {
      "user": "kimchy"
    }
  }
}

  必須query以與Search API相同的方式將查詢作為值傳遞給鍵。您也可以使用q 與搜索API相同的方式使用參數。

到目前為止,我們只是在不更改文檔來源的情況下更新文檔。這對於拾取新房產等事情非常有用, 但這只是其中一半的樂趣。_update_by_query 支持腳本來更新文檔。這將增加likes所有kimchy的推文上的字段:

POST twitter/_update_by_query
{
  "script": {
    "source": "ctx._source.likes++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}

就像在Update API中一樣,您可以設置ctx.op更改執行的操作:

noop

設置ctx.op = "noop"腳本是否確定不需要進行任何更改。這將導致_update_by_query從其更新中省略該文檔。這種無操作將noop響應機構的計數器中 報告。

delete

設置ctx.op = "delete"如果你的腳本決定,該文件必須被刪除。刪除將deleted響應正文中的計數器中 報告。

設置ctx.op為其他任何內容都是錯誤的。設置任何其他字段ctx是錯誤的。

請注意,我們已停止指定conflicts=proceed。在這種情況下,我們希望版本沖突中止該過程,以便我們可以處理失敗。

此API不允許您移動它接觸的文檔,只需修改它們的源。這是故意的!我們沒有規定將文檔從原始位置刪除。

也可以同時在多個索引和多個類型上完成這一切,就像搜索API一樣:

POST twitter,blog/_doc,post/_update_by_query

如果您提供,routing則路由將復制到滾動查詢,將進程限制為與該路由值匹配的分片:

POST twitter/_update_by_query?routing=1

默認情況下,_update_by_query使用1000的滾動批次。您可以使用scroll_sizeURL參數更改批量大小:

POST twitter/_update_by_query?scroll_size=100

_update_by_query也可以通過指定如下內容來使用“ 攝取節點”功能pipeline

PUT _ingest/pipeline/set-foo
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}
POST twitter/_update_by_query?pipeline=set-foo

URL參數

除了標准的參數,如pretty,此更新通過查詢API也支持refreshwait_for_completionwait_for_active_shardstimeout 和scroll

發送refresh將在請求完成時更新正在更新的索引中的所有分片。這與Index API的refresh 參數不同,后者僅導致接收新數據的分片被編入索引。

如果請求包含,wait_for_completion=false則Elasticsearch將執行一些預檢檢查,啟動請求,然后返回task 可與Tasks API 一起使用以取消或獲取任務狀態的請求。Elasticsearch還將創建此任務的記錄作為文檔.tasks/task/${taskId}。這是你的保留或刪除你認為合適。完成后,刪除它,以便Elasticsearch可以回收它使用的空間。

wait_for_active_shards控制在繼續請求之前必須激活碎片的副本數量。詳情請見此處 。timeout控制每個寫入請求等待不可用分片變為可用的時間。兩者都完全適用於 Bulk API中的工作方式。由於_update_by_query采用滾動搜索,你還可以指定scroll參數來控制多長時間保持“搜索上下文”活着,例如?scroll=10m,默認情況下它是5分鍾。

requests_per_second可以被設置為任何正十進制數(1.46, 1000等)和節流速率_update_by_query通過填充每個批次由一等待時間發出索引操作的批次。可以通過設置requests_per_second為禁用限制-1

通過在批處理之間等待來完成限制,以便在_update_by_query內部使用的滾動 可以被賦予考慮填充的超時。填充時間是批量大小除以requests_per_second寫入所花費的時間之間的差異。默認情況下,批處理大小為1000,因此如果requests_per_second設置為500

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - delete_time = 2 seconds - .5 seconds = 1.5 seconds

由於批處理是作為單個_bulk請求發出的,因此大批量大小將導致Elasticsearch創建許多請求,然后等待一段時間再開始下一組。這是“突發”而不是“平滑”。默認是-1

響應正文

JSON響應如下所示:

{
  "took" : 147,
  "timed_out": false,
  "total": 5,
  "updated": 5,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "failures" : [ ]
}

took

整個操作從開始到結束的毫秒數。

timed_out

true如果在查詢執行更新期間執行的任何請求超時 ,則將此標志設置為。

total

已成功處理的文檔數。

updated

已成功更新的文檔數。

deleted

已成功刪除的文檔數。

batches

由查詢更新拉回的滾動響應數。

version_conflicts

按查詢更新的版本沖突數。

noops

由於用於按查詢更新的腳本返回的noop值,因此忽略的文檔數ctx.op

retries

逐個更新嘗試的重試次數。bulk是重試的批量操作search的數量,是重試的搜索操作的數量。

throttled_millis

請求睡眠符合的毫秒數requests_per_second

requests_per_second

在查詢更新期間有效執行的每秒請求數。

throttled_until_millis

在按查詢響應刪除時,此字段應始終等於零。它只在使用Task API時有意義,它指示下一次(自紀元以來的毫秒數),為了符合,將再次執行受限制的請求requests_per_second

failures

如果在此過程中存在任何不可恢復的錯誤,則會出現故障數組。如果這是非空的,那么請求因為那些失敗而中止。逐個查詢是使用批處理實現的,任何故障都會導致整個進程中止,但當前批處理中的所有故障都會被收集到數組中。您可以使用該conflicts選項來防止reindex在版本沖突中中止。

使用Task API

您可以使用Task API獲取所有正在運行的逐個查詢請求的狀態 :

GET _tasks?detailed=true&actions=*byquery

回復如下:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/update/byquery",
          "status" : {    ①
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            }
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

 該對象包含實際狀態。它就像響應json一樣,重要的是增加了這個total領域。total是reindex期望執行的操作總數。您可以通過添加估計的進展updatedcreated以及deleted多個領域。請求將在其總和等於total字段時結束。

使用任務ID,您可以直接查找任務:

GET /_tasks/task_id

此API的優勢在於它可以集成wait_for_completion=false 以透明地返回已完成任務的狀態。如果任務完成並wait_for_completion=false設置在它上面,它將返回一個 results或一個error字段。此功能的成本是wait_for_completion=false創建的文檔 .tasks/task/${taskId}。您可以刪除該文檔。

使用Cancel Task API

可以使用任務取消API取消任何按查詢更新:

POST _tasks/task_id/_cancel

task_id可以使用上述任務的API被發現。

取消應該很快發生,但可能需要幾秒鍾。上面的任務狀態API將繼續列出任務,直到它被喚醒以取消自身。

Rethrottling

requests_per_second可以使用_rethrottleAPI 通過查詢在運行的更新上更改值:

POST _update_by_query/task_id/_rethrottle?requests_per_second=-1

task_id可以使用上述任務的API被發現。

就像在_update_by_queryAPI 上設置它一樣,requests_per_second 可以-1禁用限制或任何十進制數,如1.712限制到該級別。加速查詢的Rethrottling會立即生效,但是在完成當前批處理后,重新啟動會降低查詢速度。這可以防止滾動超時。

切片

逐個查詢支持切片滾動以並行化更新過程。這種並行化可以提高效率,並提供一種方便的方法將請求分解為更小的部分。

手動切片

通過為每個請求提供切片ID和切片總數,手動切片查詢:

POST twitter/_update_by_query
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}
POST twitter/_update_by_query
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

您可以驗證哪個適用於:

GET _refresh
POST twitter/_search?size=0&q=extra:test&filter_path=hits.total

 這樣的結果是明智的total

{
  "hits": {
    "total": 120
  }
}

自動切片

您還可以使用“ 切片滾動”切換為自動並行查詢 _uid。使用slices指定片使用的數字:

POST twitter/_update_by_query?refresh&slices=5
{
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

您還可以驗證以下內容:

POST twitter/_search?size=0&q=extra:test&filter_path=hits.total

這樣的結果是明智的total

{
  "hits": {
    "total": 120
  }
}

設置slicesauto將讓Elasticsearch選擇要使用的切片數。此設置將使用每個分片一個切片,達到一定限制。如果有多個源索引,它將根據具有最小分片數的索引選擇切片數。

添加slices_update_by_query剛剛自動化在上面的部分中使用的手工工藝,創建子請求,這意味着它有一些怪癖:

  • 您可以在Tasks API中查看這些請求 。這些子請求是請求任務的“子”任務slices
  • 獲取請求的任務狀態slices僅包含已完成切片的狀態。
  • 這些子請求可單獨尋址,例如取消和重新限制。
  • 對請求進行重新處理slices將按比例重新調整未完成的子請求。
  • 取消請求slices將取消每個子請求。
  • 由於slices每個子請求的性質將無法獲得完全均勻的文檔部分。將解決所有文檔,但某些切片可能比其他文件更大。期望更大的切片具有更均勻的分布。
  • 像請求requests_per_secondsize請求的參數slices 按比例分配給每個子請求。結合上面關於分布不均勻的點,你應該得出結論,使用 sizewith slices可能不會導致size文件確切地為`_update_by_query`。
  • 每個子請求獲得的源索引的略有不同的快照,盡管這些都是在大約相同的時間進行的。

挑選切片數量

如果自動切片,設置slicesauto將為大多數索引選擇合理的數字。如果您手動切片或以其他方式調整自動切片,請使用這些指南。

當數量slices等於索引中的分片數時,查詢性能最有效。如果該數字很大(例如,500),請選擇較小的數字,因為太多slices會損害性能。設置 slices高於分片數通常不會提高效率並增加開銷。

更新性能在可用資源上以切片數量線性擴展。

查詢或更新性能是否主導運行時取決於重新編制索引的文檔和群集資源。

選擇一個新的屬性

假設您創建了一個沒有動態映射的索引,用數據填充它,然后添加了一個映射值以從數據中獲取更多字段:

PUT test
{
  "mappings": {
    "_doc": {
      "dynamic": false,   ①
      "properties": {
        "text": {"type": "text"}
      }
    }
  }
}

POST test/_doc?refresh
{
  "text": "words words",
  "flag": "bar"
}
POST test/_doc?refresh
{
  "text": "words words",
  "flag": "foo"
}
PUT test/_mapping/_doc   ②
{
  "properties": {
    "text": {"type": "text"},
    "flag": {"type": "text", "analyzer": "keyword"}
  }
}

這意味着不會將新字段編入索引,只存儲在其中_source

這會更新映射以添加新flag字段。要獲取新字段,您必須使用它重新索引所有文檔。

搜索數據將找不到任何內容:

POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 0
  }
}

但您可以發出_update_by_query請求以獲取新映射:

POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 1
  }
}

將字段添加到多字段時,您可以執行完全相同的操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM