應用版本
elasticsearch 5.5
spark 2.2.0
hadoop 2.7
依賴包版本
docker cp /Users/cclient/.ivy2/cache/org.elasticsearch/elasticsearch-spark-20_2.11/jars/elasticsearch-spark-20_2.11-6.0.0-alpha2.jar spark:/usr/spark-2.2.0/jars/
問題1
Multiple ES-Hadoop versions detected in the classpath; please use only one
多了其他依賴包 我的環境多引入了elasticsearch-hadoop-cascading-6.0.0-alpha2.jar 刪除即可
問題2
an id must be provided if version type or value are set;
upsert 時必須指定 id
"es.mapping.id"->"id"
問題3
kafka 存儲的是 json 序列化內容,spark 操作中需要反序列化,默認應用的json4s
map(jsonitem=>{
implicit val formats = DefaultFormats
parseJson(jsonitem).extract[ESData]
}
ESData 為 case class 若json 字符串不規范,缺少相應字段,則會報錯,為該字段設默認值即可
case class ESData(bool_isEssence : Option[Boolean]=Some(false),text_title : String)
另
寫入 es 配置官方文檔
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
官方示例
es.resource.write = my-collection/{media_type}
index 類型是固定的,經測,同樣可以自定義
如
es.resource.write ={media_type}/{media_type}
elasticsearch 存儲時根據年月分區
控制信息都保存在源json數據內,spark 寫入時只作反序列化,和index 和 type 映射
