1.代碼
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
 * One reading taken from a temperature sensor.
 *
 * @param id          identifier of the sensor that produced the reading
 * @param timestamp   time of the reading, as supplied by the input record
 * @param temperature measured temperature value
 */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)
/**
 * Example job: reads comma-separated sensor records from a text file and writes
 * them to Redis through the Flink Redis sink.
 *
 * Each input line is expected to have the form `id,timestamp,temperature`.
 * The input path may be given as the first command-line argument; when it is
 * absent the job reads `sensor1.txt`.
 */
object RedisSinkTest {
  import scala.util.Try

  /** Input file used when no path is supplied on the command line. */
  private val DefaultInputPath = "sensor1.txt"

  /**
   * Parses a single `id,timestamp,temperature` record.
   *
   * @param line raw text line read from the input file
   * @return the parsed reading, or `None` when the line has fewer than three
   *         fields or its timestamp/temperature is not numeric
   */
  private def parseReading(line: String): Option[SensorReading] =
    line.split(",").map(_.trim) match {
      // Extra trailing fields are ignored; only the first three are used.
      case Array(id, timestamp, temperature, _*) =>
        Try(SensorReading(id, timestamp.toLong, temperature.toDouble)).toOption
      case _ => None
    }

  /**
   * Builds and runs the pipeline: text file source -> parse -> Redis sink.
   *
   * @param args optional; `args(0)` is the path of the input text file
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val inputStream = env.readTextFile(inputPath)

    // transform
    // Brings the implicit TypeInformation derivation for the Scala API into scope.
    import org.apache.flink.api.scala._
    // Blank or malformed lines are skipped so that one bad record does not
    // fail the whole job.
    val dataStream = inputStream.flatMap(line => parseReading(line).toList)

    // sink
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()
    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))

    env.execute("redis sink test")
  }
}
/**
 * Redis mapper that defines which command is issued when a [[SensorReading]]
 * is saved to Redis, and which parts of the reading it carries.
 *
 * Readings are stored in a hash with `HSET key field value`: the sensor id is
 * the field and the temperature is the value.
 */
class MyRedisMapper extends RedisMapper[SensorReading] {

  /** Command description: `HSET`, with `sensor_temperature` as the additional key. */
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")

  /** The sensor id; for `HSET` this acts as the hash field. */
  override def getKeyFromData(data: SensorReading): String = data.id

  /** The temperature rendered as a string. */
  override def getValueFromData(data: SensorReading): String =
    data.temperature.toString
}
2.結果
有幫助的歡迎評論打賞哈,謝謝!