Flink自定義Sink,生產的數據導出到MySQL


一、自定義生產數據

https://www.cnblogs.com/robots2/p/16048729.html

二、生產轉化數據,導出到mysql

2.1 建表語句

-- Target table for the custom MySQL sink shown below. The sink's INSERT statement
-- supplies every column except `id`.
CREATE TABLE `video_order` (
  -- Primary key, assigned by MySQL via AUTO_INCREMENT.
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  -- Bound from VideoOrder.getUserId() by the sink.
  `user_id` int(11) DEFAULT NULL,
  -- Bound from VideoOrder.getMoney(); stored as an integer.
  `money` int(11) DEFAULT NULL,
  -- Bound from VideoOrder.getTitle(); at most 32 characters.
  `title` varchar(32) DEFAULT NULL,
  -- Bound from VideoOrder.getTradeNo(); at most 64 characters.
  `trade_no` varchar(64) DEFAULT NULL,
  -- Bound from VideoOrder.getCreateTime() via java.sql.Date; the column type is DATE.
  `create_time` date DEFAULT NULL,
  PRIMARY KEY (`id`)
  -- NOTE(review): AUTO_INCREMENT=32 looks like a leftover from a table dump; confirm it is intended.
) ENGINE=InnoDB AUTO_INCREMENT=32 DEFAULT CHARSET=utf8mb4

2.2 自定義sink,配置mysql數據源

package net.xdclass.class6.sink;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import net.xdclass.model.VideoOrder;

/**
 * Flink sink that writes every {@link VideoOrder} it receives into the MySQL
 * {@code video_order} table over JDBC.
 *
 * <p>A single connection and a single prepared INSERT statement are created in
 * {@link #open(Configuration)}, reused by {@link #invoke(VideoOrder, Context)} for each
 * record, and released in {@link #close()}.
 *
 * @menu
 */
public class MysqlSink extends RichSinkFunction<VideoOrder> {
    private Connection conn;
    private PreparedStatement ps;

    /**
     * Opens the JDBC connection and prepares the INSERT statement used by this sink.
     *
     * @param parameters configuration passed in by Flink; not read by this sink
     * @throws Exception if the connection cannot be established or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Host, user and password below are placeholders to be replaced with real values.
        conn = DriverManager.getConnection("jdbc:mysql://ip地址:3306/flink_test?useUnicode=true" +
                                                   "&characterEncoding=utf8&allowMultiQueries=true&serverTimezone" +
                                                   "=Asia/Shanghai", "賬號", "密碼");
        String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
        ps = conn.prepareStatement(sql);
        System.out.println("自定義sink,open數據庫鏈接 =====");
    }

    /**
     * Releases the prepared statement and then the connection.
     *
     * <p>Resources are closed in reverse order of acquisition, and the connection is closed
     * even when closing the statement fails, so a failure in one does not leak the other.
     *
     * @throws Exception if closing the statement or the connection fails
     */
    @Override
    public void close() throws Exception {
        System.out.println("自定義sink,close數據庫連接 =====");
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            ps = null;
            if (conn != null) {
                try {
                    conn.close();
                } finally {
                    conn = null;
                }
            }
        }
    }

    /**
     * Inserts one order into {@code video_order}; called for each element arriving at the sink.
     *
     * @param value   the order to persist
     * @param context sink context supplied by Flink; not used here
     * @throws Exception if binding the parameters or executing the INSERT fails
     */
    @Override
    public void invoke(VideoOrder value, Context context) throws Exception {
        ps.setInt(1, value.getUserId());
        ps.setInt(2, value.getMoney());
        ps.setString(3, value.getTitle());
        ps.setString(4, value.getTradeNo());
        // create_time is a nullable column: store SQL NULL instead of failing with a
        // NullPointerException when the order carries no creation time.
        if (value.getCreateTime() == null) {
            ps.setNull(5, java.sql.Types.DATE);
        } else {
            ps.setDate(5, new Date(value.getCreateTime().getTime()));
        }

        ps.executeUpdate();
    }
}

2.3 導出到mysql

package net.xdclass.class6.app;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import net.xdclass.class6.sink.MysqlSink;
import net.xdclass.class6.source.VideoOrderSource;
import net.xdclass.model.VideoOrder;

/**
 * Demo job that reads {@link VideoOrder} records from {@link VideoOrderSource}, keeps the
 * ones whose money is greater than 10, prints them and writes them out through
 * {@link MysqlSink}.
 *
 * @menu
 */
public class FLink04CustomSinkApp {

    public static void main(String[] args) throws Exception {
        // Local environment with the web UI enabled. To run without the web UI, use
        // StreamExecutionEnvironment.getExecutionEnvironment() instead.
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Runtime mode AUTOMATIC (unified stream/batch execution).
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Run every operator with a parallelism of 1.
        env.setParallelism(1);

        // Predicate: keep only orders whose money exceeds 10.
        FilterFunction<VideoOrder> moneyAboveTen = order -> order.getMoney() > 10;

        // Custom source -> filter.
        DataStream<VideoOrder> orders = env.addSource(new VideoOrderSource());
        DataStream<VideoOrder> filteredOrders = orders.filter(moneyAboveTen);

        // Print the surviving records and also hand them to the custom MySQL sink.
        filteredOrders.print();
        filteredOrders.addSink(new MysqlSink());

        // Submit the job under its name.
        env.execute("CustomSourceApp");
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM