package org.onepiece.bigdata.windows.JDBC

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.jdbc.{JDBCAppendTableSink, JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Types}
import org.apache.flink.types.Row
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.Table

/**
 * Small collection of examples that read rows from MySQL through Flink's
 * `JDBCInputFormat` using the batch (DataSet) API, print them, and print
 * the row count.
 */
object mysql_test {

  import org.apache.flink.api.scala.extensions._
  import org.apache.flink.api.scala._

  protected val host_name = "localhost"
  protected val port = 3306
  protected val db_name = "vn09jj5"
  protected val url = s"jdbc:mysql://${host_name}:${port}/${db_name}?useSSL=false&serverTimezone=UTC"
  val url_flink = "jdbc:mysql://localhost:3306/flink_db?useSSL=false&serverTimezone=UTC"
  protected val driver = "com.mysql.cj.jdbc.Driver"
  // NOTE(review): credentials are hard-coded in source; consider loading them
  // from configuration or the environment before using this outside a sandbox.
  protected val user = "root"
  protected val password = "sa123ADMIN"

  /** Returns the batch (DataSet API) execution environment for the current context. */
  def getBatchExecutionEnvironment(): ExecutionEnvironment =
    ExecutionEnvironment.getExecutionEnvironment

  /** Returns the streaming execution environment for the current context. */
  def getStreamExecutionEnvironment(): StreamExecutionEnvironment =
    StreamExecutionEnvironment.getExecutionEnvironment

  /** Builds environment settings that select the Blink planner in streaming mode. */
  def getEnvironmentSettings(): EnvironmentSettings =
    EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

  /**
   * Runs `query` against the MySQL database at `url`, prints every returned
   * row, then prints the number of rows.
   *
   * @param query       SQL text passed to the JDBC input format
   * @param rowTypeInfo type information for the columns selected by `query`,
   *                    in select-list order
   */
  private def readAndPrint(query: String, rowTypeInfo: RowTypeInfo): Unit = {
    val benv = getBatchExecutionEnvironment()

    // Read from MySQL.
    val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
      .setDBUrl(url)
      .setDrivername(driver)
      .setUsername(user)
      .setPassword(password)
      .setQuery(query)
      .setRowTypeInfo(rowTypeInfo)
      .finish()
    val dataSource = benv.createInput(inputFormat)

    // print() and count() are eager in the DataSet API: each one triggers a
    // job execution by itself. No explicit benv.execute(...) must follow,
    // because with no new sinks defined it fails with
    // "No new data sinks have been defined since the last execution".
    dataSource.print()
    println(dataSource.count())
  }

  /**
   * Reads `phone_Nbr` and `channel` from `cte_phone_channel`, describing the
   * row by position only (unnamed string columns).
   */
  def mysql_read_1(): Unit = {
    val typeInfo = new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO, // phone_Nbr
      BasicTypeInfo.STRING_TYPE_INFO  // channel
    )
    readAndPrint(
      "select phone_Nbr,channel from cte_phone_channel order by phone_Nbr",
      typeInfo
    )
  }

  /**
   * Reads the join of `cte_phone_channel` and `order_phone` restricted to
   * `isDelete=1`, returning four columns.
   */
  def mysql_read_2(): Unit = {
    val query_table =
      """
        |select a.phone_Nbr,a.channel,b.isDelete,b.remart
        |from cte_phone_channel as a
        |inner join order_phone as b on b.phone_Nbr=a.phone_Nbr and b.isDelete=1
      """.stripMargin

    val typeInfo = new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO, // phone_Nbr
      BasicTypeInfo.STRING_TYPE_INFO, // channel
      BasicTypeInfo.INT_TYPE_INFO,    // isDelete
      BasicTypeInfo.STRING_TYPE_INFO  // remart
    )
    readAndPrint(query_table, typeInfo)
  }

  /**
   * Same query as [[mysql_read_1]], but the row type is built from explicit
   * field-name and field-type arrays so the columns are named.
   */
  def mysql_read_3(): Unit = {
    val query = "select phone_Nbr,channel from cte_phone_channel order by phone_Nbr"
    val fields = Array[String]("phone_Nbr", "channel")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    val typeInfo = new RowTypeInfo(types, fields)

    readAndPrint(query, typeInfo)
  }
}