背景
眾所周知,Elasticsearch是⼀個實時的分布式搜索引擎,為⽤戶提供搜索服務。當我們決定存儲某種數據,在創建索引的時候就需要將數據結構,即Mapping確定下來,於此同時索引的設定和很多固定配置將不能改變。
那如果后續業務發生變化,需要改變數據結構或者更換ES更換分詞器怎么辦呢?為此,Elastic團隊提供了很多通過輔助⼯具來幫助開發⼈員進⾏重建索引的方案。
如果對 reindex
API 不熟悉,那么在遇到重構的時候,必然事倍功半,效率低下。反之,就可以方便地進行索引重構,省時省力。
步驟
假設之前我們已經存在一個blog索引,因為更換分詞器需要對該索引中的數據進行重建索引,以便支持業務使用新的分詞規則搜索數據,並且盡可能使這個變化對外服務沒有感知,大概分為以下幾個步驟:
- 新增⼀個索引
blog_lastest
,Mapping數據結構與blog
索引一致 - 將
blog
數據同步至blog_lastest
- 刪除
blog
索引 - 數據同步后給
blog_lastest
添加別名blog
新建索引
在這里推薦一個ES管理工具Kibana,主要針對數據的探索、可視化和分析。
put /blog_lastest/
{
"mappings":{
"properties":{
"title":{
"type":"text",
"analyzer":"ik_max_word"
},
"author":{
"type":"keyword",
"fields":{
"seg":{
"type":"text",
"analyzer":"ik_max_word"
}
}
}
}
}
}
將舊索引數據copy到新索引
同步等待
接⼝將會在 reindex 結束后返回
POST /_reindex
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest"
}
}
在 kibana
中的使用如下所示
當然高版本(7.1.1)中,ES都有提供對應的Java REST Client
,比如
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest");
TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);
為了防止贅述,接下來舉例全部以kibana
中請求介紹,如果有需要用Java REST Client
,可以自行去ES官網查看。
異步執⾏
如果 reindex 時間過⻓,建議加上 wait_for_completion=false
的參數條件,這樣 reindex 將直接返回 taskId
。
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest"
}
}
返回:
{
"task" : "dpBihNSMQfSlboMGlTgCBA:4728038"
}
op_type 參數
op_type
參數控制着寫入數據的沖突處理方式,如果把 op_type
設置為 create
【默認值】,在 _reindex
API 中,表示寫入時只在 dest
index
中添加不存在的 doucment,如果相同的 document 已經存在,則會報 version confilct
的錯誤,那么索引操作就會失敗。【這種方式與使用 _create API 時效果一致】
POST _reindex
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
}
}
如果這樣設置了,也就不存在更新數據的場景了【沖突數據無法寫入】,我們也可以把 op_type
設置為 index
,表示所有的數據全部重新索引創建。
conflicts 配置
默認情況下,當發生 version conflict
的時候,_reindex
會被 abort
,任務終止【此時數據還沒有 reindex
完成】,在返回體中的 failures
指標中會包含沖突的數據【有時候數據會非常多】,除非把 conflicts
設置為 proceed
。
關於 abort
的說明,如果產生了 abort
,已經執行的數據【例如更新寫入的】仍然存在於目標索引,此時任務終止,還會有數據沒有被執行,也就是漏數了。換句話說,該執行過程不會回滾,只會終止。如果設置了 proceed
,任務在檢測到數據沖突的情況下,不會終止,會跳過沖突數據繼續執行,直到所有數據執行完成,此時不會漏掉正常的數據,只會漏掉有沖突的數據。
POST _reindex
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
},
"conflicts": "proceed"
}
我們可以故意把 op_type
設置為 create
,人為制造數據沖突的場景,測試時更容易觀察到沖突現象。
如果把 conflicts
設置為 proceed
,在返回體結果中不會再出現 failures
的信息,但是通過 version_conflicts
指標可以看到具體的數量。
批次大小配置
當你發現reindex
的速度有些慢的時候,可以在 query
參數的同一層次【即 source
參數中】添加 size
參數,表示 scroll size
的大小【會影響批次的次數,進而影響整體的速度】,如果不顯式設置,默認是一批 1000 條數據,在一開始的簡單示例中也看到了。
如下,設置 scroll size
為 5000:
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "blog",
"size":5000
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
},
"conflicts": "proceed"
}
測試后,速度達到了 30 分鍾 500 萬左右,明顯提升了很多。
根據taskId可以實時查看任務的執行狀態
一般來說,如果我們的 source index
很大【比如幾百萬數據量】,則可能需要比較長的時間來完成 _reindex
的工作,可能需要幾十分鍾。而在此期間不可能一直等待結果返回,可以去做其它事情,如果中途需要查看進度,可以通過 _tasks
API 進行查看。
GET /_tasks/{taskId}
返回:
{
"completed" : false,
"task" : {
"node" : "dpBihNSMQfSlboMGlTgCBA",
"id" : 4704218,
"type" : "transport",
"action" : "indices:data/write/reindex",
……
}
當執行完畢時,completed
為true
查看任務進度以及取消任務,除了根據taskId查看以外,我們還可以通過查看所有的任務中篩選本次reindex
的任務。
GET _tasks?detailed=true&actions=*reindex
返回結果:
{
"nodes" : {
"dpBihNSMQfSlboMGlTgCBA" : {
"name" : "node-16111-9210",
"transport_address" : "192.168.XXX.XXX:9310",
"host" : "192.168.XXX.XXX",
"ip" : "192.168.16.111:9310",
"roles" : [
"ingest",
"master"
],
"attributes" : {
"xpack.installed" : "true",
"transform.node" : "false"
},
"tasks" : {
"dpBihNSMQfSlboMGlTgCBA:6629305" : {
"node" : "dpBihNSMQfSlboMGlTgCBA",
"id" : 6629305,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"total" : 8361421,
"updated" : 0,
"created" : 254006,
"deleted" : 0,
"batches" : 743,
"version_conflicts" : 3455994,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : -1.0,
"throttled_until_millis" : 0
},
"description" : "reindex from [blog] to [blog_lastest][_doc]",
"start_time_in_millis" : 1609338953464,
"running_time_in_nanos" : 1276738396689,
"cancellable" : true,
"headers" : { }
}
}
}
}
}
注意觀察里面的幾個重要指標,例如從 description
中可以看到任務描述,從 tasks 中可以找到任務的 id
【例如 dpBihNSMQfSlboMGlTgCBA:6629305
】,從 cancellable
可以判斷任務是否支持取消操作。
這個 API 其實就是模糊匹配,同理也可以查詢其它類型的任務信息,例如使用 GET _tasks?detailed=true&actions=*byquery
查看查詢請求的狀態。
當集群的任務太多時我們就可以根據task_id
,也就是上面提到GET /_tasks/task_id
方式更加准確地查詢指定任務的狀態,避免集群的任務過多,不方便查看。
如果遇到操作失誤的場景,想取消任務,有沒有辦法呢?
當然有啦,雖然覆水難收,通過調用
_tasks API
:
POST _tasks/task_id/_cancel
這里的 task_id
就是通過上面的查詢任務接口獲取的任務id(任務要支持取消操作,即【cancellable 為 true】時方能收效)。
刪除舊索引
當我們通過 API 查詢發現任務完成后,就可以進行后續操作,我這里是要刪除舊索引,然后再給新索引起別名,用於替換舊索引,這樣才能保證對外服務沒有任何感知。
DELETE /blog
使用別名
POST /_aliases
{
"actions":[
{
"add":{
"index":"blog_lastest",
"alias":"blog"
}
}
]
}
通過別名訪問新索引
進行過以上操作后,我們可以使用一個簡單的搜索驗證服務。
POST /blog/_search
{
"query": {
"match": {
"author": "james"
}
}
}
如果搜索結果達到我們的預期目標,至此,數據索引重建遷移完成。
本文可轉載,但需聲明原文出處。 程序員小明,一個很少加班的程序員。歡迎關注微信公眾號“程序員小明”,獲取更多優質文章。