ElectricFenceMysqlSink
package pers.aishuang.flink.streaming.sink.mysql;
import com.ibm.icu.text.SimpleDateFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ElectricFenceModel;
import pers.aishuang.flink.streaming.utils.DateUtil;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
public class ElectricFenceMysqlSink extends RichSinkFunction<ElectricFenceModel> {
//创建日志打印器
private static Logger logger = LoggerFactory.getLogger(ElectricFenceMysqlSink.class);
//定义JDBC变量
private static Connection conn = null;
private static PreparedStatement pstmt = null;
@Override
public void open(Configuration parameters) throws Exception {
//获取conf.properties配置文件参数
ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
//获取mysql的driver、url、user、password
String driver = globalJobParameters.getRequired("jdbc.driver");
String url = globalJobParameters.getRequired("jdbc.url");
String user = globalJobParameters.getRequired("jdbc.user");
String password = globalJobParameters.getRequired("jdbc.password");
//加载驱动、获取Mysql连接、创建SQL字符串、获取sql预编译
Class.forName(driver);
conn = DriverManager.getConnection(url, user, password);
}
//本质上是进入数据和出入数据合成一条数据,
// gpsTime、lat 、lng 、 terminalTime、processTime 字段都是出去数据的数据
@Override
public void invoke(ElectricFenceModel model, Context context) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
//能获取到进围栏状态的则修改进围栏的状态,即:此条数据为出围栏 (状态报警是出(0),且已在mysql结果表中)
if(model.getStatusAlarm() == 0 && model.getInMysql()) { //此时是更新,在进入数据上更新
String sql = "update vehicle_networking.electric_fence set outTime=?,gpsTime=?,lat=?,lng=?,terminalTime=?,processTime=? where id=?";
pstmt = conn.prepareStatement(sql);
pstmt.setObject(1, model.getOutEleTime());
pstmt.setObject(2,model.getGpsTime());
pstmt.setObject(3,model.getLat());
pstmt.setObject(4,model.getLng());
pstmt.setObject(5,model.getTerminalTime());
//设置处理时间为当前时间
pstmt.setObject(6,sdf.format( new Date() ));
pstmt.setLong(7,model.getUuid());
} else {
//进入围栏,转换ElectricFenceModel对象,插入结果数据到电子围栏结果表中
String sql = "insert into vehicle_networking.electric_fence(vin,inTime,outTime,gpsTime,lat,lng,eleId,eleName,address,latitude,longitude,radius,terminalTime,processTime) " +
"values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1,model.getVin());
pstmt.setObject(2,model.getInEleTime());
pstmt.setObject(3,model.getOutEleTime());
pstmt.setObject(4,model.getGpsTime());
pstmt.setDouble(5,model.getLat());
pstmt.setDouble(6, model.getLng());
pstmt.setInt(7, model.getEleId());
pstmt.setString(8, model.getEleName());
pstmt.setString(9, model.getAddress());
pstmt.setDouble(10, model.getLatitude());
pstmt.setDouble(11,model.getLongitude());
pstmt.setFloat(12,model.getRadius());
pstmt.setObject(13, model.getTerminalTime());
//设置processTime 时间为当前时间
pstmt.setObject(14, DateUtil.getCurrentDate());
}
//执行
pstmt.execute();
logger.warn("MysqlSink,批量插入数据成功,插入{}条数据", pstmt.getMaxRows());
} catch (SQLException e) {
e.printStackTrace();
} finally {
if(pstmt != null) pstmt.close();
}
}
@Override
public void close() throws Exception {
try {
if(pstmt != null) {
pstmt.close();
}
if(conn != null) {
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
OnlineStatisticsMysqlSink
package pers.aishuang.flink.streaming.sink.mysql;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.OnlineDataObj;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* mysql两阶段提交代码实现过程
* 1、关闭conn自动提交
* 2、执行sql后,conn.commit() 手动提交
* 3、try-catch捕获sql执行过程异常
* 4、若捕获到异常 conn.rollback() 事务回滚
* 5、底层又重新插入这条数据
*/
public class OnlineStatisticsMysqlSink extends RichSinkFunction<OnlineDataObj> {
//定义是否一直运行的标志
Boolean isRunning = true;
//创建日志打印器
private static final Logger logger = LoggerFactory.getLogger(OnlineStatisticsMysqlSink.class);
//定义JDBC变量
Connection conn = null;
PreparedStatement pstmt = null;
//定义获取配置文件参数工具
ParameterTool parameterTool = null;
@Override
public void open(Configuration parameters) throws Exception {
//获取运行环境全局参数工具
parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
//获取mysql JDBC参数:driver、url、user、password
String driver = parameterTool.getRequired("jdbc.driver");
String url = parameterTool.getRequired("jdbc.url");
String user = parameterTool.getRequired("jdbc.user");
String password = parameterTool.getRequired("jdbc.password");
//加载驱动、获取连接
Class.forName(driver);
conn = DriverManager.getConnection(url, user, password);
//关闭自动提交
conn.setAutoCommit(false);
//sql (更新插入)
String sql = "INSERT INTO online_data(vin,process_time,lat,lng, mileage,is_alarm,alarm_name,terminal_time,earliest_time,max_voltage_battery,min_voltage_battery,max_temperature_value,min_temperature_value,speed,soc,charge_flag,total_voltage,total_current,battery_voltage,probe_temperatures,series_name,model_name,live_time,sales_date,car_type,province,city, county) values(?,now(),?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?) \n" +
"ON DUPLICATE KEY UPDATE process_time=now(),lat=?,lng=?,mileage=?,is_alarm=?, alarm_name=?,terminal_time=?,max_voltage_battery=?,\n" +
"min_voltage_battery=?,max_temperature_value=?, min_temperature_value=?, speed=?,soc=?,charge_flag=?, total_voltage=?,\n" +
"total_current=?,battery_voltage=?, probe_temperatures=?,series_name=?,model_name=?,live_time=?,sales_date=?,car_type=?,\n" +
"province=?,city=?,county=?";
//获取预编译对象
pstmt = conn.prepareStatement(sql);
}
/**
* 数据一条一条的提交写入到MySQL数据中
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(OnlineDataObj value, Context context) throws Exception {
try {
//插入数据的参数
pstmt.setString(1, value.getVin());
pstmt.setDouble(2, value.getLat());
pstmt.setDouble(3, value.getLng());
pstmt.setDouble(4, value.getMileage());
pstmt.setInt(5, value.getIsAlarm());
pstmt.setString(6, value.getAlarmName());
pstmt.setString(7, value.getTerminalTime());
pstmt.setString(8, value.getEarliestTime());
pstmt.setDouble(9, value.getMaxVoltageBattery());
pstmt.setDouble(10, value.getMinVoltageBattery());
pstmt.setDouble(11, value.getMaxTemperatureValue());
pstmt.setDouble(12, value.getMinTemperatureValue());
pstmt.setDouble(13, value.getSpeed());
pstmt.setInt(14, value.getSoc());
pstmt.setInt(15, value.getChargeFlag());
pstmt.setDouble(16, value.getTotalVoltage());
pstmt.setDouble(17, value.getTotalCurrent());
pstmt.setString(18, value.getBatteryVoltage());
pstmt.setString(19, value.getProbeTemperatures());
pstmt.setString(20, value.getSeriesName());
pstmt.setString(21, value.getModelName());
pstmt.setString(22, value.getLiveTime());
pstmt.setString(23, value.getSalesDate());
pstmt.setString(24, value.getCarType());
pstmt.setString(25, value.getProvince());
pstmt.setString(26, value.getCity());
pstmt.setString(27, value.getCountry());
//修改数据的参数
pstmt.setDouble(28, value.getLat());
pstmt.setDouble(29, value.getLng());
pstmt.setDouble(30, value.getMileage());
pstmt.setInt(31, value.getIsAlarm());
pstmt.setString(32, value.getAlarmName());
pstmt.setString(33, value.getTerminalTime());
pstmt.setDouble(34, value.getMaxVoltageBattery());
pstmt.setDouble(35, value.getMinVoltageBattery());
pstmt.setDouble(36, value.getMaxTemperatureValue());
pstmt.setDouble(37, value.getMinTemperatureValue());
pstmt.setDouble(38, value.getSpeed());
pstmt.setInt(39, value.getSoc());
pstmt.setInt(40, value.getChargeFlag());
pstmt.setDouble(41, value.getTotalVoltage());
pstmt.setDouble(42, value.getTotalCurrent());
pstmt.setString(43, value.getBatteryVoltage());
pstmt.setString(44, value.getProbeTemperatures());
pstmt.setString(45, value.getSeriesName());
pstmt.setString(46, value.getModelName());
pstmt.setString(47, value.getLiveTime());
pstmt.setString(48, value.getSalesDate());
pstmt.setString(49, value.getCarType());
pstmt.setString(50, value.getProvince());
pstmt.setString(51, value.getCity());
pstmt.setString(52, value.getCountry());
//执行数据更新和递交操作
pstmt.execute();
conn.commit();
} catch (SQLException e) {
conn.rollback();
e.printStackTrace();
}
}
@Override
public void close() throws Exception {
super.close();
if(pstmt != null) pstmt.close();
if(conn != null) conn.close();
}
}