Flink輸出到JDBC


1.代碼

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Sample case class for one temperature-sensor reading: sensor id, event timestamp, temperature value
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object JdbcSinkTest {

  /** Entry point: reads CSV sensor records from a text file and writes them to MySQL via a custom JDBC sink. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source: one "id,timestamp,temperature" line per record
    val rawLines = env.readTextFile("sensor1.txt")

    // transform: parse each CSV line into a SensorReading
    import org.apache.flink.api.scala._
    val readings = rawLines.map { line =>
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    }

    // sink: upsert every reading into the database
    readings.addSink(new MyJdbcSink())

    env.execute("jdbc sink test")
  }
}

class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  // JDBC connection and pre-compiled statements; created once per sink instance in open()
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  /** Opens the JDBC connection and pre-compiles the insert/update statements. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // NOTE(review): credentials are hard-coded placeholders — externalize to configuration
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "用戶名", "密碼") // "test" database
    insertStmt = conn.prepareStatement("insert into temperatures(sensor, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update temperatures set temp = ? where sensor = ?")
  }

  /** Upserts one reading: tries UPDATE first; if no row matched, falls back to INSERT. */
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    updateStmt.setDouble(1, value.temperature) // parameter indices match the ?s in the SQL above
    updateStmt.setString(2, value.id)
    // executeUpdate() returns the affected-row count directly (clearer than execute() + getUpdateCount)
    if (updateStmt.executeUpdate() == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.executeUpdate()
    }
  }

  /** Releases JDBC resources; null-safe so a failure during open() does not trigger an NPE here. */
  override def close(): Unit = {
    if (insertStmt != null) insertStmt.close()
    if (updateStmt != null) updateStmt.close()
    if (conn != null) conn.close()
  }

}

2.結果

有幫助的歡迎評論打賞哈,謝謝!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM