添加依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
编写代码
package com.wyh.streamingApi.sink import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} object Sink2Redis { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //Source操作
val inputStream = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt") //Transform操作
val dataStream: DataStream[SensorReading] = inputStream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) val conf = new FlinkJedisPoolConfig.Builder() .setHost("localhost") .setPort(6379) .build() //Sink操作
dataStream.addSink(new RedisSink[SensorReading](conf,new MyRedisMapper())) env.execute("redis sink test") } } class MyRedisMapper() extends RedisMapper[SensorReading]{ //定义保存数据到Redis的命令
override def getCommandDescription: RedisCommandDescription = { //把传感器id和温度值保存成 Hash表 HSET key field value
new RedisCommandDescription(RedisCommand.HSET,"sensor_temperature") } //定义保存到redis的key
override def getKeyFromData(t: SensorReading): String = { t.id } //定义保存到redis的value
override def getValueFromData(t: SensorReading): String = { t.temperature.toString } }