Flink Connectors：MySQL 读写


一、写入 MySQL

1.pom.xml

<!-- For writing to MySQL: provides JDBCInputFormat / JDBCOutputFormat (package org.apache.flink.api.java.io.jdbc) used in the example below. -->
<!-- NOTE(review): Flink 1.11 renamed flink-jdbc to flink-connector-jdbc; the last flink-jdbc_2.11 release appears to be 1.10.x, so confirm that version 1.11.0 of this artifact exists on Maven Central. -->
<!-- The MySQL JDBC driver (com.mysql.jdbc.Driver, e.g. mysql:mysql-connector-java) must be added as a separate dependency. -->
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>1.11.0</version>
</dependency>

2. Flink 中没有现成的、专门用于写入 MySQL 的 sink，但 Flink 提供了 JDBCOutputFormat 类：只要提供 JDBC driver，就可以把它当作 sink 使用。下面的示例先用 JDBCInputFormat 从 sensor 表读出 5 条记录，再用 JDBCOutputFormat 把它们写回同一张表。

JDBCOutputFormat 本身属于 Flink 的批处理（DataSet）API，但也可以在流处理作业中使用，社区也推荐这种方式。

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

object MySQLSourceSinkApp2 {

  /** Batch job that reads up to five rows from the MySQL table `sensor` with JDBCInputFormat
    * and inserts them back into the same table with JDBCOutputFormat.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sqlRead = "select * from sensor limit 5"
    val sqlWrite = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"

    /** Runs `sql` against the database and maps every result row to a [[SensorReading]].
      *
      * The query must return exactly three columns typed (STRING, BIGINT, DOUBLE), in that
      * order, to match the RowTypeInfo declared below.
      * NOTE(review): `select *` relies on the column order of table `sensor` — confirm it.
      */
    def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String,
                  sql: String): DataSet[SensorReading] = {
      val rows: DataSet[Row] = env.createInput(
        JDBCInputFormat.buildJDBCInputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .setRowTypeInfo(new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.LONG_TYPE_INFO,
            BasicTypeInfo.DOUBLE_TYPE_INFO))
          .finish())

      // Convert the generic Row into the project's case class.
      // NOTE(review): asInstanceOf[Long]/[Double] turns a SQL NULL into 0 instead of failing —
      // confirm these columns are NOT NULL.
      rows.map { row =>
        SensorReading(
          row.getField(0).asInstanceOf[String],
          row.getField(1).asInstanceOf[Long],
          row.getField(2).asInstanceOf[Double])
      }
    }

    /** Declares a JDBC sink that writes `outputData` with the parameterized `sql`, then runs the job.
      *
      * @param driver JDBC driver class; defaults to the MySQL driver
      */
    def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String,
                   sql: String, driver: String = "com.mysql.jdbc.Driver"): Unit = {
      outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .finish())
      // The DataSet pipeline is lazy: nothing is read or written until execute() is called.
      env.execute("insert data to mysql")
      println("data write successfully")
    }

    // Read the rows from MySQL.
    val readings = readMysql(env, url, driver, username, password, sqlRead)

    // JDBCOutputFormat consumes Row; field positions must match the ? placeholders in sqlWrite.
    val outputData = readings.map { reading =>
      val row = new Row(3)
      row.setField(0, reading.id)
      row.setField(1, reading.timestamp)
      row.setField(2, reading.timepreture)
      row
    }

    // Insert the rows back into MySQL, using the same driver class that was used for reading.
    writeMysql(env, outputData, url, username, password, sqlWrite, driver)
  }

}

3. Flink 1.12 官网示例（DataStream API + JdbcSink，Java 代码）：

// Excerpt from the Flink 1.12 JDBC connector docs: each stream element is written with a
// parameterized INSERT through JdbcSink. `...` and getDbMetadata() are placeholders from the docs.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(...)
    .addSink(JdbcSink.sink(
    "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
    // Bind the element's fields to the ? placeholders (JDBC parameter indexes are 1-based).
    (ps, t) -> {
        ps.setInt(1, t.id);
        ps.setString(2, t.title);
        ps.setString(3, t.author);
        ps.setDouble(4, t.price);
        ps.setInt(5, t.qty);
    },
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl(getDbMetadata().getUrl())
        .withDriverName(getDbMetadata().getDriverClass())
        .build()));
env.execute();

4. 自定义 RichSinkFunction 写入 MySQL：先在 MySQL 中创建 sensor_temp 表，再新建一个 Scala object：JdbcSink。

 
 
-- Target table for MyJdbcSinkFunc, which updates `temp` by `ids` and inserts a row only when no row matched.
DROP TABLE IF EXISTS `sensor_temp`;
CREATE TABLE `sensor_temp` (
  `ids` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
  -- Plain DOUBLE: the former double(10,0) rounded every temperature to 0 decimal places.
  `temp` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
SET FOREIGN_KEY_CHECKS = 1;
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object JdbcSink {

  /** Reads comma-separated sensor lines ("id,timestamp,temperature") from a text file,
    * parses them into SensorReadingTest5 and writes them to MySQL through MyJdbcSinkFunc.
    *
    * @param args optional; args(0) overrides the input file path
    */
  def main(args: Array[String]): Unit = {
    // Create the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set parallelism before any operator is created so that it also applies to the file reader.
    env.setParallelism(1)

    val inputPath = args.headOption.getOrElse("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    val inputStream = env.readTextFile(inputPath)
    inputStream.print()

    // Convert each line to the case class. Fields are trimmed because String.toLong rejects
    // surrounding whitespace (e.g. "sensor_1, 1547718199, 35.8"); blank lines are skipped.
    val dataStream = inputStream
      .filter(line => line.trim.nonEmpty)
      .map { line =>
        val fields = line.split(",").map(_.trim)
        SensorReadingTest5(fields(0), fields(1).toLong, fields(2).toDouble)
      }

    dataStream.addSink(new MyJdbcSinkFunc())
    env.execute("JdbcSink")
  }
}

/** Sink that writes each reading's temperature to MySQL table `sensor_temp`.
  *
  * It first runs an UPDATE keyed by `ids` and falls back to an INSERT when no row matched.
  * The two statements are not atomic, so concurrent writers with the same id could both insert.
  *
  * @param url      JDBC URL of the target database
  * @param user     database user
  * @param password database password
  *                 (NOTE(review): credentials should come from configuration, not source code)
  */
class MyJdbcSinkFunc(
    url: String = "jdbc:mysql://10.0.83.82:3306/test",
    user: String = "root",
    password: String = "Mafei@20201104"
) extends RichSinkFunction[SensorReadingTest5] {

  // Connection and prepared statements are created in open() on the task rather than in the
  // constructor, because java.sql.Connection is not serializable.
  var conn: Connection = _

  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(url, user, password)
    insertStmt = conn.prepareStatement("INSERT INTO `sensor_temp`(`ids`, `temp`) VALUES ( ?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp= ? where ids= ? ")
  }

  override def invoke(in: SensorReadingTest5): Unit = {
    updateStmt.setDouble(1, in.temperature)
    updateStmt.setString(2, in.id)
    // executeUpdate returns the row count directly; 0 means no row exists for this id yet.
    if (updateStmt.executeUpdate() == 0) {
      println("执行了插入操作。。。")
      insertStmt.setString(1, in.id)
      insertStmt.setDouble(2, in.temperature)
      insertStmt.executeUpdate()
    }
  }

  override def close(): Unit = {
    // open() may have failed part-way, leaving later fields null. Close whatever exists, and
    // make sure a failure while closing one resource does not leak the others.
    try {
      Option(insertStmt).foreach(_.close())
    } finally {
      try {
        Option(updateStmt).foreach(_.close())
      } finally {
        Option(conn).foreach(_.close())
      }
    }
  }
}

二、读取 MySQL

1.pom.xml

<!-- For reading MySQL through Flink SQL tables declared with 'connector' = 'jdbc'. -->
<!-- The MySQL JDBC driver (com.mysql.jdbc.Driver, e.g. mysql:mysql-connector-java) must be added as a separate dependency. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

2. 通过 DDL 创建 JDBC 连接器表

// Flink SQL DDL, kept as a string, that declares a Flink table `table1` backed by the MySQL
// table `table_name` in database `test` via the JDBC connector. The string only defines the
// table; it still has to be registered with tableEnv.executeSql(table1) before it can be queried.
// Per the Flink JDBC connector docs, the lookup.cache.* options only take effect when table1 is
// the dimension side of a lookup join. A plain SELECT scans the table instead.
// NOTE(review): '600000' presumably means 10 minutes (a bare number is parsed as milliseconds) — confirm.
// NOTE(review): username/password are placeholders. .stripMargin has no effect here because
// no line starts with '|'.
val table1="""CREATE TABLE table1 (
        user_id STRING,
        user_name STRING
) WITH (
'connector' = 'jdbc',
'driver'='com.mysql.jdbc.Driver',
'url' = 'jdbc:mysql://10.0.83.82:3306/test?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'table_name',
'username' = 'aaa',
'password' = 'bbb',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '600000')
""".stripMargin

3.读取表

// Create the streaming execution environment.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create the table environment on top of it, in streaming mode.
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
// Register the JDBC-backed table from the DDL string first; querying a table that was never
// registered fails validation with a "table not found" error.
tableEnv.executeSql(table1)
// executeSql is a TableEnvironment method (StreamExecutionEnvironment has no such method);
// print() collects the result rows and writes them to stdout.
tableEnv.executeSql("""select * from table1 where user_id='' """.stripMargin).print()

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM