kafka spark steam 寫入elasticsearch的部分問題


應用版本

elasticsearch 5.5

spark 2.2.0

hadoop 2.7

 

依賴包版本

docker cp /Users/cclient/.ivy2/cache/org.elasticsearch/elasticsearch-spark-20_2.11/jars/elasticsearch-spark-20_2.11-6.0.0-alpha2.jar spark:/usr/spark-2.2.0/jars/

 

問題1

Multiple ES-Hadoop versions detected in the classpath; please use only one

多了其他依賴包 我的環境多引入了elasticsearch-hadoop-cascading-6.0.0-alpha2.jar 刪除即可

 

問題2

an id must be provided if version type or value are set;

upsert 時必須指定 id 

"es.mapping.id"->"id"

 

問題3

kafka 存儲的是 json 序列化內容,spark 操作中需要反序列化,默認應用的json4s

map(jsonitem=>{
implicit val formats = DefaultFormats
parseJson(jsonitem).extract[ESData]
}

ESData 為 case class 若json 字符串不規范,缺少相應字段,則會報錯,為該字段設默認值即可

case class ESData(bool_isEssence : Option[Boolean]=Some(false),text_title : String)

 

 

寫入 es 配置官方文檔

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

官方示例  

es.resource.write = my-collection/{media_type}

 index 類型是固定的,經測,同樣可以自定義

如 

es.resource.write ={media_type}/{media_type}

 

elasticsearch 存儲時根據年月分區

 

控制信息都保存在源json數據內,spark 寫入時只作反序列化,和index 和 type 映射


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM