以elasticsearch-hadoop 向elasticsearch 導數,丟失數據的問題排查


實際這是很久之前的問題了,當時沒時間記錄

這里簡單回顧

 

項目基於

 

數據架構不方便說太細,最精簡的

 

somedata-> [kafka]->spark-stream->elasticsearch

 

在 spark-streaming 引用了elasticsearch-hadoop(實際用的是為支持upsert doc自已打包的,見elasticsearch-hadoop 擴展定制 官方包以支持 update upsert doc)

 

問題是somedata定入kafka 200w條,最后到elasticsearch 190w條,有10w條不見了,也不報任何錯誤,批處理任務都是成功的。

 

首先排入kafka的消費問題,基於kafka自已實現了一套offset偏移維護的機制,不可能在消費kafka這一步丟數

 

唯一可能的就是 elasticsearch-hadoop 寫 elasticsearch 這一步了

 

class SparkDStreamFunctions(ds: DStream[_]) extends Serializable {
    def saveToEs(resource: String): Unit = { EsSparkStreaming.saveToEs(ds, resource) }
    def saveToEs(resource: String, cfg: Map[String, String]): Unit = { EsSparkStreaming.saveToEs(ds, resource, cfg) }
    def saveToEs(cfg: Map[String, String]): Unit = { EsSparkStreaming.saveToEs(ds, cfg) }
  }

寫入es調用包內的saveToEs方法,scala Unit 類似java 的void 這個方法是無返回值的。這里看不出什么線索

 

隱約能感覺到問題在哪里。

 

elasticsearch 是以樂觀鎖,版本號來實現基本的事務控制

 

操作elasticsearch時,相信大部分人都遇到過版本沖突的問題,報錯類似

{
  "error" : "VersionConflictEngineException[[website][2] [blog][1]:
             version conflict, current [2], provided [1]]",
  "status" : 409
}

 

但saveToEs這個方法是沒有返回值的????也就是說能保證不會碰到這個錯誤?

 

當然不是,查看源碼后發現

 

saveToEs無返回值,不代表就這批數據就完全成功了

 

實際會打印錯誤日志,不過只是在對這個包開啟debug后才會打印,默認的情況下是不開的。包的開發者們認為這種版本沖突的錯,如果拋到頂層,讓整個任務失敗太小題大作了,因此也不會往外拋,只會對比較"大"的異常才會拋到頂層。

 

實際上 elasticsearch-hadoop 會在一批任務寫入失敗后,隔一段時間重試,重試幾次后,直接跳過這組數據,這數據等於就丟棄了。(代碼就不貼了,因為github上最新的代碼和我當時排查時不一樣,可能有變化,問題已經解決,這次回顧也沒精力細究了,如果貼錯了還誤人子弟)

 

官方的配置文檔

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

es.batch.write.retry.count (default 3)
Number of retries for a given batch in case Elasticsearch is overloaded and data is rejected. Note that only the rejected data is retried. If there is still data rejected after the retries have been performed, the Hadoop job is cancelled (and fails). A negative value indicates infinite retries; be careful in setting this value as it can have unwanted side effects.
es.batch.write.retry.wait (default 10s)
Time to wait between batch write retries that are caused by bulk rejections.

 

這兩個參數就是重試相關的配置。

 

加了這3個參數后,就解決丟數的問題

 

"es.batch.write.retry.count" -> "-1",
"es.batch.write.retry.wait" -> "60s",
"es.batch.size.entries" -> "50"

 

es.batch.write.retry.count 表示無限重試,這個得謹慎着用最主要是改這個,我手里這套系統正好可以這么用。

es.batch.write.retry.wait 重試間隔由默認的10s改為60s,這個只是優化的

 

es.batch.size.entries也是優化的

es.batch.size.entries (default 1000)
Size (in entries) for batch writes using Elasticsearch bulk API - (0 disables it). Companion to es.batch.size.bytes, once one matches, the batch update is executed. Similar to the size, this setting is per task instance; it gets multiplied at runtime by the total number of Hadoop tasks running.

 

elasticsearch集群本身不提供權限控制,大部分架構都會在之前加個nginx

 

如果單個文檔都很大的話,默認的1000個,可能會超過nginx 限制的單獨http的body大小,nginx直接就讓請求失敗了,把這個數改小,是為了避免這種情況。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM