Flink學習(十一) Sink到Elasticsearch


導入依賴

        <!--
          Flink connector that provides the Elasticsearch 6.x sink
          (org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink)
          used by the Scala example in this article.
          NOTE(review): the "_2.11" artifact suffix is presumably the Scala binary
          version and 1.7.2 the Flink release; both should match the rest of the
          project's Flink dependencies (confirm against the project's pom).
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
package com.wyh.streamingApi.sink import java.util import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.http.HttpHost import org.elasticsearch.client.Requests //溫度傳感器讀數樣例類
/** One temperature-sensor sample: the sensor id, the reading's timestamp and the temperature. */
case class SensorReading(id: String, timestamp: Long, temperature: Double)

/**
 * Demo job: reads comma-separated sensor readings from a text file, parses them into
 * [[SensorReading]] values and sends each one to Elasticsearch through the Flink
 * Elasticsearch 6 sink.
 */
object Sink2ES {

  /**
   * Converts every [[SensorReading]] into an index request against index "sensor",
   * type "readingdata", and hands it to the supplied [[RequestIndexer]].
   */
  private class SensorReadingIndexer extends ElasticsearchSinkFunction[SensorReading] {

    override def process(
        t: SensorReading,
        runtimeContext: RuntimeContext,
        requestIndexer: RequestIndexer): Unit = {
      println("saving data:" + t)

      // Wrap the reading as a flat String -> String document; numeric fields are
      // stored in their string form.
      val document = new util.HashMap[String, String]()
      document.put("sensor_id", t.id)
      document.put("timestamp", t.timestamp.toString)
      document.put("temperature", t.temperature.toString)

      // Build the index request for this document.
      val request = Requests.indexRequest()
        .index("sensor")
        .`type`("readingdata")
        .source(document)

      // Submit the request through the indexer provided by the sink.
      requestIndexer.add(request)
      println("data 寫入完成。。。")
    }
  }

  /**
   * Entry point. Builds and runs the pipeline with parallelism 1.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Source: one stream element per line of the input file.
    val lines = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")

    // Transform: "id, timestamp, temperature" -> SensorReading.
    val readings: DataStream[SensorReading] = lines.map { line =>
      val fields = line.split(",").map(_.trim)
      SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
    }

    // Elasticsearch node(s) the sink connects to.
    val esHosts = new util.ArrayList[HttpHost]()
    esHosts.add(new HttpHost("192.168.230.30", 9200))

    // Builder for the Elasticsearch sink, wired to the indexer defined above.
    val sinkBuilder: ElasticsearchSink.Builder[SensorReading] =
      new ElasticsearchSink.Builder[SensorReading](esHosts, new SensorReadingIndexer)

    // Sink: attach the Elasticsearch sink and launch the job.
    readings.addSink(sinkBuilder.build())
    env.execute("sink ES test")
  }
}

啟動ES

 

 啟動kibana

 

 運行

 

 查看結果

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM