Flink 读取 MySQL 示例


package org.onepiece.bigdata.windows.JDBC

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.jdbc.{JDBCAppendTableSink, JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Types}
import org.apache.flink.types.Row
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment

import org.apache.flink.table.api.Table

/**
  * Examples of reading MySQL tables with Flink's batch `JDBCInputFormat`.
  *
  * Each `mysql_read_*` method runs one query against the database at `url`,
  * prints every returned row and then prints the number of rows.
  */
object mysql_test {

  import org.apache.flink.api.scala.extensions._
  import org.apache.flink.api.scala._

  // Pieces of the JDBC URL used by the read examples below.
  protected val host_name = "localhost"
  protected val port = 3306
  protected val db_name = "vn09jj5"
  protected val url = s"jdbc:mysql://${host_name}:${port}/${db_name}?useSSL=false&serverTimezone=UTC"

  // JDBC URL of a second database; not used by any method in this object.
  val url_flink = "jdbc:mysql://localhost:3306/flink_db?useSSL=false&serverTimezone=UTC"

  protected val driver = "com.mysql.cj.jdbc.Driver"
  // NOTE(review): credentials are hard-coded in source; consider loading them
  // from configuration or the environment instead.
  protected val user = "root"
  protected val password = "sa123ADMIN"


  /** @return the batch (DataSet) execution environment for the current context. */
  def getBatchExecutionEnvironment(): ExecutionEnvironment =
    ExecutionEnvironment.getExecutionEnvironment

  /** @return the streaming execution environment for the current context. */
  def getStreamExecutionEnvironment(): StreamExecutionEnvironment =
    StreamExecutionEnvironment.getExecutionEnvironment

  /** @return environment settings selecting the Blink planner in streaming mode. */
  def getEnvironmentSettings(): EnvironmentSettings =
    EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()


  /**
    * Runs `query` against the database at `url`, prints each returned row and
    * then prints the row count.
    *
    * The result is collected exactly once. The previous code called `print()`
    * and `count()` (each of which executes its own job, so the query ran twice)
    * and then `benv.execute(...)`, which fails with "No new data sinks have
    * been defined since the last execution" because no sink is left to run.
    *
    * @param query       SQL statement to execute
    * @param rowTypeInfo column types (and optionally names) of the result rows,
    *                    in the same order as the selected columns
    */
  private def readAndPrint(query: String, rowTypeInfo: RowTypeInfo): Unit = {
    val benv = ExecutionEnvironment.getExecutionEnvironment

    // Read from MySQL.
    val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
      .setDBUrl(url)
      .setDrivername(driver)
      .setUsername(user)
      .setPassword(password)
      .setQuery(query)
      .setRowTypeInfo(rowTypeInfo)
      .finish()

    // collect() triggers the job; no explicit benv.execute() is needed afterwards.
    val rows = benv.createInput(inputFormat).collect()

    rows.foreach(println)
    println(rows.size)
  }

  /** Reads `phone_Nbr` and `channel` from `cte_phone_channel`, ordered by `phone_Nbr`. */
  def mysql_read_1(): Unit = {
    val typeInfo = new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO, // phone_Nbr
      BasicTypeInfo.STRING_TYPE_INFO  // channel
    )

    readAndPrint(
      "select phone_Nbr,channel from cte_phone_channel order by phone_Nbr",
      typeInfo
    )
  }

  /**
    * Reads the join of `cte_phone_channel` and `order_phone` on `phone_Nbr`,
    * restricted to `order_phone` rows with `isDelete = 1`.
    */
  def mysql_read_2(): Unit = {
    val query_table =
      """
        |select a.phone_Nbr,a.channel,b.isDelete,b.remart
        |from cte_phone_channel as a
        |inner join order_phone as b on b.phone_Nbr=a.phone_Nbr and b.isDelete=1
      """.stripMargin

    val typeInfo = new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO, // phone_Nbr
      BasicTypeInfo.STRING_TYPE_INFO, // channel
      BasicTypeInfo.INT_TYPE_INFO,    // isDelete
      BasicTypeInfo.STRING_TYPE_INFO  // remart
    )

    readAndPrint(query_table, typeInfo)
  }

  /**
    * Same query as `mysql_read_1`, but builds the row type from explicit field
    * names plus types instead of positional types only.
    */
  def mysql_read_3(): Unit = {
    val query = "select phone_Nbr,channel from cte_phone_channel order by phone_Nbr"

    val fields = Array[String]("phone_Nbr", "channel")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    val typeInfo = new RowTypeInfo(types, fields)

    readAndPrint(query, typeInfo)
  }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM