Flink學習(十) Sink到Redis


添加依賴

        <dependency>
            <!-- Redis connector for Flink published under the Apache Bahir group id;
                 the _2.11 suffix on the artifact id is the Scala binary version it was built for. -->
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

編寫代碼

package com.wyh.streamingApi.sink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Streaming job that reads comma-separated sensor lines from a text file,
 * converts each line into a [[SensorReading]] and writes the stream to Redis
 * through a [[RedisSink]] configured by [[MyRedisMapper]].
 */
object Sink2Redis {

  /**
   * Builds the source -> transform -> sink pipeline and runs it.
   *
   * @param args command-line arguments (not read by this job)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Source: read the sensor file line by line.
    val rawLines = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")

    // Transform: split each line on commas and build a SensorReading from the
    // first three fields (String, Long, Double), trimming surrounding whitespace.
    val readings: DataStream[SensorReading] = rawLines.map { line =>
      val fields = line.split(",")
      SensorReading(
        fields(0).trim,
        fields(1).trim.toLong,
        fields(2).trim.toDouble
      )
    }

    // Connection settings for the Redis instance the sink writes to.
    val redisConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    // Sink: hand every reading to Redis using the mapper defined below.
    val redisSink = new RedisSink[SensorReading](redisConfig, new MyRedisMapper())
    readings.addSink(redisSink)

    env.execute("redis sink test")
  }
}

/**
 * Tells the Redis sink which command to issue and which parts of a
 * [[SensorReading]] to use as key and value.
 */
class MyRedisMapper() extends RedisMapper[SensorReading] {

  /**
   * Command used to save the data: HSET, with "sensor_temperature" passed as the
   * command's additional key, so sensor id and temperature are stored as a hash
   * (HSET key field value).
   */
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")

  /** Key saved to Redis for a reading: its sensor id. */
  override def getKeyFromData(t: SensorReading): String = t.id

  /** Value saved to Redis for a reading: its temperature rendered as a string. */
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM