es數據寫入、刪除和更新


轉載於: https://www.jianshu.com/p/d9e5451456e6

 

數據寫入過程:


 
寫入過程

注:一個數據不會寫入多個主分片


 
底層邏輯

注:數據先寫入Buffer,同時寫入Translog(用於極端情況下的數據恢復),Buffer緩存數據達到閾值會批量刷到磁盤(中間有個文件系統緩沖),所以說es的數據寫入是一個近實時的(存才延時默認是1秒)

單條寫入put/post:

1、put,需要設定數據ID(同一條數據首次插入是created,再次插入會updated)
2、post,可選擇設定數據ID(不指定id情況下:同一條數據首次插入是created,再次插入還是created,但_id會變;指定id若id不變第二次插入失敗)
_doc:同一條數據首次插入是created,再次插入會updated
_create:同一條數據首次插入是created,再次插入會報錯

PUT compyan-starff-001/_doc/1
{
  ...
}
PUT compyan-starff-001/_create/1
{
  ...
}

result,created/updated,新增或更新

req請求參數解讀:

op_type,操作類型
refresh,刷新策略

PUT compyan-starff-001/_doc/1?routing=1 #路由到第一個主分片
{
  ...
}

routing,路由策略
Wait_for_active_shards,等待寫入分片數量

GET company-staff-001/_doc/1  #通過這種方式可以立即獲取到插入的數據,因為這種GET方式會直接
#從Buffer中回去數據(實時)
GET company-staff-001/_search?routing=1  #_search需要路由到指定的分段從磁盤中獲取數據(這種是非實時)
#PUT數據時可以通過&refresh=true強制實時(也是近實時)

批量寫入_bulk:

POST _bulk {index:{"_index":"company-staff-001", "_id": "1"}} ... {index:{"_index":"company-staff-001", "_id": "2"}} ... {index:{"_index":"company-staff-001", "_id": "3"}} ... 

數據刪除:

刪除的路由機制和寫入的路由機制相同


 
刪除內部機制
DELETE company-staff-001/_doc/1
#如插入數據時_version為1,刪除是版本號為_version為2,原因:標記刪除先更新了數據版本發生變化,默認情況編輯刪除的數據能保存24小時

條件刪除:_delete_by_query

POST kibana_sample_data_logs_delete/_delete_by_query?requests_per_second { "slice":{ #手動分片刪除 "id":1, #兩次刪除需要修改id "max":2 #分為兩批次刪除 } "query": { "match_all": {} } } 
POST kibana_sample_data_logs_delete/_delete_by_query?slice=2&scroll_size=1000&requests_per_second=100 #自動分片刪除,最大的片數不能超過自己的分片數 

requests_per_second:每秒刪除多少條數據
注:一般需要加上requests_per_second來控制刪除;若不加,在條件刪除海量數據時,可能執行時間比較長,造成es瞬間io巨大,屬於危險操作
scroll_size:每次從es遍歷多少條數據存儲到Buffer中
requests_per_second:每秒多少條

GET _cat/tasks #查詢所有任務 GET _tasks?detailed=true&actions=*/delete/byquery #查詢上面的刪除任務 

批量刪除:

POST _bulk?refresh=true  #刷新
{"delete":{"_index":"company-staff-001", "_id":"1"}}
{"delete":{"_index":"company-staff-001", "_id":"2"}}
{"delete":{"_index":"company-staff-001", "_id":"3"}}

mysql數據庫批量導入數據到es可以使用logstash,原理就是使用了"_bulk"命令

思考:批量輸出數據,當數據量過大時,不如直接刪除索引,再重新導入數據

更新

1、全量更新

#第二次put會全量更新
POST compyan-starff-001/_doc/1
{
  ...
}

2、局部更新

POST compyan-starff-001/_update/1 { "_doc":{ "companyName":"xx公司" } #重要:false當id為1的記錄不存在時會更新報錯,true當id為1的記錄不存在時會創建索引並插入記錄 "doc_as_upset":true } 

3、腳本更新

只更新記錄中companyName字段內容
POST compyan-starff-001/_update/1?refresh=true { "script":{ "source":""" ctx._source. companyName="aaa公司"; """, "lang":"painless", "params":{} } } 或 POST compyan-starff-001/_update/1?refresh=true { "script":{ "source":""" ctx._source. companyName=params.cName; """, "lang":"painless", "params":{ "cName":"aaa公司" } } } 或 POST compyan-starff-001/_update/1?refresh=true { "upsert":{ "companyName"="aaa公司" } } 

批量更新_bulk:

POST _bulk?refresh=true
{"upadte":{"_index":"compyan-starff-001","_id":"1"}}
{"_doc":{"compaynId":"2020","userId":"1234"}}
{"upadte":{"_index":"compyan-starff-001","_id":"2"}}
{"_doc":{"compaynId":"2021","userId":"12345"}}
{"upadte":{"_index":"compyan-starff-001","_id":"3"}}
{"_doc":{"compaynId":"2022","userId":"123456"}}

"result":"noop"表示跟進前后數據相同,es沒有做操作,(es會先做檢查)
"_version"當更新不成功時,版本號依然會加1,若要指定版本號,可以外部設置,但是版本號必須必目前版本號大,否則報錯

條件更新:

"_update_by_query"和條件刪除類似



作者:奮斗的韭菜汪
鏈接:https://www.jianshu.com/p/d9e5451456e6
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM