flatmap
package pers.aishuang.flink.streaming.function.flatmap;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.OnlineDataObj;
import pers.aishuang.flink.streaming.entity.VehicleInfoModel;
import pers.aishuang.flink.streaming.utils.DateUtil;
import java.util.HashMap;
public class VehicleInfoMapMysqlFunction extends RichCoFlatMapFunction<
OnlineDataObj, HashMap<String,VehicleInfoModel>, OnlineDataObj> {
//創建日志打印器
private static final Logger logger = LoggerFactory.getLogger(VehicleInfoMapMysqlFunction.class);
//定義廣播變量
private static HashMap<String, VehicleInfoModel> vehicleInfoModelConfig = null;
@Override
public void flatMap1(OnlineDataObj value, Collector<OnlineDataObj> out) throws Exception {
//1. 通過vin 獲取到車輛基礎信息
String vin = value.getVin();
VehicleInfoModel vehicleInfoModel = vehicleInfoModelConfig.getOrDefault(vin, null);
//2. 如果車輛基礎信息不為空
if(vehicleInfoModel != null) {
//--將車系seriesName、車型modelName、車限liveTime、銷售日期saleDate,車輛類型carType封裝到onlineDataObj對象中
value.setModelName(vehicleInfoModel.getModelName());
value.setSeriesName(vehicleInfoModel.getSeriesName());
value.setLiveTime(
(System.currentTimeMillis() - DateUtil.convertStringToDateTime(vehicleInfoModel.getSalesDate())
.getTime()) / 1000 / 3600 / 24 / 30 + ""
);
value.setSalesDate(vehicleInfoModel.getSalesDate());
value.setCarType(vehicleInfoModel.getCarType());
//-- 將onlineDataObj收集返回
out.collect(value);
//-- 打印輸出,基礎信息不存在
}else {
logger.error("當前上報車輛,未在庫里記錄");
}
}
@Override
public void flatMap2(HashMap<String, VehicleInfoModel> value, Collector<OnlineDataObj> out) throws Exception {
vehicleInfoModelConfig = value;
}
}
map
package pers.aishuang.flink.streaming.function.map;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.hadoop.hbase.util.Bytes;
import pers.aishuang.flink.streaming.entity.ItcastDataPartObj;
import pers.aishuang.flink.streaming.utils.GeoHashUtil;
import pers.aishuang.flink.streaming.utils.RedisUtil;
import pers.aishuang.flink.streaming.entity.VehicleLocationModel;
/**
* 經緯度坐標只能表示一塊區域,實際生活中不存在點,因為點的大小可以無窮小
* geoHash算法原理:
* 將經度和緯度值轉化為2進制數字,以緯度在奇數位,經度在偶數位,將二者穿插合並成一個2進制數字
* 再基於base32編碼將其轉成字符串,字符串越長,位置越精准
*/
public class LocactionInfoReidsFunction extends RichMapFunction<ItcastDataPartObj,ItcastDataPartObj> {
@Override
public ItcastDataPartObj map(ItcastDataPartObj value) throws Exception {
//1. 獲取車輛數據的經度和緯度生成 geohash(經度,緯度)-> geohash字符串 -> 地理詳細位置
Double lng = value.getLng();
Double lat = value.getLat();
String geohash = GeoHashUtil.encode(lat, lng);
//2. 根據geohash 從redis中獲取value值(geohash在redis中是作為主鍵存在)
byte[] locationDetailArr = RedisUtil.get(Bytes.toBytes(geohash));
//3. 如果查詢出來的值不為空,將其通過JSON對象轉換成VehicleLocationModel對象,否則置為null
if(locationDetailArr != null && locationDetailArr.length > 0) {
String vehicleLocationJson = Bytes.toString(locationDetailArr);
VehicleLocationModel model = JSON.parseObject(vehicleLocationJson,VehicleLocationModel.class);
//4. 如果當前對象不為空,將國家,省市區地址賦給itcastDataPartObj,否則置為null
if(model != null) {
value.setCountry(model.getCountry());
value.setProvince(model.getProvince());
value.setCity(model.getCity());
value.setDistrict(model.getDistrict());
value.setAddress(model.getAddress());
} else {
value.setCountry(null);
value.setProvince(null);
value.setCity(null);
value.setDistrict(null);
value.setAddress(null);
}
}else {
value.setCountry(null);
value.setProvince(null);
value.setCity(null);
value.setDistrict(null);
value.setAddress(null);
}
//5. 返回數據
return value;
}
}
window
package pers.aishuang.flink.streaming.function.window;
import com.clearspring.analytics.util.Lists;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ElectricFenceModel;
import java.util.Collections;
import java.util.List;
/**
* 考慮問題:時間滾動窗口,有的窗口沒有數據,flink是怎么處理的
*/
public class ElectricFenceWindowFunction extends RichWindowFunction<ElectricFenceModel, ElectricFenceModel,
String , TimeWindow> {
//創建logger
private static final Logger logger = LoggerFactory.getLogger(ElectricFenceWindowFunction.class);
//1.定義存儲歷史電子圍欄數據的state,<vin,是否在電子圍欄內0:內, 1:外> MapState<String, Integer>
MapState<String, Byte> lastState = null;
@Override
public void open(Configuration parameters) throws Exception {
//1. 定義mapState的描述器,就是定義state的結構
MapStateDescriptor<String, Byte> lastStateDesc = new MapStateDescriptor<String, Byte>("lastState",String.class,Byte.class);
//2. 獲取parameterTool,用來讀取配置文件參數
StateTtlConfig stateTtlConfig = StateTtlConfig
//設置狀態有效時間
.newBuilder(Time.seconds(180))
//設置狀態更新類型:更新
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
//設置已過期但還未被清理的狀態如何處理
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
//過期對象的清理策略
.cleanupFullSnapshot()
.build();
//3. 設置state的ttl
lastStateDesc.enableTimeToLive(stateTtlConfig);
//4. 獲取當前運行環境 所描述的那個MapState
lastState = getRuntimeContext().getMapState(lastStateDesc);
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public void apply(String key, TimeWindow window, Iterable<ElectricFenceModel> input, Collector<ElectricFenceModel> out) throws Exception {
//1. 創建返回對象
ElectricFenceModel electricFenceModel = new ElectricFenceModel();
//2. 對窗口內的數據進行排序(按“終端時間戳”屬性進行升序排序)
List<ElectricFenceModel> electricFenceModelList = Lists.newArrayList(input);
Collections.sort(electricFenceModelList);
Collections.reverse(electricFenceModelList);
//3. 從state中獲取車輛vin對應的上一次窗口電子圍欄lastStateValue標記(車輛上一次窗口是否在電子圍欄中)0:電子圍欄內,1:電子圍欄外
Byte lastStateValue = lastState.get(key);
//4. 如果上次狀態為空,初始化賦值
if(lastStateValue == null ){
lastStateValue = -1;
}
//5. 判斷當前處於電子圍欄內還是電子圍欄外
//-- 定義當前車輛電子圍欄圈內出現的次數
long inElectricFence = electricFenceModelList.stream()
.filter(model -> model.getNowStatus() == 0)
.count();
//-- 定義當前車輛電子圍欄圈外出現的次數
long outElectricFence = electricFenceModelList.stream()
.filter(model -> model.getNowStatus() == 1)
.count();
//6. 定義當前窗口的電子圍欄狀態,默認讓當前狀態為1,在電子圍欄外
byte currentState = 1;
//7. 90s內車輛出現在電子圍欄內的次數多於出現在電子圍欄外的次數,則認為當前處於電子圍欄內
if(inElectricFence >= outElectricFence){
currentState = 0;
}
//8. 將當前窗口的電子圍欄狀態寫入到state中,供下次判斷
lastState.put(key, currentState);
//9. 如果當前電子圍欄狀態與上一次電子圍欄狀態不同
//-- 如果上一次窗口處於電子圍欄外,而本次是電子圍欄內,則將進入電子圍欄的時間寫入到數據庫中
if(lastStateValue == 1 && currentState == 0) {
//-- 進入柵欄,找到最后一條在外面的數據
//-- 出去柵欄,找到最后一條在里面的數據
ElectricFenceModel lastOutEleModel = electricFenceModelList
.stream()
.filter(model -> model.getNowStatus() == 1)
.findFirst()
.get();
//-- 拷貝屬性給 electricFenceModel 並將進圍欄終端時間賦值,並且將狀態告警字段賦值為1 0:出圍欄 1:進圍欄,將數據collect返回
BeanUtils.copyProperties(electricFenceModel, lastOutEleModel);
electricFenceModel.setInEleTime(lastOutEleModel.getGpsTime());
electricFenceModel.setStatusAlarm(1);
out.collect(electricFenceModel);
} else if(lastStateValue == 0 && currentState == 1) {
//如果上一次窗口處於電子圍欄內,而本次是電子圍欄外
//-- 進入柵欄,找到最后一條在外面的數據
//-- 出去柵欄,找到最后一條在里面的數據
ElectricFenceModel lastInEleModel = electricFenceModelList
.stream()
.filter(model -> model.getNowStatus() == 0)
.findFirst()
.get();
//-- 拷貝屬性給electricFenceModel 並將出柵欄終端時間賦值,並且將狀態告警 0:出圍欄,1:進圍欄,將數據collect返回
BeanUtils.copyProperties(electricFenceModel,lastInEleModel);
electricFenceModel.setOutEleTime(lastInEleModel.getGpsTime());
electricFenceModel.setStatusAlarm(0);
out.collect(electricFenceModel);
}
}
}
package pers.aishuang.flink.streaming.function.window;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import pers.aishuang.flink.streaming.entity.ItcastDataPartObj;
import pers.aishuang.flink.streaming.entity.OnlineDataObj;
import pers.aishuang.flink.streaming.utils.DateUtil;
import java.util.ArrayList;
import java.util.Collections;
/**
* 對30s窗口內的數據生成一個新的對象,包含拉寬數據地理信息,實時上報數據
* 數據庫中的靜態數據 OnlineDataObj
* 開發步驟:
* 1、對數據進行排序
* 2、獲取每一條數據
* 3、對每條數據判斷是否有告警信息,alarm=1 說明是告警信息
* 4、封裝成對象OnlineDataObj
* 5、返回
*/
//http請求超時時間和 redis會話超時都要小於水印亂序時間
public class OnlineStatisticsWindowFunction extends RichWindowFunction<
ItcastDataPartObj, OnlineDataObj,
String, TimeWindow> {
@Override
public void apply(String key, TimeWindow window, Iterable<ItcastDataPartObj> input, Collector<OnlineDataObj> out) throws Exception {
OnlineDataObj onlineDataObj = null;
//1. 對當前的數據集合進行升序排序
ArrayList<ItcastDataPartObj> itcastDataPartObjList = Lists.newArrayList(input);//guava的Lists工具類
Collections.sort(itcastDataPartObjList);
//第二種寫法:
//Collections.sort(itcastDataPartObjList, Comparator.comparingLong(ItcastDataPartObj::getTerminalTimeStamp));
//要想降序的話,直接再反轉就行:Collections.reverse(itcastDataPartObjList);
//2. 獲取集合中第一條數據
ItcastDataPartObj firstItcastData = itcastDataPartObjList.get(0);
//3. 循環遍歷每條數據,將集合中存在異常的數據拼接到指定屬性中
for(ItcastDataPartObj itcastDataPartObj : itcastDataPartObjList) {
//30s窗口最多6條數據,每條數據需要檢測19個字段,如果出現異常字段就進行 字符串拼接
//-- 過濾沒有各種警告的信息,調用setOnlineDataObj 將每一條對象和每條對象和標識0 返回到OnlineDataObj,並收集這個對象
if(filterNoAlarm(itcastDataPartObj)){ //true:沒有告警:0
onlineDataObj = setOnlineDataObj(firstItcastData, itcastDataPartObj, 0);
}
//否則 調用setOnlineDataObj 將第一條對象和每條對象和標識1 返回到OnlineDataObj,並收集這個對象
else {
onlineDataObj = setOnlineDataObj(firstItcastData,itcastDataPartObj,1);
}
out.collect(onlineDataObj);
}
}
/**
*
* @param firstItcastData
* @param itcastDataPartObj
* @param alarmFlag
* @return
*/
private OnlineDataObj setOnlineDataObj(ItcastDataPartObj firstItcastData, ItcastDataPartObj itcastDataPartObj, int alarmFlag) {
//1. 定義OnlineDataObj
OnlineDataObj onlineDataObj = new OnlineDataObj();
try {
//2. 將每條的對象屬性拷貝到定義OnlineDataObj
BeanUtils.copyProperties(onlineDataObj,itcastDataPartObj);
//3. 將每條對象中表顯歷程賦值給mileage(已經有了,其實不用寫)
onlineDataObj.setMileage(itcastDataPartObj.getTotalOdometer());
//4. 將告警信號賦值給isAlarm
onlineDataObj.setIsAlarm(alarmFlag);
//5. 將每個對象通過addAlarmNameList生成告警list,拼接成字符串賦值給alarmName,通過字符串join
onlineDataObj.setAlarmName(String.join("~",addAlarmNameList(itcastDataPartObj)));
//6. 將窗口內第一條數據告警時間賦值給earliestTime
onlineDataObj.setEarliestTime(firstItcastData.getTerminalTime());
//7. 將獲取每條記錄的充電狀態通過getChargeState返回充電標識賦值給充電標記
onlineDataObj.setChargeFlag(getChargeState(itcastDataPartObj.getChargeStatus()));
//8. 將當前時間賦值給處理時間
onlineDataObj.setProcessTime(DateUtil.getCurrentDateTime());
} catch (Exception e) {
e.printStackTrace();
}
return onlineDataObj;
}
/**
* 根據充電狀態返回充電標記
* @param chargeState
* @return
*/
private Integer getChargeState(Integer chargeState) {
int chargeFlag = -999999; //充電狀態的初始值
//充電狀態:0x01:停車充電、0x02:行車充電
if(chargeState == 1 || chargeState == 2 ){
chargeFlag = 1;
}
//0x04:充電完成 0x03:未充電
else if(chargeState == 4 || chargeState ==3){
chargeFlag = 0;
} else{
chargeFlag = 2;
}
return chargeFlag;
}
/**
* 將每條數據的故障名稱追加到故障名稱列表中
* @param itcastDataPartObj
* @return
*/
private ArrayList<String> addAlarmNameList(ItcastDataPartObj itcastDataPartObj) {
//定義故障名稱列表對象
ArrayList<String> alarmNameList = new ArrayList<>();
//電池高溫報警
if(itcastDataPartObj.getBatteryAlarm() == 1) {
alarmNameList.add("電池高溫報警");
}
//單體電池高壓報警
if(itcastDataPartObj.getSingleBatteryOverVoltageAlarm() == 1) {
alarmNameList.add("單體電池高壓報警");
}
//電池單體一致性差報警
if(itcastDataPartObj.getBatteryConsistencyDifferenceAlarm() == 1) {
alarmNameList.add("電池單體一致性差報警");
}
//絕緣報警
if(itcastDataPartObj.getInsulationAlarm() == 1) {
alarmNameList.add("絕緣報警");
}
//高壓互鎖狀態報警
if(itcastDataPartObj.getHighVoltageInterlockStateAlarm() == 1) {
alarmNameList.add("高壓互鎖狀態報警");
}
//SOC跳變報警
if(itcastDataPartObj.getSocJumpAlarm() == 1) {
alarmNameList.add("SOC跳變報警");
}
//驅動電機控制器溫度報警
if(itcastDataPartObj.getDriveMotorControllerTemperatureAlarm() == 1) {
alarmNameList.add("驅動電機控制器溫度報警");
}
//DC-DC溫度報警(dc-dc可以理解為車輛動力智能系統轉換器)
if(itcastDataPartObj.getDcdcTemperatureAlarm() == 1) {
alarmNameList.add("DC-DC溫度報警");
}
//SOC過高報警
if(itcastDataPartObj.getSocHighAlarm() == 1) {
alarmNameList.add("SOC過高報警");
}
//SOC低報警
if(itcastDataPartObj.getSocLowAlarm() == 1) {
alarmNameList.add("SOC低報警");
}
//溫度差異報警
if(itcastDataPartObj.getTemperatureDifferenceAlarm() == 1) {
alarmNameList.add("溫度差異報警");
}
//車載儲能裝置欠壓報警
if(itcastDataPartObj.getVehicleStorageDeviceUndervoltageAlarm() == 1) {
alarmNameList.add("車載儲能裝置欠壓報警");
}
//DC-DC狀態報警
if(itcastDataPartObj.getDcdcStatusAlarm() == 1) {
alarmNameList.add("DC-DC狀態報警");
}
//單體電池欠壓報警
if(itcastDataPartObj.getSingleBatteryUnderVoltageAlarm() == 1) {
alarmNameList.add("單體電池欠壓報警");
}
//可充電儲能系統不匹配報警
if(itcastDataPartObj.getRechargeableStorageDeviceMismatchAlarm() == 1) {
alarmNameList.add("可充電儲能系統不匹配報警");
}
//車載儲能裝置過壓報警
if(itcastDataPartObj.getVehicleStorageDeviceOvervoltageAlarm() == 1) {
alarmNameList.add("車載儲能裝置過壓報警");
}
//制動系統報警
if(itcastDataPartObj.getBrakeSystemAlarm() == 1) {
alarmNameList.add("制動系統報警");
}
//驅動電機溫度報警
if(itcastDataPartObj.getDriveMotorTemperatureAlarm() == 1) {
alarmNameList.add("驅動電機溫度報警");
}
//車載儲能裝置類型過充報警
if(itcastDataPartObj.getVehiclePureDeviceTypeOvercharge() == 1) {
alarmNameList.add("車載儲能裝置類型過充報警");
}
return alarmNameList;
}
/**
* 判斷是否存在報警的字段
* @param itcastDataPartObj
* @return
*/
private boolean filterNoAlarm(ItcastDataPartObj itcastDataPartObj) {
//正常:0 , 異常:1
if(
//電池高溫報警
itcastDataPartObj.getBatteryAlarm() == 1 ||
//單體電池高壓報警
itcastDataPartObj.getSingleBatteryOverVoltageAlarm() == 1 ||
//電池單體一致性差報警
itcastDataPartObj.getBatteryConsistencyDifferenceAlarm() == 1 ||
//絕緣報警
itcastDataPartObj.getInsulationAlarm() == 1 ||
//高壓互鎖狀態報警
itcastDataPartObj.getHighVoltageInterlockStateAlarm() == 1 ||
//SOC跳變報警
itcastDataPartObj.getSocJumpAlarm() == 1 ||
//驅動電機控制器溫度報警
itcastDataPartObj.getDriveMotorControllerTemperatureAlarm() == 1 ||
//DC-DC溫度報警(dc-dc可以理解為車輛動力智能系統轉換器)
itcastDataPartObj.getDcdcTemperatureAlarm() ==1 ||
//SOC過高報警
itcastDataPartObj.getSocHighAlarm() == 1||
//SOC低報警
itcastDataPartObj.getSocLowAlarm() == 1 ||
//溫度差異報警
itcastDataPartObj.getTemperatureDifferenceAlarm() == 1||
//車載儲能裝置欠壓報警
itcastDataPartObj.getVehicleStorageDeviceUndervoltageAlarm() == 1||
//DC-DC狀態報警
itcastDataPartObj.getDcdcStatusAlarm() == 1||
//單體電池欠壓報警
itcastDataPartObj.getSingleBatteryUnderVoltageAlarm() == 1||
//可充電儲能系統不匹配報警
itcastDataPartObj.getRechargeableStorageDeviceMismatchAlarm() == 1||
//車載儲能裝置過壓報警
itcastDataPartObj.getVehicleStorageDeviceOvervoltageAlarm() == 1||
//制動系統報警
itcastDataPartObj.getBrakeSystemAlarm() == 1 ||
//驅動電機溫度報警
itcastDataPartObj.getDriveMotorTemperatureAlarm() == 1 ||
//車載儲能裝置類型過充報警
itcastDataPartObj.getVehiclePureDeviceTypeOvercharge() == 1
){
return false;
}else {
return true;
}
}
}