一、写入mysql
1.pom.xml
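<!--写入mysql:自 Flink 1.11 起 flink-jdbc 模块更名为 flink-connector-jdbc,旧的 JDBCInputFormat / JDBCOutputFormat 仍以 deprecated 形式保留在其中-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.0</version>
</dependency>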
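2.在 Flink 1.11 之前没有现成的用来写入 MySQL 的 sink,但是 Flink 提供了一个类 JDBCOutputFormat;只要提供 JDBC 的 driver,就可以把它当做 sink 使用。(Flink 1.11 起官方提供了 JdbcSink,见下文第 3 节。)
JDBCOutputFormat 其实是 Flink 的 batch API,但也可以作为 stream 的 API 使用,社区也推荐通过这种方式来写入。下面的示例使用的是批处理(DataSet)API:先从 MySQL 读出数据,再写回 MySQL。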
package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

/**
 * Batch (DataSet API) job that reads rows from the MySQL `sensor` table with
 * `JDBCInputFormat` and writes them back with `JDBCOutputFormat`.
 */
object MySQLSourceSinkApp2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Connection settings and the two statements used by this job.
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sql_read = "select * from sensor limit 5"
    val sql_write = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"

    /**
     * Runs `sql` against MySQL and converts every returned row into a `SensorReading`.
     *
     * The declared row type is (String, Long, Double).
     * NOTE(review): `select *` must therefore return exactly these three columns
     * in this order - confirm against the table definition.
     *
     * @param env    execution environment the input is created on
     * @param url    JDBC URL of the database
     * @param driver fully qualified JDBC driver class name
     * @param user   database user
     * @param pwd    database password
     * @param sql    query to execute
     * @return the query result mapped to `SensorReading`
     */
    def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
      // Build the JDBC input and obtain the raw rows.
      val dataResult: DataSet[Row] = env.createInput(
        JDBCInputFormat.buildJDBCInputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .setRowTypeInfo(new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.LONG_TYPE_INFO,
            BasicTypeInfo.DOUBLE_TYPE_INFO))
          .finish())

      // Convert each Row into the case class; the map is the method's result,
      // so no explicit `return` is needed.
      dataResult.map { x =>
        // Row.getField returns Object, so each field is cast to the type declared in RowTypeInfo above.
        val id = x.getField(0).asInstanceOf[String]
        val timestamp = x.getField(1).asInstanceOf[Long]
        val timepreture = x.getField(2).asInstanceOf[Double]
        SensorReading(id, timestamp, timepreture)
      }
    }

    // Read the data from MySQL.
    val readStream = readMysql(env, url, driver, username, password, sql_read)

    // Convert the readings into Row, the element type used with JDBCOutputFormat here.
    val outputData = readStream.map { x =>
      val row = new Row(3)
      row.setField(0, x.id)
      row.setField(1, x.timestamp)
      row.setField(2, x.timepreture)
      row
    }

    /**
     * Attaches a JDBC output to `outputData`, executes the job and prints a
     * confirmation message once `execute` returns.
     *
     * @param env        execution environment used to run the job
     * @param outputData rows to write; field order must match the placeholders in `sql`
     * @param url        JDBC URL of the database
     * @param user       database user
     * @param pwd        database password
     * @param sql        parameterised insert statement
     * @param driver     fully qualified JDBC driver class name; defaults to the value
     *                   that used to be hard-coded here, so existing calls keep working
     */
    def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String, driver: String = "com.mysql.jdbc.Driver"): Unit = {
      outputData.output(
        JDBCOutputFormat.buildJDBCOutputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .finish())
      env.execute("insert data to mysql")
      print("data write successfully")
    }

    // Insert the data into MySQL, using the same driver setting as the read side.
    writeMysql(env, outputData, url, username, password, sql_write, driver)
  }
}
3.Flink 1.12 官网给出的 JdbcSink 示例(Java)
// JdbcSink example; the heading above attributes it to the Flink 1.12 website.
// `fromElements(...)` and `getDbMetadata()` are placeholders kept from the original
// snippet, so it is not compilable as-is.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
    .fromElements(...)
    .addSink(JdbcSink.sink(
        // Parameterised insert statement; one '?' per column.
        "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
        // Statement builder: copies the fields of element `t` into the prepared statement `ps`.
        (ps, t) -> {
            ps.setInt(1, t.id);
            ps.setString(2, t.title);
            ps.setString(3, t.author);
            ps.setDouble(4, t.price);
            ps.setInt(5, t.qty);
        },
        // Connection settings: JDBC URL and driver class name.
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl(getDbMetadata().getUrl())
            .withDriverName(getDbMetadata().getDriverClass())
            .build()));
// The statement terminator was missing here in the original snippet.
env.execute();
4.自定义 sink:先在 MySQL 中创建 sensor_temp 表,再新建一个 Scala object JdbcSink(以及自定义的 RichSinkFunction 实现类 MyJdbcSinkFunc)
-- Target table for the custom JDBC sink: one row per sensor id with its latest temperature.
DROP TABLE IF EXISTS `sensor_temp`;
CREATE TABLE `sensor_temp` (
  `ids` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
  -- Plain DOUBLE: the former double(10,0) rounded every stored value to zero
  -- decimal places, so a reading such as 35.8 was stored as 36.
  `temp` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
SET FOREIGN_KEY_CHECKS = 1;
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

/**
 * Streaming job that reads sensor records from a text file, prints the raw
 * lines, parses them into `SensorReadingTest5` and writes them to MySQL
 * through [[MyJdbcSinkFunc]].
 */
object JdbcSink {

  def main(args: Array[String]): Unit = {
    // Create the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the default parallelism before any operator is created so that it also
    // applies to the file source (it used to be set only after the source was built).
    env.setParallelism(1)

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    inputStream.print()

    // Convert each comma-separated line into the case class.
    val dataStream = inputStream.map { data =>
      // Trim every field: toLong rejects surrounding whitespace, so "a, 1, 2" used to fail.
      val arr = data.split(",").map(_.trim)
      // Fields are strings after splitting, hence the toLong / toDouble conversions.
      SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble)
    }

    dataStream.addSink(new MyJdbcSinkFunc())
    env.execute()
  }
}

/**
 * Sink that keeps one row per sensor id in `sensor_temp`: it first tries to
 * update the row for the id and inserts a new row only when the update
 * matched nothing.
 *
 * The constructor defaults are the values that used to be hard-coded in
 * `open`, so `new MyJdbcSinkFunc()` behaves as before.
 * NOTE(review): real credentials should not live in source code - pass them in
 * from configuration instead of relying on the defaults.
 *
 * @param jdbcUrl  JDBC URL of the target database
 * @param user     database user
 * @param password database password
 */
class MyJdbcSinkFunc(
    jdbcUrl: String = "jdbc:mysql://10.0.83.82:3306/test",
    user: String = "root",
    password: String = "Mafei@20201104"
) extends RichSinkFunction[SensorReadingTest5] {

  // Connection and prepared statements. They are vars because they are created
  // in open(), not in the constructor.
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  /** Opens the connection and prepares the update and insert statements. */
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(jdbcUrl, user, password)
    insertStmt = conn.prepareStatement("INSERT INTO `sensor_temp`(`ids`, `temp`) VALUES ( ?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp= ? where ids= ? ")
  }

  /** Updates the temperature for the record's id, inserting a row when none was updated. */
  override def invoke(in: SensorReadingTest5): Unit = {
    updateStmt.setDouble(1, in.temperature)
    updateStmt.setString(2, in.id)
    // executeUpdate returns the update count that execute() + getUpdateCount exposed before.
    if (updateStmt.executeUpdate() == 0) {
      println("执行了插入操作。。。")
      insertStmt.setString(1, in.id)
      insertStmt.setDouble(2, in.temperature)
      insertStmt.executeUpdate()
    }
  }

  /**
   * Releases the JDBC resources. Each handle is null-checked because open() may
   * have failed before assigning it, and the connection is closed in `finally`
   * so that a failing statement close cannot leak it.
   */
  override def close(): Unit = {
    try {
      Option(insertStmt).foreach(_.close())
      Option(updateStmt).foreach(_.close())
    } finally {
      Option(conn).foreach(_.close())
    }
  }
}
二、读取mysql
1.pom.xml
<!--读取mysql--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.11</artifactId> <version>1.11.0</version> </dependency>
2.创建连接器
// Flink SQL DDL text declaring a table named `table1` with two STRING columns
// (user_id, user_name) that uses the 'jdbc' connector with the MySQL driver
// class and JDBC URL given in the WITH clause.
// 'table-name', 'username' and 'password' hold what look like placeholder values
// ('table_name', 'aaa', 'bbb') - replace them with real ones before use.
// NOTE(review): 'lookup.cache.ttl' = '600000' carries no time unit; presumably
// milliseconds - confirm against the Flink JDBC connector documentation.
// This val only holds the statement text; nothing is registered until the string
// is executed through a table environment.
// stripMargin leaves the text unchanged here because the literal contains no '|'
// margin characters.
val table1="""CREATE TABLE table1 ( user_id STRING, user_name STRING ) WITH ( 'connector' = 'jdbc', 'driver'='com.mysql.jdbc.Driver', 'url' = 'jdbc:mysql://10.0.83.82:3306/test?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'table_name', 'username' = 'aaa', 'password' = 'bbb', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '600000') """.stripMargin
3.读取表
// Create the streaming execution environment.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create the table environment on top of it, in streaming mode.
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

// Register the JDBC-backed table by executing its DDL (the `table1` string from
// the connector step); without this the query below has no table to resolve.
tableEnv.executeSql(table1)

// SQL is executed through the table environment - StreamExecutionEnvironment has
// no executeSql method, so the former `env.executeSql(...)` did not compile.
val result = tableEnv.executeSql("""select * from table1 where user_id='' """.stripMargin)

// Print the query result so that the rows read from MySQL are actually visible.
result.print()