FLINK-connectors-Mysql讀寫


 一、寫入mysql

1.pom.xml

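<!--寫入mysql(注意:自 Flink 1.11 起此模塊已更名為 flink-connector-jdbc,flink-jdbc 僅發布到 1.10.x;若此依賴無法解析,請改用 flink-connector-jdbc_2.11)-->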
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>1.11.0</version>
</dependency>

2.在flink中沒有現成的用來寫入MySQL的sink,但是flink提供了一個類,JDBCOutputFormat,通過這個類,如果你提供了jdbc的driver,則可以當做sink使用。

JDBCOutputFormat其實是flink的batch api,但也可以用來作為stream的api使用,社區也推薦通過這種方式來進行。

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

/**
 * Batch job that copies rows from MySQL back into MySQL through Flink's JDBC formats.
 *
 * It reads the result of a query on the `sensor` table with `JDBCInputFormat`, maps each
 * row to a `SensorReading`, converts the readings back into `Row`s and inserts them with
 * `JDBCOutputFormat`.
 */
object MySQLSourceSinkApp2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Connection settings are hard-coded for this demo.
    // NOTE(review): move the credentials to external configuration before real use.
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sqlRead = "select * from sensor limit 5"
    val sqlWrite = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"

    // Read the rows from MySQL as SensorReading values.
    val readings = readMysql(env, url, driver, username, password, sqlRead)

    // JDBCOutputFormat consumes Rows, one field per `?` placeholder of the insert statement.
    val outputData: DataSet[Row] = readings.map { reading =>
      val row = new Row(3)
      row.setField(0, reading.id)
      row.setField(1, reading.timestamp)
      row.setField(2, reading.timepreture)
      row
    }

    // Insert the rows into MySQL; this call also triggers the job execution.
    writeMysql(env, outputData, url, driver, username, password, sqlWrite)
  }

  /**
   * Builds a DataSet of `SensorReading` from a JDBC query.
   *
   * @param env    batch execution environment the input is attached to
   * @param url    JDBC connection URL
   * @param driver fully qualified JDBC driver class name
   * @param user   database user
   * @param pwd    database password
   * @param sql    query to run; it must yield (String, Long, Double) columns in that order
   * @return the query result mapped to `SensorReading`
   */
  private def readMysql(
      env: ExecutionEnvironment,
      url: String,
      driver: String,
      user: String,
      pwd: String,
      sql: String): DataSet[SensorReading] = {
    val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername(driver)
      .setDBUrl(url)
      .setUsername(user)
      .setPassword(pwd)
      .setQuery(sql)
      .setRowTypeInfo(new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO))
      .finish()

    val rows: DataSet[Row] = env.createInput(inputFormat)

    // Row fields are untyped, so each one is cast to the type declared in the RowTypeInfo above.
    rows.map { row =>
      val id = row.getField(0).asInstanceOf[String]
      val timestamp = row.getField(1).asInstanceOf[Long]
      val timepreture = row.getField(2).asInstanceOf[Double]
      SensorReading(id, timestamp, timepreture)
    }
  }

  /**
   * Attaches a JDBC output to `outputData` and runs the job.
   *
   * @param env        batch execution environment that owns `outputData`
   * @param outputData rows to insert, one field per `?` placeholder in `sql`
   * @param url        JDBC connection URL
   * @param driver     fully qualified JDBC driver class name (previously hard-coded here,
   *                   which silently ignored the driver configured in `main`)
   * @param user       database user
   * @param pwd        database password
   * @param sql        parameterised insert statement
   */
  private def writeMysql(
      env: ExecutionEnvironment,
      outputData: DataSet[Row],
      url: String,
      driver: String,
      user: String,
      pwd: String,
      sql: String): Unit = {
    outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
      .setDrivername(driver)
      .setDBUrl(url)
      .setUsername(user)
      .setPassword(pwd)
      .setQuery(sql)
      .finish())
    env.execute("insert data to mysql")
    println("data write successfully")
  }

}

3.Flink 1.12 官網提供的 JdbcSink 寫法(Java 示例)

// Streaming example: write each element to a JDBC table through JdbcSink.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(...)
    .addSink(JdbcSink.sink(
    // Parameterised insert statement; one `?` per column.
    "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
    // Statement builder: binds the fields of element `t` to the placeholders of `ps`.
    (ps, t) -> {
        ps.setInt(1, t.id);
        ps.setString(2, t.title);
        ps.setString(3, t.author);
        ps.setDouble(4, t.price);
        ps.setInt(5, t.qty);
    },
    // Connection options: JDBC URL and driver class of the target database.
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl(getDbMetadata().getUrl())
        .withDriverName(getDbMetadata().getDriverClass())
        .build()));
// Submit the job (the statement terminator was missing in the original snippet).
env.execute();

4.先在 MySQL 中建立目標表 sensor_temp,再新建一個 Scala object JdbcSink,通過自定義的 RichSinkFunction 寫入數據

 
         
-- Target table for the JdbcSink job: one row per sensor id with its temperature.
DROP TABLE IF EXISTS `sensor_temp`;
CREATE TABLE `sensor_temp` (
  `ids` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
  -- Plain DOUBLE: the previous double(10,0) kept zero decimal places,
  -- so a reading such as 35.8 was rounded to 36 when stored.
  `temp` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
SET FOREIGN_KEY_CHECKS = 1;
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

/**
 * Streaming job that reads sensor records from a text file and writes them to MySQL
 * through `MyJdbcSinkFunc`.
 *
 * Each input line is parsed as `id,timestamp,temperature`. The input path may be passed
 * as the first program argument; without it the original hard-coded path is used.
 */
object JdbcSink {

  // Input file used when no path is given on the command line.
  private val DefaultInputPath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt"

  def main(args: Array[String]): Unit = {
    // Create the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism before building the pipeline so it applies to every operator
    // defined below, including the file source.
    env.setParallelism(1)

    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val inputStream = env.readTextFile(inputPath)
    inputStream.print()

    // Convert each line into the sensor case class.
    val dataStream = inputStream.map { line =>
      // Trim every field: "a, 1, 2.0" would otherwise fail in toLong on " 1".
      val fields = line.split(",").map(_.trim)
      SensorReadingTest5(fields(0), fields(1).toLong, fields(2).toDouble)
    }

    dataStream.addSink(new MyJdbcSinkFunc())
    env.execute()
  }
}

/**
 * Sink that upserts sensor readings into the `sensor_temp` table over plain JDBC.
 *
 * For every element it first tries to update the row with the same id; when the update
 * reports zero rows, the reading is inserted instead. The connection and both prepared
 * statements are created in `open` and released in `close`.
 */
class MyJdbcSinkFunc() extends RichSinkFunction[SensorReadingTest5]{

  // Connection and prepared statements; assigned in open(), so they stay null until then.
  var conn: Connection = _

  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  /** Opens the JDBC connection and prepares the insert and update statements. */
  override def open(parameters: Configuration): Unit = {
    // NOTE(review): URL and credentials are embedded in source; move them to configuration.
    conn = DriverManager.getConnection("jdbc:mysql://10.0.83.82:3306/test","root","Mafei@20201104")
    insertStmt = conn.prepareStatement("INSERT INTO `sensor_temp`(`ids`, `temp`) VALUES ( ?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp= ? where ids= ? ")
  }

  /**
   * Writes one reading: update first, insert when the update reports no rows.
   *
   * @param in reading whose `id` and `temperature` are written
   */
  override def invoke(in: SensorReadingTest5): Unit = {
    updateStmt.setDouble(1, in.temperature)
    updateStmt.setString(2, in.id)

    // executeUpdate returns the row count directly, replacing execute() + getUpdateCount.
    if (updateStmt.executeUpdate() == 0) {
      println("執行了插入操作。。。")
      insertStmt.setString(1, in.id)
      insertStmt.setDouble(2, in.temperature)
      insertStmt.executeUpdate()
    }
  }

  /**
   * Releases the statements and the connection.
   *
   * Each resource is null-checked because open() may have failed before assigning it,
   * and the nested finally blocks make sure a failure while closing one resource does
   * not prevent the remaining ones from being closed.
   */
  override def close(): Unit = {
    try Option(insertStmt).foreach(_.close())
    finally {
      try Option(updateStmt).foreach(_.close())
      finally Option(conn).foreach(_.close())
    }
  }
}

二、讀取mysql

1.pom.xml

<!--讀取mysql-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.0</version>
</dependency>

2.創建連接器

// Flink SQL DDL declaring `table1` (columns user_id and user_name, both STRING) on top of the
// JDBC connector, pointing at `table_name` in the MySQL database given by 'url'.
// The 'lookup.cache.*' options cap the cache at 10000 rows with a ttl of '600000'
// (presumably milliseconds — confirm against the connector documentation).
// 'username' / 'password' are placeholders and must be replaced with real credentials.
// Note: no line of the literal starts with `|`, so stripMargin leaves the text unchanged.
val table1="""CREATE TABLE table1 (
        user_id STRING,
        user_name STRING
) WITH (
'connector' = 'jdbc',
'driver'='com.mysql.jdbc.Driver',
'url' = 'jdbc:mysql://10.0.83.82:3306/test?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'table_name',
'username' = 'aaa',
'password' = 'bbb',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '600000')
""".stripMargin

3.讀取表

// Create the streaming execution environment.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create the table environment on top of it, in streaming mode.
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
// Register the JDBC-backed table first by running the DDL held in `table1` (step 2);
// without this the query below cannot resolve `table1`.
tableEnv.executeSql(table1)
// SQL is executed through the table environment: StreamExecutionEnvironment has no
// executeSql method. print() consumes the result so the selected rows are shown.
tableEnv.executeSql("""select * from table1 where user_id='' """).print()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM