一、redis sink
對應jar包
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
將文件內容寫入到hash中
代碼:
// Reads sensor records from a text file and writes them to Redis
// through the Bahir flink-connector-redis sink.
object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: one CSV line per sensor reading (id, timestamp, temperature).
    val rawLines = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = rawLines.map { line =>
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).toDouble)
    }

    // Sink: connect to Redis on hadoop102:6379 and write each reading
    // via MyRedisMapper (HSET into the sensor_temperature hash).
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("hadoop102")
      .setPort(6379)
      .build()
    dataStream.addSink(new RedisSink(config, new MyRedisMapper))

    env.execute("redis sink test")
  }
}
// Maps a SensorReading onto a Redis HSET command:
//   HSET sensor_temperature <sensor id> <temperature>
class MyRedisMapper extends RedisMapper[SensorReading] {

  // Command is HSET; the additional key (the hash name) is "sensor_temperature".
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")

  // Hash field: the sensor id.
  override def getKeyFromData(t: SensorReading): String = t.id

  // Hash value: the temperature rendered as a string.
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}
redis查看結果
127.0.0.1:6379> hgetall sensor_temperature 1) "sensor_1" 2) "35.80018327300259" 3) "sensor_6" 4) "15.402984393403084" 5) "sensor_10" 6) "38.101067604893444" 7) "sensor_7" 8) "6.720945201171228"
二、es sink
對應jar包
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.7.2</version> </dependency>
將文件內容寫入到es中
代碼:
// Reads sensor records from a text file and indexes them into
// Elasticsearch (index "sensor") via the flink-connector-elasticsearch6 sink.
object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: one CSV line per sensor reading (id, timestamp, temperature).
    val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })

    // ES cluster addresses (only one node here).
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("hadoop101", 9200))

    // Build the ES sink: for each element, wrap the fields in a map,
    // create an IndexRequest and hand it to the RequestIndexer.
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        println("保存數據:" + t)
        // Package the record as a map (ES document source).
        val map = new util.HashMap[String, String]()
        map.put("sensor_id", t.id)
        map.put("temperature", t.temperature.toString)
        map.put("ts", t.timestamp.toString)
        // Build the index request. NOTE: the type name "redingdata" is a
        // typo of "readingdata", but it is the live document type the shown
        // results were written under, so it is kept as-is.
        val indexRequest: IndexRequest = Requests.indexRequest().index("sensor").`type`("redingdata").source(map)
        // Queue the request; the sink flushes it to ES.
        requestIndexer.add(indexRequest)
        println("保存成功")
      }
    })
    // Fixed: removed a dead bare `esSinkBuilder` expression statement that
    // had no effect, and corrected the job name (was "redis sink test",
    // copy-pasted from the Redis example).
    dataStream.addSink(esSinkBuilder.build())
    env.execute("es sink test")
  }
}
es中查看結果
{
"took" : 148,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 6,
"max_score" : 1.0,
"hits" : [
{
"_index" : "sensor",
"_type" : "redingdata",
"_id" : "QXpZnnEBUwLRQchmepbT",
"_score" : 1.0,
"_source" : {
"sensor_id" : "sensor_6",
"temperature" : "15.402984393403084",
"ts" : "1547718201"
}
},
{
"_index" : "sensor",
"_type" : "redingdata",
"_id" : "RnpZnnEBUwLRQchme5ZS",
"_score" : 1.0,
"_source" : {
"sensor_id" : "sensor_7",
"temperature" : "6.720945201171228",
"ts" : "1547718202"
}
},
{
"_index" : "sensor",
"_type" : "redingdata",
"_id" : "Q3pZnnEBUwLRQchmepbr",
"_score" : 1.0,
"_source" : {
"sensor_id" : "sensor_1",
"temperature" : "35.80018327300259",
"ts" : "1547718199"
}
},
{
"_index" : "sensor",
"_type" : "redingdata",
"_id" : "QnpZnnEBUwLRQchmepbo",
"_score" : 1.0,
"_source" : {
"sensor_id" : "sensor_1",
"temperature" : "30.8",
"ts" : "1547718200"
}
},
{
"_index" : "sensor",
"_type" : "redingdata",
"_id" : "RHpZnnEBUwLRQchmepbs",
"_score" : 1.0,
"_source" : {
"sensor_id" : "sensor_1",
"temperature" : "40.8",
"ts" : "1547718201"
}
},
{
"_index" : "sensor",
"_type" : "redingdata",
"_id" : "RXpZnnEBUwLRQchmepbu",
"_score" : 1.0,
"_source" : {
"sensor_id" : "sensor_10",
"temperature" : "38.101067604893444",
"ts" : "1547718205"
}
}
]
}
}
三、jdbc sink
①mysql驅動
<!-- mysql sink --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version> </dependency>
②自定義mysql sink,繼承RichSinkFunction,重寫執行邏輯以及初始化和關閉資源的方法。
// Custom JDBC (MySQL) sink: upserts the latest temperature per sensor into
// the `temperatures` table. Extends RichSinkFunction so the connection
// lifecycle can be managed in open()/close().
class MyJdbcSink() extends RichSinkFunction[SensorReading]{
  // JDBC connection and pre-compiled statements, created once in open().
  var conn:Connection = _
  var insertStmt : PreparedStatement = _
  var updateStmt:PreparedStatement=_

  // Open the connection and prepare both statements once per task instance.
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql:///test","root","123456")
    insertStmt = conn.prepareStatement("insert into temperatures(sensor,temp) values(?,?)")
    updateStmt = conn.prepareStatement("update temperatures set temp=? where sensor=?")
  }

  // Per-record upsert: try UPDATE first; if no row matched, INSERT.
  // NOTE: super.invoke(value, context) must NOT be called here.
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    updateStmt.setDouble(1,value.temperature)
    updateStmt.setString(2,value.id)
    // executeUpdate returns the affected-row count directly, replacing the
    // previous execute() + getUpdateCount pair.
    if (updateStmt.executeUpdate() == 0){
      insertStmt.setString(1,value.id)
      insertStmt.setDouble(2,value.temperature)
      insertStmt.execute()
    }
  }

  // Release JDBC resources. Null-guarded so that a failure inside open()
  // (e.g. the connection could not be established, leaving some fields
  // unset) does not cause a NullPointerException during cleanup.
  override def close(): Unit = {
    if (updateStmt != null) updateStmt.close()
    if (insertStmt != null) insertStmt.close()
    if (conn != null) conn.close()
  }
}
③添加自定義的mysql sink並執行
// Reads sensor records from a text file and upserts them into MySQL
// through the custom MyJdbcSink.
object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: one CSV line per sensor reading (id, timestamp, temperature).
    val rawLines = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = rawLines.map { line =>
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).toDouble)
    }

    // Sink: the custom JDBC upsert sink defined above.
    dataStream.addSink(new MyJdbcSink())

    env.execute("jdbc sink test")
  }
}
