Flink流處理-Source之Mysql


MysqlEletricFenceResultSource

package pers.aishuang.flink.streaming.source.mysql;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;


/**
 * 讀取Mysql中電子圍欄相關表結合成后的規則
 */
public class MysqlElectricFenceResultSource extends RichSourceFunction {
    //新建日志打印器
    private static final Logger logger = LoggerFactory.getLogger(MysqlElectricFenceResultSource.class);
    //定義JDBC變量
    private static Connection conn = null;
    private static PreparedStatement pstmt = null;

    //設置標識用於記錄當前循環讀取mysql配置
    private static Boolean flag = true;
    //定義獲取配置文件參數工具
    private static ParameterTool parameterTool = null;
    private static Map<String, String> parasMap = null;
    private static ParameterTool globalJobParameters = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        //方式一:通過ParameterTool自己再獲取配置文件參數
        parameterTool = ParameterTool.fromPropertiesFile(MysqlElectricFenceResultSource.class
                .getClassLoader()
                .getResourceAsStream("conf.properties"));
        //-- 獲取Driver、url、user、password
        String driver = parameterTool.getRequired("jdbc.driver");
        String url = parameterTool.getRequired("jdbc.url");
        String user = parameterTool.getRequired("jdbc.user");
        String password = parameterTool.getRequired("jdbc.password");
        //方式二:通過執行環境設置的全局任務參數里獲取 參數
        parasMap = getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters()
                .toMap();
        String driver2 = parasMap.get("jdbc.driver");
        String url2 = parasMap.get("jdbc.url");
        String user2 = parasMap.get("jdbc.user");
        String password2 = parasMap.get("jdbc.password");
        //方式三:與方式二本質上一樣
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        String driver3 = globalJobParameters.getRequired("jdbc.driver");
        String url3 = globalJobParameters.getRequired("jdbc.url");
        String user3 = globalJobParameters.getRequired("jdbc.user");
        String password3 = globalJobParameters.getRequired("jdbc.password");

        //獲取MySQL連接
        //-- 加載驅動
        Class.forName(driver);
        //-- 獲取連接
        conn = DriverManager.getConnection(url, user, password);
        //-- 執行SQL
        //查出 有進入時間 沒有出去時間,按照vin分組,找到目前最小id(電子圍欄結果表的主鍵id)
        String sql = "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin";
        //-- 創建預編譯對象
        pstmt = conn.prepareStatement(sql);

    }

    @Override
    public void run(SourceContext ctx) throws Exception {
        while(flag) {
            HashMap<String, Integer> vehInfoMap = new HashMap<>();
            ResultSet rs = pstmt.executeQuery();
            while(rs.next()) {
                vehInfoMap.put(rs.getString("vin") , rs.getInt("id"));
            }
            if(vehInfoMap.isEmpty()){
                logger.warn("從mysql中electronic_fence相關表的數據為空");
            } else {
                ctx.collect(vehInfoMap);
                logger.info("查詢電子圍欄分析結果表中數據,存在記錄數據為:%s 條",vehInfoMap.size());
            }
            if(!rs.isClosed()) {rs.close();}
            //多久從mysql獲取一次數據
            //TimeUnit.MICROSECONDS.sleep(parameterTool.getLong("vehinfo.millionseconds"));

            //每1秒鍾獲取一次最新數據,因為窗口每隔90s進行一次計算,因此該時間一定要小於窗口觸發計算的頻率
            TimeUnit.MICROSECONDS.sleep(1);//1ms
        }

    }


    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        super.close();
        if(!pstmt.isClosed()){pstmt.close();}
        if(!conn.isClosed()) {conn.close();}
    }
}

MysqlElectricFenceSource

package pers.aishuang.flink.streaming.source.mysql;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ElectricFenceResultTmp;


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

public class MySQLElectricFenceSource extends RichSourceFunction<HashMap<String, ElectricFenceResultTmp>> {
    private static final Logger logger = LoggerFactory.getLogger(MySQLElectricFenceSource.class.getSimpleName());
    private static Connection conn = null;
    private static Statement stmt = null;
    //設置標識用於記錄當前循環讀取mysql配置
    private static Boolean flag = true;
    private static String elerulesTime = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        //1. 獲取上下文中的 parameterTool
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        //2. 讀取配置文件中參數,注冊驅動、url、user、passworld
        String driver = globalJobParameters.getRequired("jdbc.driver");
        String url = globalJobParameters.getRequired("jdbc.url");
        String user = globalJobParameters.getRequired("jdbc.user");
        String password = globalJobParameters.getRequired("jdbc.password");
        //3. 多長時間去查一次mysql數據
        elerulesTime = globalJobParameters.getRequired("elerules.millionseconds");
        //4. 設置驅動和連接
        Class.forName(driver);
        conn = DriverManager.getConnection(url,user,password);
        stmt = conn.createStatement();
    }

    @Override
    public void run(SourceContext<HashMap<String, ElectricFenceResultTmp>> ctx) throws Exception {
        while (flag){
            HashMap<String, ElectricFenceResultTmp> map = new HashMap<>();
            //1. 查詢SQL
            String sql = "select " +
                            "vins.vin,setting.id,setting.name,setting.address,setting.radius," +
                            "setting.longitude,setting.latitude,setting.start_time,setting.end_time \n" +
                        "from vehicle_networking.electronic_fence_setting setting \n" +
                        "inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id \n" +
                        "where setting.status=1";
            ResultSet rs = stmt.executeQuery(sql);
            while(rs.next()){
                map.put(
                        rs.getString("vin"),
                        new ElectricFenceResultTmp(
                                rs.getInt("id"),
                                rs.getString("name"),
                                rs.getString("address"),
                                rs.getFloat("radius"),
                                rs.getDouble("longitude"),
                                rs.getDouble("latitude"),
                                rs.getDate("start_time"),
                                rs.getDate("end_time")
                        )
                );
            }
            ctx.collect(map);
            //關閉rs
            if(!rs.isClosed()) {
                rs.close();
            }
            //收集electricFenceResult 指定休眠時間 ms
            TimeUnit.MICROSECONDS.sleep(Long.parseLong(elerulesTime));
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        super.close();
        if(!stmt.isClosed()) stmt.close();
        if(!conn.isClosed()) conn.close();
    }
}

VehicleInfoMysqlSource

package pers.aishuang.flink.streaming.source.mysql;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.VehicleInfoModel;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.HashMap;

public class VehicleInfoMysqlSource extends RichSourceFunction<HashMap<String, VehicleInfoModel>> {
    //創建日志打印器
    private Logger logger = LoggerFactory.getLogger(VehicleInfoMysqlSource.class);

    //定義JDBC變量
    private Connection conn = null;
    private PreparedStatement pstmt = null;

    //定義獲取配置文件參數工具
    ParameterTool parameterTool;

    //定義是否運行的標記
    private boolean isRunning = true; //flag

    @Override
    public void open(Configuration parameters) throws Exception {
        //通過全局變量獲取配置參數
        parameterTool = (ParameterTool) getRuntimeContext()
                                    .getExecutionConfig().getGlobalJobParameters();
        //獲取mysql JDBC的 driver、url、user、password
        String driver = parameterTool.getRequired("jdbc.driver");
        String url = parameterTool.getRequired("jdbc.url");
        String user = parameterTool.getRequired("jdbc.user");
        String password = parameterTool.getRequired("jdbc.password");
        //加載驅動、獲取連接、創建sql字符串、獲取預編譯對象
        Class.forName(driver);
        conn = DriverManager.getConnection(url);
        String sql = "select t12.vin,t12.series_name,t12.model_name,t12.series_code,t12.model_code,t12.nick_name,t3.sales_date,t4.car_type\n" +
                " from (\n" +
                "select t1.vin, t1.series_name, t2.show_name as model_name, t1.series_code,t2.model_code,t2.nick_name,t1.vehicle_id\n" +
                " from vehicle_networking.dcs_vehicles t1 left join vehicle_networking.t_car_type_code t2 on t1.model_code = t2.model_code) t12\n" +
                " left join  (select vehicle_id, max(sales_date) sales_date from vehicle_networking.dcs_sales group by vehicle_id) t3\n" +
                " on t12.vehicle_id = t3.vehicle_id\n" +
                " left join\n" +
                " (select tc.vin,'net_cat' car_type from vehicle_networking.t_net_car tc\n" +
                " union all select tt.vin,'taxi' car_type from vehicle_networking.t_taxi tt\n" +
                " union all select tp.vin,'private_car' car_type from vehicle_networking.t_private_car tp\n" +
                " union all select tm.vin,'model_car' car_type from vehicle_networking.t_model_car tm) t4\n" +
                " on t12.vin = t4.vin";
        pstmt = conn.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<HashMap<String, VehicleInfoModel>> ctx) throws Exception {
        while(isRunning) {
            ResultSet resultSet = pstmt.executeQuery();
            HashMap<String, VehicleInfoModel> vehicleInfoMap = new HashMap<>();
            while(resultSet.next()) {
                VehicleInfoModel vehicleInfoModel = new VehicleInfoModel();
                //車架號
                String vin = resultSet.getString("vin");
                //車系
                String seriesName = resultSet.getString("series_name");
                //車型
                String modelName = resultSet.getString("model_name");
                //車系編碼
                String seriesCode = resultSet.getString("series_code");
                //車型編碼
                String modelCode = resultSet.getString("model_code");
                //車輛類型簡稱
                String nickName = resultSet.getString("nick_name");
                //出售日期
                String salesDate = resultSet.getString("sales_date");
                //車輛用途
                String carType = resultSet.getString("car_type");

                //年限
                String liveTime = "-1";
                if (salesDate != null) {
                    //當前日期-售出日期=使用年限
                    liveTime = String.valueOf((new Date().getTime() - resultSet.getDate("sales_date").getTime()) / 1000 / 3600 / 24 / 365);
                }
                if (null == vin) {
                    vin = "未知";
                }
                if (null == seriesName) {
                    seriesName = "未知";
                }
                if (null == modelName) {
                    modelName = "未知";
                }
                if (null == seriesCode) {
                    seriesCode = "未知";
                }
                if (null == modelCode) {
                    modelCode = "未知";
                }
                if (null == nickName) {
                    nickName = "未知";
                }
                if (null == salesDate) {
                    salesDate = "未知";
                }
                if (null == carType) {
                    carType = "未知";
                }

                vehicleInfoModel.setSeriesName(seriesName);
                vehicleInfoModel.setSeriesCode(seriesCode);
                vehicleInfoModel.setModelName(modelName);
                vehicleInfoModel.setModelCode(modelCode);
                vehicleInfoModel.setLiveTime(liveTime);
                vehicleInfoModel.setNickName(nickName);
                vehicleInfoModel.setCarType(carType);
                vehicleInfoModel.setSalesDate(salesDate);

                //將車輛基礎數據封裝到集合返回
                vehicleInfoMap.put(vin, vehicleInfoModel);
            }
            if(vehicleInfoMap.isEmpty()) {
                logger.warn("從車輛基礎數據表中查詢數據為空....");
            }else{
                ctx.collect(vehicleInfoMap);
            }
            resultSet.close();
            //設置多久從mysql查詢一次數據(及規則變更周期時間)
            Thread.sleep(parameterTool.getInt("vehinfo.millionseconds"));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * 釋放資源
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        if(pstmt != null) pstmt.close();
        if(conn != null) conn.close();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM