ElectricFenceMysqlSink
package pers.aishuang.flink.streaming.sink.mysql;
import com.ibm.icu.text.SimpleDateFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ElectricFenceModel;
import pers.aishuang.flink.streaming.utils.DateUtil;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
/**
 * Flink sink that writes electric-fence events to the MySQL table
 * {@code vehicle_networking.electric_fence}.
 *
 * <p>An "enter" event is stored as a new row. The matching "leave" event updates that
 * row in place, so one row ends up describing a complete enter/leave pair; after the
 * update, {@code gpsTime}, {@code lat}, {@code lng}, {@code terminalTime} and
 * {@code processTime} hold the values of the leave event.
 *
 * <p>The JDBC connection is held per sink instance (not in a static field), so parallel
 * subtasks running in the same JVM do not overwrite or close each other's connection.
 */
public class ElectricFenceMysqlSink extends RichSinkFunction<ElectricFenceModel> {
    /** Class logger. */
    private static final Logger logger = LoggerFactory.getLogger(ElectricFenceMysqlSink.class);

    /** Completes an existing "enter" row with the data of the "leave" event. */
    private static final String UPDATE_SQL =
            "update vehicle_networking.electric_fence set outTime=?,gpsTime=?,lat=?,lng=?,terminalTime=?,processTime=? where id=?";

    /** Inserts a new result row into the electric-fence table. */
    private static final String INSERT_SQL =
            "insert into vehicle_networking.electric_fence(vin,inTime,outTime,gpsTime,lat,lng,eleId,eleName,address,latitude,longitude,radius,terminalTime,processTime) " +
            "values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    /** Connection owned by this sink instance; created in {@link #open} and released in {@link #close}. */
    private transient Connection conn;

    /**
     * Reads the JDBC settings ({@code jdbc.driver}, {@code jdbc.url}, {@code jdbc.user},
     * {@code jdbc.password}) from the global job parameters, loads the driver and opens
     * the connection.
     *
     * @param parameters Flink configuration passed by the runtime (not read here)
     * @throws Exception if a required parameter is missing, the driver class cannot be
     *                   loaded, or the connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();

        String driver = globalJobParameters.getRequired("jdbc.driver");
        String url = globalJobParameters.getRequired("jdbc.url");
        String user = globalJobParameters.getRequired("jdbc.user");
        String password = globalJobParameters.getRequired("jdbc.password");

        Class.forName(driver);
        conn = DriverManager.getConnection(url, user, password);
    }

    /**
     * Writes one event. When the status alarm is 0 ("leave") and the model reports that
     * its "enter" row is already stored in MySQL, that row is updated; in every other
     * case a new row is inserted.
     *
     * <p>A {@link SQLException} is logged and the record is skipped (best effort); it is
     * not rethrown.
     *
     * @param model   the event to persist
     * @param context sink context supplied by Flink (unused)
     * @throws Exception for any non-SQL failure
     */
    @Override
    public void invoke(ElectricFenceModel model, Context context) throws Exception {
        boolean completesExistingRow = model.getStatusAlarm() == 0 && model.getInMysql();
        String sql = completesExistingRow ? UPDATE_SQL : INSERT_SQL;

        // The statement is local to this call and closed by try-with-resources.
        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
            if (completesExistingRow) {
                bindUpdate(pstmt, model);
            } else {
                bindInsert(pstmt, model);
            }
            // executeUpdate() returns the affected-row count; getMaxRows() is only the
            // configured ResultSet row limit and says nothing about what was written.
            int affectedRows = pstmt.executeUpdate();
            logger.info("MysqlSink,批量插入數據成功,插入{}條數據", affectedRows);
        } catch (SQLException e) {
            logger.error("Failed to write electric fence record to MySQL, vin={}", model.getVin(), e);
        }
    }

    /** Binds the parameters of {@link #UPDATE_SQL} from a "leave" event. */
    private void bindUpdate(PreparedStatement pstmt, ElectricFenceModel model) throws SQLException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        pstmt.setObject(1, model.getOutEleTime());
        pstmt.setObject(2, model.getGpsTime());
        pstmt.setObject(3, model.getLat());
        pstmt.setObject(4, model.getLng());
        pstmt.setObject(5, model.getTerminalTime());
        // processTime is the current time
        pstmt.setObject(6, sdf.format(new Date()));
        pstmt.setLong(7, model.getUuid());
    }

    /** Binds the parameters of {@link #INSERT_SQL} from the event. */
    private void bindInsert(PreparedStatement pstmt, ElectricFenceModel model) throws SQLException {
        pstmt.setString(1, model.getVin());
        pstmt.setObject(2, model.getInEleTime());
        pstmt.setObject(3, model.getOutEleTime());
        pstmt.setObject(4, model.getGpsTime());
        pstmt.setDouble(5, model.getLat());
        pstmt.setDouble(6, model.getLng());
        pstmt.setInt(7, model.getEleId());
        pstmt.setString(8, model.getEleName());
        pstmt.setString(9, model.getAddress());
        pstmt.setDouble(10, model.getLatitude());
        pstmt.setDouble(11, model.getLongitude());
        pstmt.setFloat(12, model.getRadius());
        pstmt.setObject(13, model.getTerminalTime());
        // processTime is the current time
        pstmt.setObject(14, DateUtil.getCurrentDate());
    }

    /**
     * Closes the JDBC connection. A failure while closing is logged, not rethrown.
     *
     * @throws Exception declared by the overridden method; not thrown for SQL errors
     */
    @Override
    public void close() throws Exception {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            logger.error("Failed to close MySQL connection", e);
        } finally {
            conn = null;
        }
    }
}
OnlineStatisticsMysqlSink
package pers.aishuang.flink.streaming.sink.mysql;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.OnlineDataObj;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Flink sink that upserts one row per record into the MySQL table {@code online_data}
 * using {@code INSERT ... ON DUPLICATE KEY UPDATE}.
 *
 * <p>Each record is written in its own local JDBC transaction:
 * <ol>
 *   <li>auto-commit is switched off when the connection is opened;</li>
 *   <li>after the statement has executed, {@code conn.commit()} is called explicitly;</li>
 *   <li>a {@link SQLException} raised while binding, executing or committing is caught;</li>
 *   <li>in that case the transaction is rolled back with {@code conn.rollback()};</li>
 *   <li>the failure is logged and the record is skipped; this class does not retry it.</li>
 * </ol>
 *
 * <p>NOTE(review): this is a per-record commit, not a checkpoint-coordinated two-phase
 * commit; nothing in this class ties the commit to Flink checkpoints.
 */
public class OnlineStatisticsMysqlSink extends RichSinkFunction<OnlineDataObj> {
    /** Upsert statement: 27 insert parameters followed by 25 update parameters. */
    private static final String UPSERT_SQL =
            "INSERT INTO online_data(vin,process_time,lat,lng, mileage,is_alarm,alarm_name,terminal_time,earliest_time,max_voltage_battery,min_voltage_battery,max_temperature_value,min_temperature_value,speed,soc,charge_flag,total_voltage,total_current,battery_voltage,probe_temperatures,series_name,model_name,live_time,sales_date,car_type,province,city, county) values(?,now(),?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?) \n" +
            "ON DUPLICATE KEY UPDATE process_time=now(),lat=?,lng=?,mileage=?,is_alarm=?, alarm_name=?,terminal_time=?,max_voltage_battery=?,\n" +
            "min_voltage_battery=?,max_temperature_value=?, min_temperature_value=?, speed=?,soc=?,charge_flag=?, total_voltage=?,\n" +
            "total_current=?,battery_voltage=?, probe_temperatures=?,series_name=?,model_name=?,live_time=?,sales_date=?,car_type=?,\n" +
            "province=?,city=?,county=?";

    /** "Keep running" flag; not read anywhere in this class. */
    Boolean isRunning = true;
    /** Class logger. */
    private static final Logger logger = LoggerFactory.getLogger(OnlineStatisticsMysqlSink.class);
    /** JDBC connection, opened in {@link #open}; transient because it is not serializable. */
    transient Connection conn = null;
    /** Prepared upsert statement, created once in {@link #open} and reused for every record. */
    transient PreparedStatement pstmt = null;
    /** Global job parameters, read in {@link #open}. */
    ParameterTool parameterTool = null;

    /**
     * Reads the JDBC settings ({@code jdbc.driver}, {@code jdbc.url}, {@code jdbc.user},
     * {@code jdbc.password}) from the global job parameters, opens the connection with
     * auto-commit disabled and prepares the upsert statement.
     *
     * @param parameters Flink configuration passed by the runtime (not read here)
     * @throws Exception if a required parameter is missing, the driver class cannot be
     *                   loaded, or a JDBC call fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

        String driver = parameterTool.getRequired("jdbc.driver");
        String url = parameterTool.getRequired("jdbc.url");
        String user = parameterTool.getRequired("jdbc.user");
        String password = parameterTool.getRequired("jdbc.password");

        Class.forName(driver);
        conn = DriverManager.getConnection(url, user, password);
        // Commits are issued explicitly after each record.
        conn.setAutoCommit(false);
        pstmt = conn.prepareStatement(UPSERT_SQL);
    }

    /**
     * Upserts a single record and commits it. On a {@link SQLException} the failure is
     * logged, the transaction is rolled back and the record is skipped.
     *
     * @param value   the record to write
     * @param context sink context supplied by Flink (unused)
     * @throws Exception if the rollback itself fails (the original SQL error is attached
     *                   as a suppressed exception), or for any non-SQL failure
     */
    @Override
    public void invoke(OnlineDataObj value, Context context) throws Exception {
        try {
            // INSERT part: parameters 1..27
            int index = 1;
            pstmt.setString(index++, value.getVin());
            index = bindPositionAndAlarm(index, value);
            pstmt.setString(index++, value.getEarliestTime());
            index = bindBatteryAndVehicle(index, value);

            // ON DUPLICATE KEY UPDATE part: parameters 28..52 (same columns, minus vin and earliest_time)
            index = bindPositionAndAlarm(index, value);
            bindBatteryAndVehicle(index, value);

            pstmt.execute();
            conn.commit();
        } catch (SQLException e) {
            logger.error("Failed to upsert online data into MySQL, vin={}", value.getVin(), e);
            try {
                conn.rollback();
            } catch (SQLException rollbackFailure) {
                // Keep the root cause visible when the rollback fails as well.
                rollbackFailure.addSuppressed(e);
                throw rollbackFailure;
            }
        }
    }

    /**
     * Binds lat, lng, mileage, is_alarm, alarm_name and terminal_time starting at
     * {@code index}.
     *
     * @return the index of the next unbound parameter
     */
    private int bindPositionAndAlarm(int index, OnlineDataObj value) throws SQLException {
        pstmt.setDouble(index++, value.getLat());
        pstmt.setDouble(index++, value.getLng());
        pstmt.setDouble(index++, value.getMileage());
        pstmt.setInt(index++, value.getIsAlarm());
        pstmt.setString(index++, value.getAlarmName());
        pstmt.setString(index++, value.getTerminalTime());
        return index;
    }

    /**
     * Binds the 19 columns from max_voltage_battery through county starting at
     * {@code index}.
     *
     * @return the index of the next unbound parameter
     */
    private int bindBatteryAndVehicle(int index, OnlineDataObj value) throws SQLException {
        pstmt.setDouble(index++, value.getMaxVoltageBattery());
        pstmt.setDouble(index++, value.getMinVoltageBattery());
        pstmt.setDouble(index++, value.getMaxTemperatureValue());
        pstmt.setDouble(index++, value.getMinTemperatureValue());
        pstmt.setDouble(index++, value.getSpeed());
        pstmt.setInt(index++, value.getSoc());
        pstmt.setInt(index++, value.getChargeFlag());
        pstmt.setDouble(index++, value.getTotalVoltage());
        pstmt.setDouble(index++, value.getTotalCurrent());
        pstmt.setString(index++, value.getBatteryVoltage());
        pstmt.setString(index++, value.getProbeTemperatures());
        pstmt.setString(index++, value.getSeriesName());
        pstmt.setString(index++, value.getModelName());
        pstmt.setString(index++, value.getLiveTime());
        pstmt.setString(index++, value.getSalesDate());
        pstmt.setString(index++, value.getCarType());
        pstmt.setString(index++, value.getProvince());
        pstmt.setString(index++, value.getCity());
        // The `county` column is filled from getCountry(), as in the original binding.
        pstmt.setString(index++, value.getCountry());
        return index;
    }

    /**
     * Releases the prepared statement and then the connection; the connection is closed
     * even when closing the statement fails.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (pstmt != null) {
                pstmt.close();
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}