Flink流處理-Function之(flatmap, map, window)


flatmap

package pers.aishuang.flink.streaming.function.flatmap;

import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.OnlineDataObj;
import pers.aishuang.flink.streaming.entity.VehicleInfoModel;
import pers.aishuang.flink.streaming.utils.DateUtil;

import java.util.HashMap;

public class VehicleInfoMapMysqlFunction extends RichCoFlatMapFunction<
        OnlineDataObj, HashMap<String,VehicleInfoModel>, OnlineDataObj> {
    //創建日志打印器
    private static final Logger logger = LoggerFactory.getLogger(VehicleInfoMapMysqlFunction.class);
    //定義廣播變量
    private static HashMap<String, VehicleInfoModel> vehicleInfoModelConfig = null;

    @Override
    public void flatMap1(OnlineDataObj value, Collector<OnlineDataObj> out) throws Exception {
        //1. 通過vin 獲取到車輛基礎信息
        String vin = value.getVin();
        VehicleInfoModel vehicleInfoModel = vehicleInfoModelConfig.getOrDefault(vin, null);

        //2. 如果車輛基礎信息不為空
        if(vehicleInfoModel != null) {
            //--將車系seriesName、車型modelName、車限liveTime、銷售日期saleDate,車輛類型carType封裝到onlineDataObj對象中
            value.setModelName(vehicleInfoModel.getModelName());
            value.setSeriesName(vehicleInfoModel.getSeriesName());
            value.setLiveTime(
                    (System.currentTimeMillis() - DateUtil.convertStringToDateTime(vehicleInfoModel.getSalesDate())
                            .getTime()) / 1000 / 3600 / 24 / 30 + ""
            );
            value.setSalesDate(vehicleInfoModel.getSalesDate());
            value.setCarType(vehicleInfoModel.getCarType());

            //-- 將onlineDataObj收集返回
            out.collect(value);
            //-- 打印輸出,基礎信息不存在
        }else {
            logger.error("當前上報車輛,未在庫里記錄");
        }
    }

    @Override
    public void flatMap2(HashMap<String, VehicleInfoModel> value, Collector<OnlineDataObj> out) throws Exception {
        vehicleInfoModelConfig = value;
    }
}

map

package pers.aishuang.flink.streaming.function.map;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.hadoop.hbase.util.Bytes;
import pers.aishuang.flink.streaming.entity.ItcastDataPartObj;
import pers.aishuang.flink.streaming.utils.GeoHashUtil;
import pers.aishuang.flink.streaming.utils.RedisUtil;
import pers.aishuang.flink.streaming.entity.VehicleLocationModel;

/**
 * 經緯度坐標只能表示一塊區域,實際生活中不存在點,因為點的大小可以無窮小
 * geoHash算法原理:
 * 將經度和緯度值轉化為2進制數字,以緯度在奇數位,經度在偶數位,將二者穿插合並成一個2進制數字
 * 再基於base32編碼將其轉成字符串,字符串越長,位置越精准
 */
public class LocactionInfoReidsFunction extends RichMapFunction<ItcastDataPartObj,ItcastDataPartObj> {
    @Override
    public ItcastDataPartObj map(ItcastDataPartObj value) throws Exception {
        //1. 獲取車輛數據的經度和緯度生成 geohash(經度,緯度)-> geohash字符串 -> 地理詳細位置
        Double lng = value.getLng();
        Double lat = value.getLat();
        String geohash = GeoHashUtil.encode(lat, lng);
        //2. 根據geohash 從redis中獲取value值(geohash在redis中是作為主鍵存在)
        byte[] locationDetailArr = RedisUtil.get(Bytes.toBytes(geohash));
        //3. 如果查詢出來的值不為空,將其通過JSON對象轉換成VehicleLocationModel對象,否則置為null
        if(locationDetailArr != null && locationDetailArr.length > 0) {
            String vehicleLocationJson = Bytes.toString(locationDetailArr);
            VehicleLocationModel model = JSON.parseObject(vehicleLocationJson,VehicleLocationModel.class);
            //4. 如果當前對象不為空,將國家,省市區地址賦給itcastDataPartObj,否則置為null
            if(model != null) {
                value.setCountry(model.getCountry());
                value.setProvince(model.getProvince());
                value.setCity(model.getCity());
                value.setDistrict(model.getDistrict());
                value.setAddress(model.getAddress());
            } else {
                value.setCountry(null);
                value.setProvince(null);
                value.setCity(null);
                value.setDistrict(null);
                value.setAddress(null);
            }
        }else {
            value.setCountry(null);
            value.setProvince(null);
            value.setCity(null);
            value.setDistrict(null);
            value.setAddress(null);
        }

        //5. 返回數據
        return value;
    }
}

window

package pers.aishuang.flink.streaming.function.window;

import com.clearspring.analytics.util.Lists;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.ElectricFenceModel;

import java.util.Collections;
import java.util.List;

/**
 * 考慮問題:時間滾動窗口,有的窗口沒有數據,flink是怎么處理的
 */

public class ElectricFenceWindowFunction extends RichWindowFunction<ElectricFenceModel, ElectricFenceModel,
                                                                    String , TimeWindow> {

    //創建logger
    private static final Logger logger = LoggerFactory.getLogger(ElectricFenceWindowFunction.class);
    //1.定義存儲歷史電子圍欄數據的state,<vin,是否在電子圍欄內0:內, 1:外> MapState<String, Integer>
    MapState<String, Byte> lastState = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        //1. 定義mapState的描述器,就是定義state的結構
        MapStateDescriptor<String, Byte> lastStateDesc = new MapStateDescriptor<String, Byte>("lastState",String.class,Byte.class);
        //2. 獲取parameterTool,用來讀取配置文件參數
        StateTtlConfig stateTtlConfig = StateTtlConfig
                //設置狀態有效時間
                .newBuilder(Time.seconds(180))
                //設置狀態更新類型:更新
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                //設置已過期但還未被清理的狀態如何處理
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                //過期對象的清理策略
                .cleanupFullSnapshot()
                .build();
        //3. 設置state的ttl
        lastStateDesc.enableTimeToLive(stateTtlConfig);
        //4. 獲取當前運行環境 所描述的那個MapState
        lastState = getRuntimeContext().getMapState(lastStateDesc);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable<ElectricFenceModel> input, Collector<ElectricFenceModel> out) throws Exception {
        //1. 創建返回對象
        ElectricFenceModel electricFenceModel = new ElectricFenceModel();
        //2. 對窗口內的數據進行排序(按“終端時間戳”屬性進行升序排序)
        List<ElectricFenceModel> electricFenceModelList = Lists.newArrayList(input);
        Collections.sort(electricFenceModelList);
        Collections.reverse(electricFenceModelList);
        //3. 從state中獲取車輛vin對應的上一次窗口電子圍欄lastStateValue標記(車輛上一次窗口是否在電子圍欄中)0:電子圍欄內,1:電子圍欄外
        Byte lastStateValue = lastState.get(key);
        //4. 如果上次狀態為空,初始化賦值
        if(lastStateValue == null ){
            lastStateValue = -1;
        }
        //5. 判斷當前處於電子圍欄內還是電子圍欄外
        //-- 定義當前車輛電子圍欄圈內出現的次數
        long inElectricFence = electricFenceModelList.stream()
                .filter(model -> model.getNowStatus() == 0)
                .count();
        //-- 定義當前車輛電子圍欄圈外出現的次數
        long outElectricFence = electricFenceModelList.stream()
                .filter(model -> model.getNowStatus() == 1)
                .count();
        //6. 定義當前窗口的電子圍欄狀態,默認讓當前狀態為1,在電子圍欄外
        byte currentState = 1;
        //7. 90s內車輛出現在電子圍欄內的次數多於出現在電子圍欄外的次數,則認為當前處於電子圍欄內
        if(inElectricFence >= outElectricFence){
            currentState = 0;
        }
        //8. 將當前窗口的電子圍欄狀態寫入到state中,供下次判斷
        lastState.put(key, currentState);
        //9. 如果當前電子圍欄狀態與上一次電子圍欄狀態不同
        //-- 如果上一次窗口處於電子圍欄外,而本次是電子圍欄內,則將進入電子圍欄的時間寫入到數據庫中
        if(lastStateValue == 1 && currentState == 0) {
            //-- 進入柵欄,找到最后一條在外面的數據
            //-- 出去柵欄,找到最后一條在里面的數據
            ElectricFenceModel lastOutEleModel = electricFenceModelList
                    .stream()
                    .filter(model -> model.getNowStatus() == 1)
                    .findFirst()
                    .get();
            //-- 拷貝屬性給 electricFenceModel 並將進圍欄終端時間賦值,並且將狀態告警字段賦值為1  0:出圍欄 1:進圍欄,將數據collect返回
            BeanUtils.copyProperties(electricFenceModel, lastOutEleModel);
            electricFenceModel.setInEleTime(lastOutEleModel.getGpsTime());
            electricFenceModel.setStatusAlarm(1);
            out.collect(electricFenceModel);
        } else if(lastStateValue == 0 && currentState == 1) {
            //如果上一次窗口處於電子圍欄內,而本次是電子圍欄外
            //-- 進入柵欄,找到最后一條在外面的數據
            //-- 出去柵欄,找到最后一條在里面的數據
            ElectricFenceModel lastInEleModel = electricFenceModelList
                    .stream()
                    .filter(model -> model.getNowStatus() == 0)
                    .findFirst()
                    .get();
            //-- 拷貝屬性給electricFenceModel 並將出柵欄終端時間賦值,並且將狀態告警 0:出圍欄,1:進圍欄,將數據collect返回
            BeanUtils.copyProperties(electricFenceModel,lastInEleModel);
            electricFenceModel.setOutEleTime(lastInEleModel.getGpsTime());
            electricFenceModel.setStatusAlarm(0);
            out.collect(electricFenceModel);
        }

    }
}


package pers.aishuang.flink.streaming.function.window;


import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import pers.aishuang.flink.streaming.entity.ItcastDataPartObj;
import pers.aishuang.flink.streaming.entity.OnlineDataObj;
import pers.aishuang.flink.streaming.utils.DateUtil;

import java.util.ArrayList;
import java.util.Collections;


/**
 * 對30s窗口內的數據生成一個新的對象,包含拉寬數據地理信息,實時上報數據
 * 數據庫中的靜態數據 OnlineDataObj
 * 開發步驟:
 * 1、對數據進行排序
 * 2、獲取每一條數據
 * 3、對每條數據判斷是否有告警信息,alarm=1 說明是告警信息
 * 4、封裝成對象OnlineDataObj
 * 5、返回
 */
//http請求超時時間和 redis會話超時都要小於水印亂序時間
public class OnlineStatisticsWindowFunction extends RichWindowFunction<
                                                    ItcastDataPartObj, OnlineDataObj,
                                                    String, TimeWindow> {
    @Override
    public void apply(String key, TimeWindow window, Iterable<ItcastDataPartObj> input, Collector<OnlineDataObj> out) throws Exception {
        OnlineDataObj onlineDataObj = null;
        //1. 對當前的數據集合進行升序排序
        ArrayList<ItcastDataPartObj> itcastDataPartObjList = Lists.newArrayList(input);//guava的Lists工具類
        Collections.sort(itcastDataPartObjList);
        //第二種寫法:
        //Collections.sort(itcastDataPartObjList, Comparator.comparingLong(ItcastDataPartObj::getTerminalTimeStamp));
        //要想降序的話,直接再反轉就行:Collections.reverse(itcastDataPartObjList);
        //2. 獲取集合中第一條數據
        ItcastDataPartObj firstItcastData = itcastDataPartObjList.get(0);
        //3. 循環遍歷每條數據,將集合中存在異常的數據拼接到指定屬性中
        for(ItcastDataPartObj itcastDataPartObj : itcastDataPartObjList) {
            //30s窗口最多6條數據,每條數據需要檢測19個字段,如果出現異常字段就進行 字符串拼接
            //-- 過濾沒有各種警告的信息,調用setOnlineDataObj 將每一條對象和每條對象和標識0 返回到OnlineDataObj,並收集這個對象
            if(filterNoAlarm(itcastDataPartObj)){ //true:沒有告警:0
                onlineDataObj = setOnlineDataObj(firstItcastData, itcastDataPartObj, 0);
            }
            //否則 調用setOnlineDataObj 將第一條對象和每條對象和標識1 返回到OnlineDataObj,並收集這個對象
            else {
                onlineDataObj = setOnlineDataObj(firstItcastData,itcastDataPartObj,1);
            }
            out.collect(onlineDataObj);
        }

    }

    /**
     *
     * @param firstItcastData
     * @param itcastDataPartObj
     * @param alarmFlag
     * @return
     */
    private OnlineDataObj setOnlineDataObj(ItcastDataPartObj firstItcastData, ItcastDataPartObj itcastDataPartObj, int alarmFlag) {
        //1. 定義OnlineDataObj
        OnlineDataObj onlineDataObj = new OnlineDataObj();
        try {
            //2. 將每條的對象屬性拷貝到定義OnlineDataObj
            BeanUtils.copyProperties(onlineDataObj,itcastDataPartObj);
            //3. 將每條對象中表顯歷程賦值給mileage(已經有了,其實不用寫)
            onlineDataObj.setMileage(itcastDataPartObj.getTotalOdometer());
            //4. 將告警信號賦值給isAlarm
            onlineDataObj.setIsAlarm(alarmFlag);
            //5. 將每個對象通過addAlarmNameList生成告警list,拼接成字符串賦值給alarmName,通過字符串join
            onlineDataObj.setAlarmName(String.join("~",addAlarmNameList(itcastDataPartObj)));
            //6. 將窗口內第一條數據告警時間賦值給earliestTime
            onlineDataObj.setEarliestTime(firstItcastData.getTerminalTime());
            //7. 將獲取每條記錄的充電狀態通過getChargeState返回充電標識賦值給充電標記
            onlineDataObj.setChargeFlag(getChargeState(itcastDataPartObj.getChargeStatus()));
            //8. 將當前時間賦值給處理時間
            onlineDataObj.setProcessTime(DateUtil.getCurrentDateTime());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return onlineDataObj;
    }

    /**
     * 根據充電狀態返回充電標記
     * @param chargeState
     * @return
     */
    private Integer getChargeState(Integer chargeState) {
        int chargeFlag = -999999; //充電狀態的初始值
        //充電狀態:0x01:停車充電、0x02:行車充電
        if(chargeState == 1 || chargeState == 2 ){
            chargeFlag = 1;
        }
        //0x04:充電完成 0x03:未充電
        else if(chargeState == 4 || chargeState ==3){
            chargeFlag = 0;
        } else{
            chargeFlag = 2;
        }
        return chargeFlag;
    }

    /**
     * 將每條數據的故障名稱追加到故障名稱列表中
     * @param itcastDataPartObj
     * @return
     */
    private ArrayList<String> addAlarmNameList(ItcastDataPartObj itcastDataPartObj) {
        //定義故障名稱列表對象
        ArrayList<String> alarmNameList = new ArrayList<>();
        //電池高溫報警
        if(itcastDataPartObj.getBatteryAlarm() == 1) {
            alarmNameList.add("電池高溫報警");
        }
        //單體電池高壓報警
        if(itcastDataPartObj.getSingleBatteryOverVoltageAlarm() == 1) {
            alarmNameList.add("單體電池高壓報警");
        }
        //電池單體一致性差報警
        if(itcastDataPartObj.getBatteryConsistencyDifferenceAlarm() == 1) {
            alarmNameList.add("電池單體一致性差報警");
        }
        //絕緣報警
        if(itcastDataPartObj.getInsulationAlarm() == 1) {
            alarmNameList.add("絕緣報警");
        }
        //高壓互鎖狀態報警
        if(itcastDataPartObj.getHighVoltageInterlockStateAlarm() == 1) {
            alarmNameList.add("高壓互鎖狀態報警");
        }
        //SOC跳變報警
        if(itcastDataPartObj.getSocJumpAlarm() == 1) {
            alarmNameList.add("SOC跳變報警");
        }
        //驅動電機控制器溫度報警
        if(itcastDataPartObj.getDriveMotorControllerTemperatureAlarm() == 1) {
            alarmNameList.add("驅動電機控制器溫度報警");
        }
        //DC-DC溫度報警(dc-dc可以理解為車輛動力智能系統轉換器)
        if(itcastDataPartObj.getDcdcTemperatureAlarm() == 1) {
            alarmNameList.add("DC-DC溫度報警");
        }
        //SOC過高報警
        if(itcastDataPartObj.getSocHighAlarm() == 1) {
            alarmNameList.add("SOC過高報警");
        }
        //SOC低報警
        if(itcastDataPartObj.getSocLowAlarm() == 1) {
            alarmNameList.add("SOC低報警");
        }
        //溫度差異報警
        if(itcastDataPartObj.getTemperatureDifferenceAlarm() == 1) {
            alarmNameList.add("溫度差異報警");
        }
        //車載儲能裝置欠壓報警
        if(itcastDataPartObj.getVehicleStorageDeviceUndervoltageAlarm() == 1) {
            alarmNameList.add("車載儲能裝置欠壓報警");
        }
        //DC-DC狀態報警
        if(itcastDataPartObj.getDcdcStatusAlarm() == 1) {
            alarmNameList.add("DC-DC狀態報警");
        }
        //單體電池欠壓報警
        if(itcastDataPartObj.getSingleBatteryUnderVoltageAlarm() == 1) {
            alarmNameList.add("單體電池欠壓報警");
        }
        //可充電儲能系統不匹配報警
        if(itcastDataPartObj.getRechargeableStorageDeviceMismatchAlarm() == 1) {
            alarmNameList.add("可充電儲能系統不匹配報警");
        }
        //車載儲能裝置過壓報警
        if(itcastDataPartObj.getVehicleStorageDeviceOvervoltageAlarm() == 1) {
            alarmNameList.add("車載儲能裝置過壓報警");
        }
        //制動系統報警
        if(itcastDataPartObj.getBrakeSystemAlarm() == 1) {
            alarmNameList.add("制動系統報警");
        }
        //驅動電機溫度報警
        if(itcastDataPartObj.getDriveMotorTemperatureAlarm() == 1) {
            alarmNameList.add("驅動電機溫度報警");
        }
        //車載儲能裝置類型過充報警
        if(itcastDataPartObj.getVehiclePureDeviceTypeOvercharge() == 1) {
            alarmNameList.add("車載儲能裝置類型過充報警");
        }
        return alarmNameList;
    }

    /**
     * 判斷是否存在報警的字段
     * @param itcastDataPartObj
     * @return
     */
    private boolean filterNoAlarm(ItcastDataPartObj itcastDataPartObj) {
       //正常:0 , 異常:1
        if(
            //電池高溫報警
            itcastDataPartObj.getBatteryAlarm() == 1 ||
            //單體電池高壓報警
            itcastDataPartObj.getSingleBatteryOverVoltageAlarm() == 1 ||
            //電池單體一致性差報警
            itcastDataPartObj.getBatteryConsistencyDifferenceAlarm() == 1 ||
            //絕緣報警
            itcastDataPartObj.getInsulationAlarm() == 1 ||
            //高壓互鎖狀態報警
            itcastDataPartObj.getHighVoltageInterlockStateAlarm() == 1 ||
            //SOC跳變報警
            itcastDataPartObj.getSocJumpAlarm() == 1 ||
            //驅動電機控制器溫度報警
            itcastDataPartObj.getDriveMotorControllerTemperatureAlarm() == 1 ||
            //DC-DC溫度報警(dc-dc可以理解為車輛動力智能系統轉換器)
            itcastDataPartObj.getDcdcTemperatureAlarm() ==1 ||
            //SOC過高報警
            itcastDataPartObj.getSocHighAlarm() == 1||
            //SOC低報警
            itcastDataPartObj.getSocLowAlarm() == 1 ||
            //溫度差異報警
            itcastDataPartObj.getTemperatureDifferenceAlarm() == 1||
            //車載儲能裝置欠壓報警
            itcastDataPartObj.getVehicleStorageDeviceUndervoltageAlarm() == 1||
            //DC-DC狀態報警
            itcastDataPartObj.getDcdcStatusAlarm() == 1||
            //單體電池欠壓報警
            itcastDataPartObj.getSingleBatteryUnderVoltageAlarm() == 1||
            //可充電儲能系統不匹配報警
            itcastDataPartObj.getRechargeableStorageDeviceMismatchAlarm() == 1||
            //車載儲能裝置過壓報警
            itcastDataPartObj.getVehicleStorageDeviceOvervoltageAlarm() == 1||
            //制動系統報警
            itcastDataPartObj.getBrakeSystemAlarm() == 1 ||
            //驅動電機溫度報警
            itcastDataPartObj.getDriveMotorTemperatureAlarm() == 1 ||
            //車載儲能裝置類型過充報警
            itcastDataPartObj.getVehiclePureDeviceTypeOvercharge() == 1
        ){
            return false;
        }else {
            return true;
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM