項目實戰 從 0 到 1 學習之Flink(22)讀mysql並寫入mysql


在Flink文檔中,提供connector讀取源數據和把處理結果存儲到外部系統中。但是沒有提供數據庫的connector,如果要讀寫數據庫,官網給出了異步IO(Asynchronous I/O)專門用於訪問外部數據,詳細可看:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html

還有一種方法是繼承RichSourceFunction,重寫里面的方法,具體如下:

讀取mysql的類:

package com.my.flink.utils.streaming.mysql;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.my.flink.utils.config.ConfigKeys;
 
/**
 * @Description mysql source
 * @Author jiangxiaozhi
 * @Date 2018/10/15 17:05
 **/
public class JdbcReader extends RichSourceFunction<Tuple2<String,String>> {
    private static final Logger logger = LoggerFactory.getLogger(JdbcReader.class);
 
    private Connection connection = null;
    private PreparedStatement ps = null;
 
    //該方法主要用於打開數據庫連接,下面的ConfigKeys類是獲取配置的類
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(ConfigKeys.DRIVER_CLASS());//加載數據庫驅動
        connection = DriverManager.getConnection(ConfigKeys.SOURCE_DRIVER_URL(), ConfigKeys.SOURCE_USER(), ConfigKeys.SOURCE_PASSWORD());//獲取連接
        ps = connection.prepareStatement(ConfigKeys.SOURCE_SQL());
    }
 
    //執行查詢並獲取結果
    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        try {
            ResultSet resultSet = ps.executeQuery();
            while (resultSet.next()) {
                String name = resultSet.getString("nick");
                String id = resultSet.getString("user_id");
                logger.error("readJDBC name:{}", name);
                Tuple2<String,String> tuple2 = new Tuple2<>();
                tuple2.setFields(id,name);
                ctx.collect(tuple2);//發送結果,結果是tuple2類型,2表示兩個元素,可根據實際情況選擇
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
 
    }
     
    //關閉數據庫連接
    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
    }
}

 寫入mysql的類:

package com.my.flink.utils.streaming.mysql;
 
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import scala.Tuple2;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import com.my.flink.utils.config.ConfigKeys;
 
/**
 * @Description mysql sink
 * @Author jiangxiaozhi
 * @Date 2018/10/15 18:31
 **/
public class JdbcWriter extends RichSinkFunction<Tuple2<String,String>> {
    private Connection connection;
    private PreparedStatement preparedStatement;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 加載JDBC驅動
        Class.forName(ConfigKeys.DRIVER_CLASS());
        // 獲取數據庫連接
        connection = DriverManager.getConnection(ConfigKeys.SINK_DRIVER_URL(),ConfigKeys.SINK_USER(),ConfigKeys.SINK_PASSWORD());//寫入mysql數據庫
        preparedStatement = connection.prepareStatement(ConfigKeys.SINK_SQL());//insert sql在配置文件中
        super.open(parameters);
    }
 
    @Override
    public void close() throws Exception {
        super.close();
        if(preparedStatement != null){
            preparedStatement.close();
        }
        if(connection != null){
            connection.close();
        }
        super.close();
    }
 
    @Override
    public void invoke(Tuple1<String,String> value, Context context) throws Exception {
        try {
            String name = value._1;//獲取JdbcReader發送過來的結果
            preparedStatement.setString(1,name);
            preparedStatement.executeUpdate();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

程序入口核心代碼:

  //scala代碼
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  val dataStream =  env.addSource(new JdbcReader())//,讀取mysql數據,獲取dataStream后可以做邏輯處理,這里沒有
做
  dataStream.addSink(new JdbcWriter())//寫入mysql
  env.execute("flink mysql demo")//運行程序

運行程序就可以在數據庫表中看到寫入的數據了。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM