Flink流處理-Sink之Mysql


ElectricFenceMysqlSink

package pers.aishuang.flink.streaming.sink.mysql;

import com.ibm.icu.text.SimpleDateFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ElectricFenceModel;
import pers.aishuang.flink.streaming.utils.DateUtil;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;


public class ElectricFenceMysqlSink extends RichSinkFunction<ElectricFenceModel> {
    //創建日志打印器
    private static Logger logger = LoggerFactory.getLogger(ElectricFenceMysqlSink.class);
    //定義JDBC變量
    private static Connection conn = null;
    private static PreparedStatement pstmt = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        //獲取conf.properties配置文件參數
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        //獲取mysql的driver、url、user、password
        String driver = globalJobParameters.getRequired("jdbc.driver");
        String url = globalJobParameters.getRequired("jdbc.url");
        String user = globalJobParameters.getRequired("jdbc.user");
        String password = globalJobParameters.getRequired("jdbc.password");
        //加載驅動、獲取Mysql連接、創建SQL字符串、獲取sql預編譯
        Class.forName(driver);
        conn = DriverManager.getConnection(url, user, password);
    }

    //本質上是進入數據和出入數據合成一條數據,
    // gpsTime、lat 、lng 、 terminalTime、processTime 字段都是出去數據的數據
    @Override
    public void invoke(ElectricFenceModel model, Context context) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            //能獲取到進圍欄狀態的則修改進圍欄的狀態,即:此條數據為出圍欄 (狀態報警是出(0),且已在mysql結果表中)
            if(model.getStatusAlarm() == 0 && model.getInMysql()) { //此時是更新,在進入數據上更新
                String sql = "update vehicle_networking.electric_fence set outTime=?,gpsTime=?,lat=?,lng=?,terminalTime=?,processTime=? where id=?";
                pstmt = conn.prepareStatement(sql);
                pstmt.setObject(1, model.getOutEleTime());
                pstmt.setObject(2,model.getGpsTime());
                pstmt.setObject(3,model.getLat());
                pstmt.setObject(4,model.getLng());
                pstmt.setObject(5,model.getTerminalTime());
                //設置處理時間為當前時間
                pstmt.setObject(6,sdf.format( new Date() ));
                pstmt.setLong(7,model.getUuid());
            } else {
                //進入圍欄,轉換ElectricFenceModel對象,插入結果數據到電子圍欄結果表中
                String sql = "insert into vehicle_networking.electric_fence(vin,inTime,outTime,gpsTime,lat,lng,eleId,eleName,address,latitude,longitude,radius,terminalTime,processTime) " +
                        "values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
                pstmt = conn.prepareStatement(sql);
                pstmt.setString(1,model.getVin());
                pstmt.setObject(2,model.getInEleTime());
                pstmt.setObject(3,model.getOutEleTime());
                pstmt.setObject(4,model.getGpsTime());
                pstmt.setDouble(5,model.getLat());
                pstmt.setDouble(6, model.getLng());
                pstmt.setInt(7, model.getEleId());
                pstmt.setString(8, model.getEleName());
                pstmt.setString(9, model.getAddress());
                pstmt.setDouble(10, model.getLatitude());
                pstmt.setDouble(11,model.getLongitude());
                pstmt.setFloat(12,model.getRadius());
                pstmt.setObject(13, model.getTerminalTime());
                //設置processTime 時間為當前時間
                pstmt.setObject(14, DateUtil.getCurrentDate());
            }
            //執行
            pstmt.execute();
            logger.warn("MysqlSink,批量插入數據成功,插入{}條數據", pstmt.getMaxRows());
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if(pstmt != null) pstmt.close();
        }

    }

    @Override
    public void close() throws Exception {
        try {
            if(pstmt != null) {
                pstmt.close();
            }
            if(conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

OnlineStatisticsMysqlSink

package pers.aishuang.flink.streaming.sink.mysql;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.OnlineDataObj;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * mysql兩階段提交代碼實現過程
 * 1、關閉conn自動提交
 * 2、執行sql后,conn.commit() 手動提交
 * 3、try-catch捕獲sql執行過程異常
 * 4、若捕獲到異常 conn.rollback() 事務回滾
 * 5、底層又重新插入這條數據
 */
public class OnlineStatisticsMysqlSink extends RichSinkFunction<OnlineDataObj> {
    //定義是否一直運行的標志
    Boolean isRunning = true;

    //創建日志打印器
    private static final Logger logger = LoggerFactory.getLogger(OnlineStatisticsMysqlSink.class);

    //定義JDBC變量
    Connection conn = null;
    PreparedStatement pstmt = null;

    //定義獲取配置文件參數工具
    ParameterTool parameterTool = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        //獲取運行環境全局參數工具
        parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

        //獲取mysql JDBC參數:driver、url、user、password
        String driver = parameterTool.getRequired("jdbc.driver");
        String url = parameterTool.getRequired("jdbc.url");
        String user = parameterTool.getRequired("jdbc.user");
        String password = parameterTool.getRequired("jdbc.password");

        //加載驅動、獲取連接
        Class.forName(driver);
        conn = DriverManager.getConnection(url, user, password);
        //關閉自動提交
        conn.setAutoCommit(false);
        //sql (更新插入)
        String sql = "INSERT INTO online_data(vin,process_time,lat,lng, mileage,is_alarm,alarm_name,terminal_time,earliest_time,max_voltage_battery,min_voltage_battery,max_temperature_value,min_temperature_value,speed,soc,charge_flag,total_voltage,total_current,battery_voltage,probe_temperatures,series_name,model_name,live_time,sales_date,car_type,province,city, county) values(?,now(),?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?) \n" +
                "ON DUPLICATE KEY UPDATE process_time=now(),lat=?,lng=?,mileage=?,is_alarm=?, alarm_name=?,terminal_time=?,max_voltage_battery=?,\n" +
                "min_voltage_battery=?,max_temperature_value=?, min_temperature_value=?, speed=?,soc=?,charge_flag=?, total_voltage=?,\n" +
                "total_current=?,battery_voltage=?, probe_temperatures=?,series_name=?,model_name=?,live_time=?,sales_date=?,car_type=?,\n" +
                "province=?,city=?,county=?";
        //獲取預編譯對象
        pstmt = conn.prepareStatement(sql);

    }

    /**
     * 數據一條一條的提交寫入到MySQL數據中
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(OnlineDataObj value, Context context) throws Exception {
        try {
            //插入數據的參數
            pstmt.setString(1, value.getVin());
            pstmt.setDouble(2, value.getLat());
            pstmt.setDouble(3, value.getLng());
            pstmt.setDouble(4, value.getMileage());
            pstmt.setInt(5, value.getIsAlarm());
            pstmt.setString(6, value.getAlarmName());
            pstmt.setString(7, value.getTerminalTime());
            pstmt.setString(8, value.getEarliestTime());
            pstmt.setDouble(9, value.getMaxVoltageBattery());
            pstmt.setDouble(10, value.getMinVoltageBattery());
            pstmt.setDouble(11, value.getMaxTemperatureValue());
            pstmt.setDouble(12, value.getMinTemperatureValue());
            pstmt.setDouble(13, value.getSpeed());
            pstmt.setInt(14, value.getSoc());
            pstmt.setInt(15, value.getChargeFlag());
            pstmt.setDouble(16, value.getTotalVoltage());
            pstmt.setDouble(17, value.getTotalCurrent());
            pstmt.setString(18, value.getBatteryVoltage());
            pstmt.setString(19, value.getProbeTemperatures());
            pstmt.setString(20, value.getSeriesName());
            pstmt.setString(21, value.getModelName());
            pstmt.setString(22, value.getLiveTime());
            pstmt.setString(23, value.getSalesDate());
            pstmt.setString(24, value.getCarType());
            pstmt.setString(25, value.getProvince());
            pstmt.setString(26, value.getCity());
            pstmt.setString(27, value.getCountry());
            //修改數據的參數
            pstmt.setDouble(28, value.getLat());
            pstmt.setDouble(29, value.getLng());
            pstmt.setDouble(30, value.getMileage());
            pstmt.setInt(31, value.getIsAlarm());
            pstmt.setString(32, value.getAlarmName());
            pstmt.setString(33, value.getTerminalTime());
            pstmt.setDouble(34, value.getMaxVoltageBattery());
            pstmt.setDouble(35, value.getMinVoltageBattery());
            pstmt.setDouble(36, value.getMaxTemperatureValue());
            pstmt.setDouble(37, value.getMinTemperatureValue());
            pstmt.setDouble(38, value.getSpeed());
            pstmt.setInt(39, value.getSoc());
            pstmt.setInt(40, value.getChargeFlag());
            pstmt.setDouble(41, value.getTotalVoltage());
            pstmt.setDouble(42, value.getTotalCurrent());
            pstmt.setString(43, value.getBatteryVoltage());
            pstmt.setString(44, value.getProbeTemperatures());
            pstmt.setString(45, value.getSeriesName());
            pstmt.setString(46, value.getModelName());
            pstmt.setString(47, value.getLiveTime());
            pstmt.setString(48, value.getSalesDate());
            pstmt.setString(49, value.getCarType());
            pstmt.setString(50, value.getProvince());
            pstmt.setString(51, value.getCity());
            pstmt.setString(52, value.getCountry());

            //執行數據更新和遞交操作
            pstmt.execute();
            conn.commit();
        } catch (SQLException e) {
            conn.rollback();
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if(pstmt != null) pstmt.close();
        if(conn != null) conn.close();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM