本文主要介紹spark sql讀寫es、structured streaming寫入es以及一些參數的配置
ES官方提供了對spark的支持,可以直接通過spark讀寫es,具體可以參考ES Spark Support文檔(文末有地址)。
以下是pom依賴,具體版本可以根據自己的es和spark版本進行選擇:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.0.0</version>
</dependency>
Spark SQL - ES
主要提供了兩種讀寫方式:一種是通過DataFrameReader/Writer傳入ES Source實現;另一種是直接讀寫DataFrame實現。在實現前,還要列一些相關的配置:
配置
參數 | 描述 |
---|---|
es.nodes.wan.only | true or false,在此模式下,連接器禁用發現,並且所有操作通過聲明的es.nodes連接 |
es.nodes | ES節點 |
es.port | ES端口 |
es.index.auto.create | true or false,是否自動創建index |
es.resource | 資源路徑 |
es.mapping.id | es會為每個文檔分配一個全局id。如果不指定此參數將隨機生成;如果指定的話按指定的來 |
es.batch.size.bytes | es批量API的批量寫入的大小(以字節為單位) |
es.batch.write.refresh | 批量更新完成后是否調用索引刷新 |
es.read.field.as.array.include | 讀es的時候,指定將哪些字段作為數組類型 |
列了一些常用的配置,更多配置查看ES Spark Configuration文檔
DataFrameReader讀ES
import org.elasticsearch.spark.sql._
val options = Map(
"es.nodes.wan.only" -> "true",
"es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
"es.port" -> "9200",
"es.read.field.as.array.include" -> "arr1, arr2"
)
val df = spark
.read
.format("es")
.options(options)
.load("index1/info")
df.show()
DataFrameWriter寫ES
import org.elasticsearch.spark.sql._
val options = Map(
"es.index.auto.create" -> "true",
"es.nodes.wan.only" -> "true",
"es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
"es.port" -> "9200",
"es.mapping.id" -> "id"
)
val sourceDF = spark.table("hive_table")
sourceDF
.write
.format("org.elasticsearch.spark.sql")
.options(options)
.mode(SaveMode.Append)
.save("hive_table/docs")
讀DataFrame
jar包中提供了esDF()方法可以直接讀es數據為DataFrame,以下是源碼截圖。
簡單說一下各個參數:
resource:資源路徑,例如hive_table/docs
cfg:一些es的配置,和上面代碼中的options差不多
query:指定DSL查詢語句來過濾要讀的數據,例如"?q=user_group_id:3"表示讀user_group_id為3的數據
val options = Map(
"pushdown" -> "true",
"es.nodes.wan.only" -> "true",
"es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
"es.port" -> "9200"
)
val df = spark.esDF("hive_table/docs", "?q=user_group_id:3", options)
df.show()
寫DataFrame
jar包中提供了saveToEs()方法可以將DataFrame寫入ES,以下是源碼截圖。
resource:資源路徑,例如hive_table/docs
cfg:一些es的配置,和上面代碼中的options差不多
import org.elasticsearch.spark.sql._
val options = Map(
"es.index.auto.create" -> "true",
"es.nodes.wan.only" -> "true",
"es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
"es.port" -> "9200",
"es.mapping.id" -> "zip_record_id"
)
val df = spark.table("hive_table")
df.saveToEs("hive_table/docs", options)
Structured Streaming - ES
es也提供了對Structured Streaming的集成,使用Structured Streaming可以實時的寫入ES。
import org.elasticsearch.spark.sql._
val options = Map(
"es.index.auto.create" -> "true",
"es.nodes.wan.only" -> "true",
"es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
"es.port" -> "9200",
"es.mapping.id" -> "zip_record_id"
)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "a:9092,b:9092,c:9092")
.option("subscribe", "test")
.option("failOnDataLoss", "false")
.load()
df
.writeStream
.outputMode(OutputMode.Append())
.format("es")
.option("checkpointLocation", s"hdfs://hadoop:8020/checkpoint/test01")
.options(options)
.start("test_streaming/docs")
.awaitTermination()
可能遇到的問題
數組類型轉換錯誤
報錯信息:type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
因為es的mapping只會記錄字段的類型,不會記錄是否是數組,也就是說如果是int數組,es的mapping只是記錄成int。
可以在option中加一個es.read.field.as.array.include,標明數組字段
es.read.field.as.array.include" -> "數組字段的名字"
如果是object里的某個字段,寫成"object名字.數組字段名字",如果是多個字段,字段名之間用逗號分隔
Timestamp被轉為Long
DataFrame的Timestamp類型數據寫入ES后,就變成了Number類型。
這可能不算個問題,時間戳本質上就是Long類型的毫秒值;但是在Hive中Timestamp是"yyyy-MM-dd HH:mm:ss"的類型,個人覺得很別扭。
嘗試將Timestamp類型字段轉成Date類型,寫入ES后還是Number類型。網上搜了一圈也沒有什么好的辦法,大家有什么解決辦法歡迎交流。
References
ES Spark Support文檔:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark
ES Spark Configuration: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
end.
個人公眾號:碼農峰,定時推送行業資訊,持續發布原創技術文章,歡迎大家關注。