一、寫入mysql
1.pom.xml
<!-- Write to MySQL: JDBC support for the DataSet example below (JDBCInputFormat / JDBCOutputFormat). -->
<!-- NOTE(review): this is the flink-jdbc artifact, while the read section below uses flink-connector-jdbc at the same 1.11.0 version; confirm flink-jdbc_2.11:1.11.0 actually resolves (the module may have been renamed in 1.11). -->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.11</artifactId> <version>1.11.0</version> </dependency>
2. Flink 沒有現成、專門用來寫入 MySQL 的 sink,但提供了 JDBCOutputFormat 這個類:只要提供 JDBC driver,就可以把它當作 sink 使用。
JDBCOutputFormat 其實屬於 Flink 的 batch(DataSet)API,下面的範例用的也是 batch 的 ExecutionEnvironment;它同樣可以用在 stream API 上,社區也推薦過這種做法。較新的版本則可直接使用 JdbcSink(見下文第 3 節的 Flink 1.12 官網範例)。
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

/**
 * Batch (DataSet API) demo: reads rows from the MySQL table `sensor` with JDBCInputFormat,
 * maps them to [[SensorReading]], and inserts them again with JDBCOutputFormat.
 */
object MySQLSourceSinkApp2 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sql_read = "select * from sensor limit 5"
    // NOTE: inserts into the same `sensor` table that sql_read reads from, so each run
    // re-inserts up to 5 existing rows (unless a unique key on the table rejects them).
    val sql_write = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"

    /**
     * Runs `sql` over JDBC and converts every row to a [[SensorReading]].
     * The query's columns must be (STRING, LONG, DOUBLE) in that order to match the RowTypeInfo.
     */
    def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String,
                  pwd: String, sql: String): DataSet[SensorReading] = {
      val dataResult: DataSet[Row] = env.createInput(
        JDBCInputFormat.buildJDBCInputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .setRowTypeInfo(new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.LONG_TYPE_INFO,
            BasicTypeInfo.DOUBLE_TYPE_INFO))
          .finish())

      // Convert the generic Row into the case class; the last expression is the result.
      dataResult.map { row =>
        SensorReading(
          row.getField(0).asInstanceOf[String],
          row.getField(1).asInstanceOf[Long],
          row.getField(2).asInstanceOf[Double])
      }
    }

    /**
     * Writes `outputData` with the parameterized insert `sql`, then submits the job.
     * Takes the same `driver` as readMysql instead of hard-coding a driver class.
     */
    def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, driver: String,
                   user: String, pwd: String, sql: String): Unit = {
      outputData.output(
        JDBCOutputFormat.buildJDBCOutputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .finish())
      // output() only declares the sink; execute() submits the job that reads and writes.
      env.execute("insert data to mysql")
      println("data write successfully")
    }

    // Read from MySQL.
    val readStream = readMysql(env, url, driver, username, password, sql_read)

    // JDBCOutputFormat consumes Row; field order must match the ? placeholders in sql_write.
    val outputData = readStream.map { x =>
      val row = new Row(3)
      row.setField(0, x.id)
      row.setField(1, x.timestamp)
      row.setField(2, x.timepreture)
      row
    }

    // Insert into MySQL.
    writeMysql(env, outputData, url, driver, username, password, sql_write)
  }
}
3. Flink 1.12 官網範例(使用 JdbcSink 的 streaming 寫法)
// Example adapted from the Flink 1.12 JDBC connector documentation.
// NOTE: `fromElements(...)` and the t.id / t.title / ... fields are placeholders for your own
// element type. getDbMetadata() is not defined in this snippet (presumably a helper from the
// docs' test setup); replace it with your own JDBC URL and driver class.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(...)
        .addSink(JdbcSink.sink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                // Bind each element's fields to the ? placeholders (JDBC indexes are 1-based).
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(getDbMetadata().getUrl())
                        .withDriverName(getDbMetadata().getDriverClass())
                        .build()));
// Nothing runs until the job is submitted.
env.execute();
4. 新建一個 Scala object `JdbcSink`(需先在 MySQL 中建立下面的 `sensor_temp` 表):
-- Target table for MyJdbcSinkFunc: it updates `temp` by `ids`, inserting when no row matches.
DROP TABLE IF EXISTS `sensor_temp`;
CREATE TABLE `sensor_temp` (
  `ids` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
  -- Plain DOUBLE: DOUBLE(10,0) allows 0 decimal places, so MySQL would round every
  -- temperature written by the sink to an integer.
  `temp` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
SET FOREIGN_KEY_CHECKS = 1;
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

/** Streaming demo: parses sensor.txt into SensorReadingTest5 records and upserts them into MySQL. */
object JdbcSink {
  def main(args: Array[String]): Unit = {
    // Create the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    env.setParallelism(1)
    inputStream.print()

    // Convert each comma-separated line "id,<long>,temperature" into the case class.
    // Fields are trimmed because String.toLong rejects surrounding whitespace.
    val dataStream = inputStream.map { line =>
      val fields = line.split(",").map(_.trim)
      SensorReadingTest5(fields(0), fields(1).toLong, fields(2).toDouble)
    }

    dataStream.addSink(new MyJdbcSinkFunc())
    env.execute()
  }
}

/**
 * Sink that upserts each reading into `sensor_temp`: it first UPDATEs the row whose `ids`
 * equals the reading's id and INSERTs a new row when no row was updated.
 *
 * NOTE(review): the defaults keep the previously hard-coded credentials for backward
 * compatibility; prefer passing real values in (e.g. from program args or configuration).
 *
 * @param url      JDBC URL of the MySQL database
 * @param user     database user name
 * @param password database password
 */
class MyJdbcSinkFunc(
    url: String = "jdbc:mysql://10.0.83.82:3306/test",
    user: String = "root",
    password: String = "Mafei@20201104")
  extends RichSinkFunction[SensorReadingTest5] {

  // JDBC resources are created in open() on the task side and are never serialized.
  @transient var conn: Connection = _
  @transient var insertStmt: PreparedStatement = _
  @transient var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(url, user, password)
    insertStmt = conn.prepareStatement("INSERT INTO `sensor_temp`(`ids`, `temp`) VALUES ( ?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp= ? where ids= ? ")
  }

  override def invoke(in: SensorReadingTest5): Unit = {
    // Try to update the existing row for this sensor first.
    updateStmt.setDouble(1, in.temperature)
    updateStmt.setString(2, in.id)
    // executeUpdate returns the update count; 0 means no row exists for this id yet.
    if (updateStmt.executeUpdate() == 0) {
      println("執行了插入操作。。。")
      insertStmt.setString(1, in.id)
      insertStmt.setDouble(2, in.temperature)
      insertStmt.executeUpdate()
    }
  }

  override def close(): Unit = {
    // open() may have failed part-way, leaving some of these null; always release the connection.
    try {
      Option(insertStmt).foreach(_.close())
      Option(updateStmt).foreach(_.close())
    } finally {
      Option(conn).foreach(_.close())
    }
  }
}
二、讀取mysql
1.pom.xml
<!-- Read from MySQL: JDBC connector that backs the 'connector' = 'jdbc' table in the Flink SQL DDL below. -->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.11</artifactId> <version>1.11.0</version> </dependency>
2. 用 Flink SQL DDL 建立 JDBC 連接器表
// Flink SQL DDL that registers `table1` (user_id, user_name) through the JDBC connector;
// the MySQL table it reads is the one given by 'table-name'.
// 'lookup.cache.*' configure row caching when the table is used as a lookup (dimension) table;
// NOTE(review): '600000' has no unit, presumably milliseconds (10 minutes) — confirm.
// NOTE(review): the literal contains no '|' margin characters, so .stripMargin is a no-op.
val table1="""CREATE TABLE table1 ( user_id STRING, user_name STRING ) WITH ( 'connector' = 'jdbc', 'driver'='com.mysql.jdbc.Driver', 'url' = 'jdbc:mysql://10.0.83.82:3306/test?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'table_name', 'username' = 'aaa', 'password' = 'bbb', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '600000') """.stripMargin
3.讀取表
// Create the streaming execution environment.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create a streaming-mode table environment on top of `env`.
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
// SQL is executed by the TableEnvironment (StreamExecutionEnvironment has no executeSql).
// Register the JDBC table from the `table1` DDL string first; otherwise table1 is unknown.
tableEnv.executeSql(table1)
// Run the query and print its rows.
// NOTE(review): user_id='' only matches rows whose id is empty — likely a placeholder filter.
tableEnv.executeSql("""select * from table1 where user_id='' """.stripMargin).print()