Flink MysqlSink: A Simple Example


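The big data world has many NoSQL databases, HBase being a typical one, that can query large data volumes quickly, but relational databases still cannot be replaced. In my previous project, for example, the computed results were still written to a relational database. Flink does not provide a connector for relational databases, and since someone asked how to do this, here is a simple demo.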

A Flink sink can write data out in two ways:

Extend RichSinkFunction
Implement the OutputFormat interface

First, a sink that writes to MySQL is built by extending RichSinkFunction.

The MySQL table (user, in the async database, as the insert statements below show) has the following structure:

mysql> desc user;
+----------+-------------+------+-----+---------+----------------+
| Field    | Type        | Null | Key | Default | Extra          |
+----------+-------------+------+-----+---------+----------------+
| id       | int(11)     | NO   | PRI | NULL    | auto_increment |
| username | varchar(32) | NO   | UNI | NULL    |                |
| password | varchar(32) | NO   |     | NULL    |                |
| sex      | int(11)     | YES  |     | 0       |                |
| phone    | varchar(18) | YES  |     | NULL    |                |
+----------+-------------+------+-----+---------+----------------+
5 rows in set (0.00 sec)
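The post never shows the User class. The sketch below assumes a Scala case class whose fields follow the non-id columns (id is auto_increment, so the sinks never write it). UserColumns and fitsColumns are hypothetical names: the check mirrors the NOT NULL and VARCHAR length limits from the DESC output, so an over-long row can be spotted before MySQL rejects it. username is also UNI, so inserting an existing username still fails with a duplicate-key error, which no client-side check can rule out.

```scala
// Hypothetical User: mirrors the user table (id is auto_increment and therefore omitted)
case class User(username: String, password: String, sex: Int, phone: String)

object UserColumns {
  // Limits taken from the DESC output: varchar(32), varchar(32), varchar(18)
  val UsernameMax = 32
  val PasswordMax = 32
  val PhoneMax = 18

  // MySQL counts VARCHAR length in characters, so count code points, not UTF-16 units
  private def chars(s: String): Int = s.codePointCount(0, s.length)

  /** True if the row satisfies the NOT NULL and length constraints of the table. */
  def fitsColumns(u: User): Boolean =
    u.username != null && chars(u.username) <= UsernameMax &&
      u.password != null && chars(u.password) <= PasswordMax &&
      (u.phone == null || chars(u.phone) <= PhoneMax)
}
```

sex is an Int and its column is nullable with default 0, so it needs no check here; phone may be null because its column allows NULL.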

The execution flow is:

  kafka source -> map -> mysqlSink

1. Extend RichSinkFunction

  The main code is as follows:

env.addSource(source)
  .map(li => {
    // each record is assumed to be a CSV line: username,password,sex,phone
    val tmp = li.split(",")
    new User(tmp(0), tmp(1), tmp(2).toInt, tmp(3))
  })
  .addSink(new MysqlSink)
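The map above splits each line and converts the third field with toInt directly, so a line with fewer than four fields or a non-numeric sex throws an exception and fails the job. A minimal sketch of a tolerant parser, assuming the same comma-separated layout; UserParser and parseUser are hypothetical names, and User is assumed to be a case class with username, password, sex and phone, since the post does not show its definition.

```scala
import scala.util.Try

// Hypothetical User, assumed to match the columns written by the sinks
case class User(username: String, password: String, sex: Int, phone: String)

object UserParser {
  /**
   * Parses "username,password,sex,phone". Returns None instead of throwing
   * when the line has the wrong number of fields or sex is not an integer.
   */
  def parseUser(line: String): Option[User] = {
    // limit -1 keeps trailing empty fields, so "a,b,1," still yields 4 fields
    val f = line.split(",", -1).map(_.trim)
    if (f.length != 4) None
    else Try(f(2).toInt).toOption.map(sex => User(f(0), f(1), sex, f(3)))
  }
}
```

In the Scala DataStream API, something like .flatMap(li => UserParser.parseUser(li).toList) could replace the map so malformed lines are dropped; whether dropping them (rather than logging or routing them elsewhere) is acceptable depends on the job.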

MysqlSink:

import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.slf4j.{Logger, LoggerFactory}

class MysqlSink extends RichSinkFunction[User] {

  val logger: Logger = LoggerFactory.getLogger("MysqlSink")
  var conn: Connection = _
  var ps: PreparedStatement = _
  val jdbcUrl = "jdbc:mysql://192.168.229.128:3306/?useSSL=false&allowPublicKeyRetrieval=true"
  val username = "root"
  val password = "123456"
  // Connector/J 8.x driver class (use "com.mysql.jdbc.Driver" with Connector/J 5.x)
  val driverName = "com.mysql.cj.jdbc.Driver"
  val insertSql = "insert into async.user(username, password, sex, phone) values(?,?,?,?)"

  // called once per parallel sink instance before any record arrives
  override def open(parameters: Configuration): Unit = {
    try {
      Class.forName(driverName)
      conn = DriverManager.getConnection(jdbcUrl, username, password)
      // disable auto-commit; each record is committed explicitly in invoke()
      conn.setAutoCommit(false)
      // prepare the statement once and reuse it for every record
      ps = conn.prepareStatement(insertSql)
    } catch {
      case e@(_: ClassNotFoundException | _: SQLException) =>
        logger.error("init mysql error", e)
        // fail the task instead of calling System.exit, which would kill the whole TaskManager JVM
        throw e
    }
  }

  /**
    * If throughput is too low, buffer records in state and commit them in batches
    * (an OOM then most likely means too much data is buffered and not released in time).
    * @param user    the record to write
    * @param context sink context (unused)
    */
  override def invoke(user: User, context: SinkFunction.Context[_]): Unit = {
    logger.debug("get user : " + user.toString)
    ps.setString(1, user.username)
    ps.setString(2, user.password)
    ps.setInt(3, user.sex)
    ps.setString(4, user.phone)

    ps.executeUpdate()
    conn.commit()
  }

  override def close(): Unit = {
    if (ps != null) ps.close()
    if (conn != null) {
      conn.commit()
      conn.close()
    }
  }
}

2. Implement the OutputFormat interface

  The main code is as follows:

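env.addSource(source)
  .map(li => {
    // each record is assumed to be a CSV line: username,password,sex,phone
    val tmp = li.split(",")
    new User(tmp(0), tmp(1), tmp(2).toInt, tmp(3))
  })
  // MysqlSink1 is an OutputFormat, not a SinkFunction, so it is attached with writeUsingOutputFormat
  .writeUsingOutputFormat(new MysqlSink1)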

MysqlSink1:

import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import org.slf4j.{Logger, LoggerFactory}

class MysqlSink1 extends OutputFormat[User] {

  val logger: Logger = LoggerFactory.getLogger("MysqlSink1")
  var conn: Connection = _
  var ps: PreparedStatement = _
  val jdbcUrl = "jdbc:mysql://192.168.229.128:3306/?useSSL=false&allowPublicKeyRetrieval=true"
  val username = "root"
  val password = "123456"
  // Connector/J 8.x driver class (use "com.mysql.jdbc.Driver" with Connector/J 5.x)
  val driverName = "com.mysql.cj.jdbc.Driver"
  val insertSql = "insert into async.user(username, password, sex, phone) values(?,?,?,?)"

  override def configure(parameters: Configuration): Unit = {
    // nothing to configure
  }

  // called once per parallel instance before any record is written
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    try {
      Class.forName(driverName)
      conn = DriverManager.getConnection(jdbcUrl, username, password)
      // disable auto-commit; each record is committed explicitly in writeRecord()
      conn.setAutoCommit(false)
      // prepare the statement once and reuse it for every record
      ps = conn.prepareStatement(insertSql)
    } catch {
      case e@(_: ClassNotFoundException | _: SQLException) =>
        logger.error("init mysql error", e)
        // fail the task instead of calling System.exit, which would kill the whole TaskManager JVM
        throw e
    }
  }

  override def writeRecord(user: User): Unit = {
    logger.debug("get user : " + user.toString)
    ps.setString(1, user.username)
    ps.setString(2, user.password)
    ps.setInt(3, user.sex)
    ps.setString(4, user.phone)

    ps.executeUpdate()
    conn.commit()
  }

  override def close(): Unit = {
    if (ps != null) ps.close()
    if (conn != null) {
      conn.commit()
      conn.close()
    }
  }
}
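MysqlSink and MysqlSink1 repeat the same SQL string and the same four setXxx calls, so the placeholder order has to be kept in sync in two places. One way to keep it in a single place is a shared helper; UserStatement and bind below are hypothetical names, and User is again assumed to be a case class with username, password, sex and phone.

```scala
import java.sql.{PreparedStatement, Types}

// Hypothetical User, assumed to match the columns of the user table
case class User(username: String, password: String, sex: Int, phone: String)

object UserStatement {
  // Placeholder order: 1 username, 2 password, 3 sex, 4 phone
  val InsertSql: String =
    "insert into async.user(username, password, sex, phone) values(?,?,?,?)"

  /** Binds one User to the placeholders of InsertSql; does not execute the statement. */
  def bind(ps: PreparedStatement, user: User): Unit = {
    ps.setString(1, user.username)
    ps.setString(2, user.password)
    ps.setInt(3, user.sex)
    // phone is nullable in the table; write an explicit SQL NULL when it is missing
    if (user.phone == null) ps.setNull(4, Types.VARCHAR)
    else ps.setString(4, user.phone)
  }
}
```

Both invoke() and writeRecord() could then prepare InsertSql once in open() and call UserStatement.bind(ps, user) before executing the statement.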

This is fairly simple, so I won't post test results. If throughput is high, be sure to switch to batch commits.
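Both sinks above commit once per record, which costs one round trip and one transaction per row. Below is a minimal, Flink-independent sketch of the buffering idea from the comment in MysqlSink. BatchBuffer is a hypothetical helper that groups records and hands each full group to a flush callback. In a JDBC sink, the callback would presumably call ps.addBatch() for each record, then ps.executeBatch() and conn.commit() once per group, and close() would flush the remainder. Records still sitting in the buffer are lost if the job fails, so a production sink would also need to flush in snapshotState of Flink's CheckpointedFunction, or keep the buffer in state as the original comment suggests.

```scala
import scala.collection.mutable.ArrayBuffer

/**
 * Hypothetical helper: collects records and passes them to `flush` in groups
 * of `batchSize`. Not thread-safe; a sink instance calls it from one thread.
 */
class BatchBuffer[T](batchSize: Int)(flush: Seq[T] => Unit) {
  require(batchSize > 0, "batchSize must be positive")
  private val buffer = ArrayBuffer.empty[T]

  /** Adds one record and flushes automatically once the buffer is full. */
  def add(record: T): Unit = {
    buffer += record
    if (buffer.size >= batchSize) flushNow()
  }

  /** Flushes whatever is buffered; call from close() so the tail is not lost. */
  def flushNow(): Unit =
    if (buffer.nonEmpty) {
      flush(buffer.toList) // hand over an immutable copy before clearing
      buffer.clear()
    }
}
```

The batch size trades latency for throughput: a larger batch means fewer commits but more records waiting in memory, which is also where the OOM mentioned in the comment would come from.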

Done.

