增
新建一個 dataframe
,插入到索引 _index/_type
,直接調用 saveToEs
,讓 _id
為自己設定的 id
:
import org.elasticsearch.spark.sql._
def main(args: Array[String]): Unit = {
val spark = getSparkSession()
val dataFrame = spark.createDataFrame(Seq(
(1, 1, "2", "5"),
(2, 2, "3", "6"),
(3, 2, "36", "69")
)).toDF("id", "label", "col1", "col2")
dataFrame.saveToEs("_index/_type",Map("es.mapping.id" -> "id"))
}
//配置spark
def getSparkSession(): SparkSession = {
val masterUrl = "local"
val appName = "ttyb"
val sparkconf = new SparkConf()
.setMaster(masterUrl)
.setAppName(appName)
.set("es.nodes", "es的IP")
.set("es.port", "9200")
val Spark = SparkSession.builder().config(sparkconf).getOrCreate()
Spark
}
刪
目前 spark
沒有開放刪除的 API
,所以刪除只能用命令行:
curl -XDELETE 'http://es的IP:9200/_index/_type/_id'
查
根據時間范圍查詢,其中 query
可以為空,代表不以任何查詢條件查詢:
val startTime = "1519660800000"
val endTime = "1519747200000"
val query = "{\"query\":{\"range\":{\"recordtime\":{\"gte\":" + startTime + ",\"lte\":" + endTime + "}}}}"
val tableName = "_index/_type"
val botResultData = spark.esDF(tableName, query)
改
例如需要將 id=3
的 col1
改成 4
,col2
改成 7
,可以新建一個 dataframe
,按照 id
儲存,這樣 elasticsearch
就會自動覆蓋相同 id
下的數據:
val dataFrame1 = spark.createDataFrame(Seq(
(3, 2, "4", "7")
)).toDF("id", "label", "col1", "col2")
dataFrame1.saveToEs("_index/_type",Map("es.mapping.id" -> "id"))