Flink Sinks (File, Kafka, Redis, Es, MySQL)


Reminders:

1. When connecting to Kafka, pay close attention to the dependency versions; otherwise the program may start without any error yet still receive no data.
2. Besides the Flink and Kafka dependencies, it is recommended to also add the kafka-clients dependency, with a matching version.
-----------------------------------------------------------------------
1. When connecting to Es, you need flink-connector-elasticsearch6_2.11, elasticsearch-rest-high-level-client, elasticsearch-rest-client and elasticsearch, otherwise the job will not start.

Data format

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
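
All of the sink examples below import com.yangwj.api.SensorReading, which is not shown in the original; a minimal sketch of that case class, with the three fields matching the columns of the sample data above (id, timestamp, temperature):

package com.yangwj.api

// Minimal sketch of the sensor reading record used by every sink example in this article.
case class SensorReading(id: String, timestamp: Long, temperature: Double)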

########## Save to File ##############

1. Main class

package sink

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
/**
 * @author yangwj
 * @date 2021/1/6 21:17
 * @version 1.0
 */
object FileSink {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    // 1. Deprecated approach: writeAsCsv
    val savePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensorToFile"
    dataStream.writeAsCsv(savePath)

    // 2. Distributed approach: StreamingFileSink
    val saveDistributePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\saveDistributePath"
    dataStream.addSink(StreamingFileSink.forRowFormat(
      new Path(saveDistributePath),
      new SimpleStringEncoder[SensorReading]())
      .build()
    )
    env.execute("FileSink Test")

  }

}
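
The StreamingFileSink above uses the default rolling behaviour. As a hedged sketch (assuming Flink 1.10.1 or later for the Scala API; some earlier 1.10 builds had known Scala type-inference issues with this builder chain), a custom rolling policy can be attached so that part files roll by time, inactivity, or size:

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Sketch only: roll a new part file every 15 minutes, after 5 minutes of inactivity,
// or once a part file reaches 128 MB (the values are illustrative, not from the original).
val rollingSink = StreamingFileSink
  .forRowFormat(new Path(saveDistributePath), new SimpleStringEncoder[SensorReading]())
  .withRollingPolicy(
    DefaultRollingPolicy.create()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
      .withMaxPartSize(128 * 1024 * 1024)
      .build()
  )
  .build()
dataStream.addSink(rollingSink)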

########## Save to Es ##############

1. Dependencies

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.6.0</version>
        </dependency>

2. Main class

package sink

import java.util

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
/**
 * @author yangwj
 * @date 2021/1/6 22:05
 * @version 1.0
 */
object EsSink {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost",9200))

    // custom ElasticsearchSinkFunction that writes each record to Es
    val myEsSinkFunc: ElasticsearchSinkFunction[SensorReading] = new ElasticsearchSinkFunction[SensorReading] {

      // process(): prepare each record and hand it to the RequestIndexer for sending to Es
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // wrap the fields in a Map used as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id",t.id)
        dataSource.put("temperature",t.temperature.toString)
        dataSource.put("ts",t.timestamp.toString)

        // create an IndexRequest, which is sent to Es as an HTTP request
        val request: IndexRequest = Requests.indexRequest().index("sensor").`type`("_doc").source(dataSource)

        requestIndexer.add(request)
      }

    }
    // build the Es sink; setBulkFlushMaxActions(1) flushes the buffer to Es after every record
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc)
    esSinkBuilder.setBulkFlushMaxActions(1)
    dataStream.addSink(esSinkBuilder.build())

    env.execute("Es Sink Test")
  }
}

########## Save to Kafka ##############

1. Dependencies (note: pay close attention to version compatibility; otherwise the program starts without errors but receives no data from Kafka)

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>

2. Main class

package sink

import java.util.Properties

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
/**
 * @author yangwj
 * @date 2021/1/6 21:32
 * @version 1.0
 */
object KafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // read data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    val stream = env.addSource( new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties) )

    // first convert each line into the sample case class (simple transformation), then back to String
    val dataStream = stream
      .map( data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      } )

    dataStream.addSink( new FlinkKafkaProducer011[String]("localhost:9092", "sinktest", new SimpleStringSchema()) )

    env.execute("kafka sink test")


  }
}
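
The producer above passes the broker list directly in its constructor. FlinkKafkaProducer011 also offers a constructor that takes a Properties object, so the same properties used for the consumer can be reused and bootstrap.servers is defined in one place; a minimal sketch:

    // Sketch: alternative producer construction reusing the consumer Properties defined above.
    dataStream.addSink( new FlinkKafkaProducer011[String]("sinktest", new SimpleStringSchema(), properties) )

To check the pipeline end to end, produce test lines into the sensor topic and read the sinktest topic with a console consumer.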

########## Save to MySQL ##############

1. Dependencies

 <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.44</version>
 </dependency>

2. Main class

package sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.yangwj.api.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
/**
 * @author yangwj
 * @date 2021/1/6 22:27
 * @version 1.0
 * Since there is no official MySQL connector, data consistency cannot be guaranteed
 */
object MysqlSink {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    dataStream.addSink( new MyJdbcSinkFunc() )

    env.execute("mysql Sink Test")
  }
}
class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{
  // connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading): Unit = {
    // try the update first; if the row exists it is updated
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // if the update matched no rows, insert instead
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
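
The sensor_temp table itself is not defined in the original. A minimal sketch of creating it over JDBC, with column types inferred from the setString/setDouble calls above (the exact schema, including the primary key, is an assumption):

import java.sql.DriverManager

object CreateSensorTempTable {
  def main(args: Array[String]): Unit = {
    // Assumed schema: one row per sensor id holding its latest temperature.
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    val stmt = conn.createStatement()
    stmt.execute("create table if not exists sensor_temp (id varchar(32) primary key, temp double)")
    stmt.close()
    conn.close()
  }
}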

########## Save to Redis ##############

1. Dependencies

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
    </dependency>

2. Main class

package sink

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
 * @author yangwj
 * @date 2021/1/6 21:48
 * @version 1.0
 */
object RedisSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // 1. Redis connection config
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("localhost")
      .setPort(6379)
      .build()

    dataStream.addSink(new RedisSink[SensorReading](config , new MyRedisMapper()))

    env.execute("Redis Sink Test")
  }
}

// 2. Define a RedisMapper
class MyRedisMapper extends RedisMapper[SensorReading]{
  // define the Redis command used to write data: HSET table key value
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
  }
  // use the temperature as the value
  override def getValueFromData(data: SensorReading): String = data.temperature.toString

  // use the sensor id as the key
  override def getKeyFromData(data: SensorReading): String = data.id
}
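
With this mapper every reading is written as HSET sensor_temp <id> <temperature>, i.e. a single hash named sensor_temp holding the latest temperature per sensor id; in redis-cli you can inspect it with HGETALL sensor_temp.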

 

