MysqlElectricFenceResultSource
package pers.aishuang.flink.streaming.source.mysql;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Flink source that periodically queries the MySQL electric-fence result table and emits the
 * rows that describe vehicles currently inside a fence.
 *
 * <p>Each poll emits one {@code HashMap<String, Integer>} mapping a vin to the smallest
 * {@code id} among its rows that have an {@code inTime} but no {@code outTime}. Nothing is
 * emitted for a poll that returns no rows.
 *
 * <p>The raw {@code RichSourceFunction} super type is kept on purpose so that existing callers
 * keep compiling unchanged.
 */
public class MysqlElectricFenceResultSource extends RichSourceFunction {
    private static final Logger logger = LoggerFactory.getLogger(MysqlElectricFenceResultSource.class);

    /** Classpath resource that holds the JDBC settings. */
    private static final String CONFIG_RESOURCE = "conf.properties";

    /** Per vin, the smallest primary key among rows that were entered but not yet left. */
    private static final String OPEN_FENCE_SQL =
            "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin";

    /**
     * Pause between two polls. One second, as the original comment intended; it must stay
     * below the downstream window trigger interval (90 s according to that comment).
     */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    // JDBC handles are per instance (not static) so parallel or restarted instances in the
    // same JVM do not overwrite each other's connection; transient because they are created
    // in open() and must never be serialized with the function.
    private transient Connection conn;
    private transient PreparedStatement pstmt;

    // volatile: cancel() is invoked from a different thread than run().
    private volatile boolean flag = true;

    /**
     * Loads the JDBC settings from {@code conf.properties}, opens the connection and prepares
     * the polling statement.
     *
     * <p>If the job registers a {@code ParameterTool} as global job parameters, the same keys
     * could instead be read through
     * {@code getRuntimeContext().getExecutionConfig().getGlobalJobParameters()}.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if the properties resource is missing, a required key is absent, the
     *                   driver cannot be loaded or the connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ParameterTool parameterTool;
        // Close the resource stream once the properties are loaded; fail with a clear message
        // instead of a NullPointerException when the resource is not on the classpath.
        try (java.io.InputStream in = MysqlElectricFenceResultSource.class
                .getClassLoader()
                .getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException(CONFIG_RESOURCE + " was not found on the classpath");
            }
            parameterTool = ParameterTool.fromPropertiesFile(in);
        }

        String driver = parameterTool.getRequired("jdbc.driver");
        String url = parameterTool.getRequired("jdbc.url");
        String user = parameterTool.getRequired("jdbc.user");
        String password = parameterTool.getRequired("jdbc.password");

        // Load the driver, connect, and prepare the statement that run() re-executes.
        Class.forName(driver);
        conn = DriverManager.getConnection(url, user, password);
        pstmt = conn.prepareStatement(OPEN_FENCE_SQL);
    }

    /**
     * Polls MySQL until {@link #cancel()} is called, emitting one vin -> id map per non-empty
     * poll.
     *
     * @param ctx context used to emit the maps
     * @throws Exception on any JDBC failure or if the sleeping thread is interrupted
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void run(SourceContext ctx) throws Exception {
        while (flag) {
            HashMap<String, Integer> vehInfoMap = new HashMap<>();
            // try-with-resources closes the result set even when reading a row fails.
            try (ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    vehInfoMap.put(rs.getString("vin"), rs.getInt("id"));
                }
            }
            if (vehInfoMap.isEmpty()) {
                logger.warn("從mysql中electronic_fence相關表的數據為空");
            } else {
                ctx.collect(vehInfoMap);
                // SLF4J substitutes "{}" placeholders; "%s" would be printed literally.
                logger.info("查詢電子圍欄分析結果表中數據,存在記錄數據為:{} 條", vehInfoMap.size());
            }
            TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /** Asks the polling loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        flag = false;
    }

    /**
     * Releases the statement and the connection. Safe to call when {@link #open} failed before
     * either was created.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (pstmt != null) {
                pstmt.close();
            }
        } finally {
            // Still close the connection when closing the statement throws.
            if (conn != null) {
                conn.close();
            }
        }
    }
}
MySQLElectricFenceSource
package pers.aishuang.flink.streaming.source.mysql;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ElectricFenceResultTmp;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
/**
 * Flink source that periodically loads the active electric-fence rules from MySQL.
 *
 * <p>Each poll emits one map from vin to {@code ElectricFenceResultTmp}, built from the
 * fence settings with {@code status=1} joined to their vins. The map is emitted even when
 * the query returns no rows. Because the map is keyed by vin, a later row for the same vin
 * replaces an earlier one.
 */
public class MySQLElectricFenceSource extends RichSourceFunction<HashMap<String, ElectricFenceResultTmp>> {
    private static final Logger logger = LoggerFactory.getLogger(MySQLElectricFenceSource.class.getSimpleName());

    /** Active fence settings joined with the vins they apply to. */
    private static final String FENCE_RULES_SQL = "select " +
            "vins.vin,setting.id,setting.name,setting.address,setting.radius," +
            "setting.longitude,setting.latitude,setting.start_time,setting.end_time \n" +
            "from vehicle_networking.electronic_fence_setting setting \n" +
            "inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id \n" +
            "where setting.status=1";

    // JDBC handles are per instance (not static) so instances sharing a JVM do not overwrite
    // each other's connection; transient because they are created in open().
    private transient Connection conn;
    private transient Statement stmt;

    // volatile: cancel() is invoked from a different thread than run().
    private volatile boolean flag = true;

    /** Pause between two polls, in milliseconds (config key {@code elerules.millionseconds}). */
    private long pollIntervalMillis;

    /**
     * Reads the JDBC settings and the polling interval from the global job parameters, then
     * opens the connection and creates the statement.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if a required key is missing, the interval is not a number, the
     *                   driver cannot be loaded or the connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();

        String driver = globalJobParameters.getRequired("jdbc.driver");
        String url = globalJobParameters.getRequired("jdbc.url");
        String user = globalJobParameters.getRequired("jdbc.user");
        String password = globalJobParameters.getRequired("jdbc.password");

        // Parse once here so a malformed value fails at start-up rather than after the first poll.
        pollIntervalMillis = Long.parseLong(globalJobParameters.getRequired("elerules.millionseconds"));

        Class.forName(driver);
        conn = DriverManager.getConnection(url, user, password);
        stmt = conn.createStatement();
    }

    /**
     * Polls MySQL until {@link #cancel()} is called, emitting one vin -> rule map per poll.
     *
     * @param ctx context used to emit the maps
     * @throws Exception on any JDBC failure or if the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<HashMap<String, ElectricFenceResultTmp>> ctx) throws Exception {
        while (flag) {
            HashMap<String, ElectricFenceResultTmp> map = new HashMap<>();
            // try-with-resources closes the result set even when mapping a row fails.
            try (ResultSet rs = stmt.executeQuery(FENCE_RULES_SQL)) {
                while (rs.next()) {
                    // NOTE(review): getDate yields a java.sql.Date; confirm start_time/end_time
                    // do not need their time-of-day component.
                    map.put(
                            rs.getString("vin"),
                            new ElectricFenceResultTmp(
                                    rs.getInt("id"),
                                    rs.getString("name"),
                                    rs.getString("address"),
                                    rs.getFloat("radius"),
                                    rs.getDouble("longitude"),
                                    rs.getDouble("latitude"),
                                    rs.getDate("start_time"),
                                    rs.getDate("end_time")
                            )
                    );
                }
            }
            ctx.collect(map);
            // The configured value is in milliseconds; sleeping in MICROSECONDS polled the
            // database 1000 times more often than configured.
            TimeUnit.MILLISECONDS.sleep(pollIntervalMillis);
        }
    }

    /** Asks the polling loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        flag = false;
    }

    /**
     * Releases the statement and the connection. Safe to call when {@link #open} failed before
     * either was created.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            // Still close the connection when closing the statement throws.
            if (conn != null) {
                conn.close();
            }
        }
    }
}
VehicleInfoMysqlSource
package pers.aishuang.flink.streaming.source.mysql;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.VehicleInfoModel;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.HashMap;
/**
 * Flink source that periodically loads vehicle master data from MySQL.
 *
 * <p>Each poll emits one map from vin to {@code VehicleInfoModel}; nothing is emitted for a
 * poll that returns no rows. Text columns that are NULL in the database are replaced by the
 * placeholder "未知", including the vin used as map key.
 */
public class VehicleInfoMysqlSource extends RichSourceFunction<HashMap<String, VehicleInfoModel>> {
    // static: a logger does not belong to the serialized state of the function.
    private static final Logger logger = LoggerFactory.getLogger(VehicleInfoMysqlSource.class);

    /** Placeholder stored for text columns that are NULL in the database. */
    private static final String UNKNOWN = "未知";

    /**
     * Vehicles joined with model, latest sales date and usage type.
     * NOTE(review): the literal 'net_cat' looks like a typo for 'net_car'; left unchanged
     * because downstream consumers may match on it.
     */
    private static final String VEHICLE_INFO_SQL =
            "select t12.vin,t12.series_name,t12.model_name,t12.series_code,t12.model_code,t12.nick_name,t3.sales_date,t4.car_type\n" +
            " from (\n" +
            "select t1.vin, t1.series_name, t2.show_name as model_name, t1.series_code,t2.model_code,t2.nick_name,t1.vehicle_id\n" +
            " from vehicle_networking.dcs_vehicles t1 left join vehicle_networking.t_car_type_code t2 on t1.model_code = t2.model_code) t12\n" +
            " left join (select vehicle_id, max(sales_date) sales_date from vehicle_networking.dcs_sales group by vehicle_id) t3\n" +
            " on t12.vehicle_id = t3.vehicle_id\n" +
            " left join\n" +
            " (select tc.vin,'net_cat' car_type from vehicle_networking.t_net_car tc\n" +
            " union all select tt.vin,'taxi' car_type from vehicle_networking.t_taxi tt\n" +
            " union all select tp.vin,'private_car' car_type from vehicle_networking.t_private_car tp\n" +
            " union all select tm.vin,'model_car' car_type from vehicle_networking.t_model_car tm) t4\n" +
            " on t12.vin = t4.vin";

    // Created in open(); transient so they are never part of the serialized function.
    private transient Connection conn = null;
    private transient PreparedStatement pstmt = null;

    // Global job parameters, resolved in open().
    ParameterTool parameterTool;

    // volatile: cancel() is invoked from a different thread than run().
    private volatile boolean isRunning = true;

    /** Pause between two polls, in milliseconds (config key {@code vehinfo.millionseconds}). */
    private long pollIntervalMillis;

    /**
     * Reads the JDBC settings and the polling interval from the global job parameters, then
     * opens the connection and prepares the polling statement.
     *
     * @param parameters Flink configuration (unused)
     * @throws Exception if a required key is missing, the driver cannot be loaded or the
     *                   connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        parameterTool = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();

        String driver = parameterTool.getRequired("jdbc.driver");
        String url = parameterTool.getRequired("jdbc.url");
        String user = parameterTool.getRequired("jdbc.user");
        String password = parameterTool.getRequired("jdbc.password");
        // Read once here instead of on every loop iteration.
        pollIntervalMillis = parameterTool.getInt("vehinfo.millionseconds");

        Class.forName(driver);
        // Pass the credentials: connecting with the URL alone ignored jdbc.user/jdbc.password.
        conn = DriverManager.getConnection(url, user, password);
        pstmt = conn.prepareStatement(VEHICLE_INFO_SQL);
    }

    /**
     * Polls MySQL until {@link #cancel()} is called, emitting one vin -> vehicle map per
     * non-empty poll.
     *
     * @param ctx context used to emit the maps
     * @throws Exception on any JDBC failure or if the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<HashMap<String, VehicleInfoModel>> ctx) throws Exception {
        while (isRunning) {
            HashMap<String, VehicleInfoModel> vehicleInfoMap = new HashMap<>();
            // try-with-resources closes the result set even when mapping a row fails.
            try (ResultSet resultSet = pstmt.executeQuery()) {
                while (resultSet.next()) {
                    String vin = orUnknown(resultSet.getString("vin"));
                    vehicleInfoMap.put(vin, toModel(resultSet));
                }
            }
            if (vehicleInfoMap.isEmpty()) {
                logger.warn("從車輛基礎數據表中查詢數據為空....");
            } else {
                ctx.collect(vehicleInfoMap);
            }
            // Time between two reloads of the vehicle data.
            Thread.sleep(pollIntervalMillis);
        }
    }

    /** Builds the model for the current row, replacing NULL text columns by the placeholder. */
    private static VehicleInfoModel toModel(ResultSet row) throws java.sql.SQLException {
        String salesDate = row.getString("sales_date");

        // Age in whole years = (now - sales date); "-1" when no usable sales date exists.
        String liveTime = "-1";
        Date soldOn = row.getDate("sales_date");
        if (salesDate != null && soldOn != null) {
            liveTime = String.valueOf((new Date().getTime() - soldOn.getTime()) / 1000 / 3600 / 24 / 365);
        }

        VehicleInfoModel model = new VehicleInfoModel();
        model.setSeriesName(orUnknown(row.getString("series_name")));
        model.setSeriesCode(orUnknown(row.getString("series_code")));
        model.setModelName(orUnknown(row.getString("model_name")));
        model.setModelCode(orUnknown(row.getString("model_code")));
        model.setLiveTime(liveTime);
        model.setNickName(orUnknown(row.getString("nick_name")));
        model.setCarType(orUnknown(row.getString("car_type")));
        model.setSalesDate(orUnknown(salesDate));
        return model;
    }

    /** Returns {@code value}, or the placeholder when {@code value} is null. */
    private static String orUnknown(String value) {
        return value == null ? UNKNOWN : value;
    }

    /** Asks the polling loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Releases the statement and the connection.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (pstmt != null) {
                pstmt.close();
            }
        } finally {
            // Still close the connection when closing the statement throws.
            if (conn != null) {
                conn.close();
            }
        }
    }
}