Flink学习(十) Sink到Redis


添加依赖

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

编写代码

package com.wyh.streamingApi.sink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Example Flink job: reads comma-separated sensor records from a text file,
 * parses each line into a `SensorReading`, and writes every reading to Redis
 * through a [[RedisSink]].
 *
 * NOTE(review): `SensorReading` is not declared in this file. From its use
 * below it is built from (String, Long, Double) and exposes `id` and
 * `temperature`; presumably it is a case class in this package — confirm.
 */
object Sink2Redis {

  /** Input file used when no path is supplied on the command line. */
  private val DefaultInputPath = "F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt"

  /**
   * Builds and runs the streaming job.
   *
   * @param args optional; `args(0)`, when present, is the path of the sensor
   *             text file to read. With no arguments the original hard-coded
   *             path is used.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Source: read the raw sensor lines from a text file.
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val inputStream = env.readTextFile(inputPath)

    // Transform: split each line on "," into three trimmed fields.
    // NOTE(review): a line with fewer than three fields, or with non-numeric
    // second/third fields, throws here and fails the job.
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

    // Connection settings for the Redis instance the sink writes to.
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    // Sink: write every reading to Redis using MyRedisMapper.
    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper()))

    env.execute("redis sink test")
  }
}

/**
 * Describes how a `SensorReading` is written to Redis: as an HSET into the
 * hash named "sensor_temperature", using the sensor id as the hash field and
 * the temperature as the field's value.
 */
class MyRedisMapper() extends RedisMapper[SensorReading] {

  /**
   * Defines the Redis command used to store the data.
   * Sensor id and temperature are stored in a hash: HSET key field value,
   * where the additional key "sensor_temperature" is the hash name.
   */
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
  }

  /** Key written to Redis (the hash field for HSET): the sensor id. */
  override def getKeyFromData(t: SensorReading): String = {
    t.id
  }

  /** Value written to Redis: the temperature rendered as a string. */
  override def getValueFromData(t: SensorReading): String = {
    t.temperature.toString
  }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM