Flink 中定時加載外部數據


社區中有好幾個同學問過這樣的場景:  

flink 任務中,source 進來的數據,需要連接數據庫里面的字段,再做后面的處理

這里假設一個 ETL 的場景,輸入數據包含兩個字段 “type, userid....” ,需要根據 type,連接一張 mysql 的配置表,關聯 type 對應的具體內容。相對於輸入數據的數量,type 的值是很少的(這里默認只有10種), 所以對應配置表就只有10條數據,配置是會定時修改的(比如跑批補充數據),配置的修改必須在一定時間內生效。

實時 ETL,需要用里面的一個字段去關聯數據庫,補充其他數據,進來的數據中關聯字段是很單一的(就10個),對應數據庫的數據也很少,如果用 異步 IO,感覺會比較傻(浪費資源、性能還不好)。同時數據庫的數據是會不定時修改的,所以不能在啟動的時候一次性加載。

Flink 現在對應這種場景可以使用  Boradcase state 做,如:基於Broadcast 狀態的Flink Etl Demo

這里想說的是另一種更簡單的方法: 使用定時器,定時加載數據庫的數據  (就是簡單的Java定時器)

先說一下代碼流程:

1、自定義的 source,輸入逗號分隔的兩個字段

2、使用 RichMapFunction  轉換數據,在 open 中定義定時器,定時觸發查詢 mysql 的任務,並將結果放到一個 map 中

3、輸入數據關聯 map 的數據,然后輸出

先看下數據庫中的數據:

mysql> select * from timer;
+------+------+
| id   | name |
+------+------+
| 0    | 0zOq |
| 1    | 1hKC |
| 2    | 2ibM |
| 3    | 3fCe |
| 4    | 4TaM |
| 5    | 5URU |
| 6    | 6WhP |
| 7    | 7zjn |
| 8    | 8Szl |
| 9    | 9blS |
+------+------+
10 rows in set (0.01 sec)

總共10條數據,id 就是對應的關聯字段,需要填充的數據是 name

下面是主要的代碼:// 自定義的source,輸出 x,xxx 格式隨機字符

    val input = env.addSource(new TwoStringSource)
    val stream = input.map(new RichMapFunction[String, String] {

      val jdbcUrl = "jdbc:mysql://venn:3306?useSSL=false&allowPublicKeyRetrieval=true"
      val username = "root"
      val password = "123456"
      val driverName = "com.mysql.jdbc.Driver"
      var conn: Connection = null
      var ps: PreparedStatement = null
      val map = new util.HashMap[String, String]()

      override def open(parameters: Configuration): Unit = {
        logger.info("init....")
        query()
        // new Timer
        val timer = new Timer(true)
        // schedule is 10 second 定義了一個10秒的定時器,定時執行查詢數據庫的方法
timer.schedule(new TimerTask { override def run(): Unit = { query() } }, 10000, 10000) } override def map(value: String): String = { // concat input and mysql data,簡單關聯輸出 value + "-" + map.get(value.split(",")(0)) } /** * query mysql for get new config data */ def query() = { logger.info("query mysql") try { Class.forName(driverName) conn = DriverManager.getConnection(jdbcUrl, username, password) ps = conn.prepareStatement("select id,name from venn.timer") val rs = ps.executeQuery while (!rs.isClosed && rs.next) { val id = rs.getString(1) val name = rs.getString(2)
// 將結果放到 map 中 map.put(id, name) } logger.info(
"get config from db size : {}", map.size()) } catch { case e@(_: ClassNotFoundException | _: SQLException) => e.printStackTrace() } finally { ps.close() conn.close() } } }) // .print() val sink = new FlinkKafkaProducer[String]("timer_out" , new MyKafkaSerializationSchema[String]() , Common.getProp , FlinkKafkaProducer.Semantic.EXACTLY_ONCE) stream.addSink(sink)

簡單的Java定時器:

val timer = new Timer(true)
// schedule is 10 second, 5 second between successive task executions
timer.schedule(new TimerTask {
  override def run(): Unit = {
    query()
  }
}, 10000, 10000)

------------------20200327 改---------------------

之前 博客寫的有問題,public void schedule(TimerTask task, long delay, long period) 的第三個參數才是重復執行的時間間隔,0 是不執行,我之前寫的時候放上去的案例,調用的 Timer 的構造方法是: public void schedule(TimerTask task, long delay) 只會在 delay 時間后調用一次,並不會重復執行,不需要 調用 :  public void schedule(TimerTask task, long delay, long period)   這樣的構造方法,才能真正的定時執行。

使用之前的方法執行的,會看到query 方法執行了兩次,是 open 中主動調用了一次和 之后調度了一次,定時器就結束了。

感謝社區大佬指出

同時社區還有大佬指出 : ScheduledExecutorService 會比 timer 更好;理由: Timer里邊的邏輯失敗的話不會拋出任何異常,直接結束,建議用ScheduledExecutorService替換Timer並且捕獲下異常看看

------------------------------------

看下輸出的數據:

7,N-7zjn
7,C-7zjn
7,U-7zjn
4,T-4TaM
7,J-7zjn
9,R-9blS
4,C-4TaM
9,T-9blS
4,A-4TaM
6,I-6WhP
9,U-9blS

注:“-” 之前是原始數據,后面是關聯后的數據

部署到服務器上定時器的調度:

2019-09-28 18:28:13,476 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - query mysql
2019-09-28 18:28:13,480 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - get config from db size : 10
2019-09-28 18:28:18,553 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer 0/1 - checkpoint 17 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1569666488499} from checkpoint 17
2019-09-28 18:28:23,476 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - query mysql
2019-09-28 18:28:23,481 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - get config from db size : 10
2019-09-28 18:28:28,549 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer 0/1 - checkpoint 18 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1569666498505} from checkpoint 18
2019-09-28 18:28:33,477 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - query mysql
2019-09-28 18:28:33,484 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - get config from db size : 10

十秒調度一次

 歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM