elkers
2 天以前 9b2654eaf1feb56543cf805b5f44a36b2907cff1
修改mqtt数据
已修改1个文件
234 ■■■■ 文件已修改
src/main/java/com/nanjing/water/service/DataUploadRecordService.java 234 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/nanjing/water/service/DataUploadRecordService.java
@@ -296,128 +296,132 @@
            ERROR_LOGGER.error(action, "消息不能为空");
            return;
        }
        //处理心跳
        Random random=new Random();
        int i=random.nextInt(100);
        BigDecimal delayTime=BigDecimal.valueOf(1.0);
        List<HeartbeatDataPO> list = heartbeatDataMapper.getList();
        if(Objects.nonNull(list)){
            HeartbeatDataPO heartbeatUp = list.stream().filter(item -> item.getDataKey().equals("heartbeat_up")).findFirst().orElse(null);
            if(Objects.nonNull(heartbeatUp)){
                heartbeatUp.setDataValue(i);
                heartbeatDataMapper.updateById(heartbeatUp);
            }
            HeartbeatDataPO alarmTime = list.stream().filter(item -> item.getDataKey().equals("alarm_time")).findFirst().orElse(null);
           if(Objects.nonNull(alarmTime)){
               delayTime=new BigDecimal(alarmTime.getDataValue()).setScale(1, RoundingMode.DOWN);
           }
        }
        //获取延时报警时间
        Gson gson = new Gson();
        JsonObject object = gson.fromJson(msg, JsonObject.class);
        String asJsonObject = object.get("time").getAsString();
        Type listType = new TypeToken<List<CountVO>>() {
        }.getType();
        //获取data数据
        JsonArray data = object.getAsJsonArray("Data");
        List<CountVO> listVo = gson.fromJson(data, listType);
        for(CountVO vo:listVo){
            String[] split = vo.getName().split("-");
            vo.setName(split[0]);
            vo.setCode(split[1]);
        }
        List<WaterFacilityPO> facilityPOList = waterFacilityMapper.getList();
        List<WaterFacilityParameterPO> parameterPOList = waterFacilityParameterMapper.getList();
        //获取存入redis每天开始的启泵次数
        List<FacilityParameterDataVO> dataVOList=new ArrayList<>();
        String objects = redisTemplate.opsForValue().get("pump_start_number");
        if(StringUtil.isNotNullOrEmpty(objects)){
                dataVOList= SerializeUtil.toListObject(objects, FacilityParameterDataVO.class);
        }
       // List<AlarmHistoryPO> historyPOList = alarmHistoryMapper.getList();
        //根据设备编号分组
        Map<String, List<CountVO>> collect = listVo.stream().collect(Collectors.groupingBy(CountVO::getName));
        for (Map.Entry<String, List<CountVO>> entry : collect.entrySet()) {
            //获取设备信息
            WaterFacilityPO facilityPO = facilityPOList.stream().filter(x -> x.getFacilityCode().equals(entry.getKey())).findFirst().orElse(null);
            //历史记录上报
            DataUploadRecordPO recordPO=new DataUploadRecordPO();
            recordPO.setFacilityId(facilityPO.getId());
            recordPO.setFacilityName(facilityPO.getFacilityName());
            String json = gson.toJson(entry.getValue());
            recordPO.setColumnsValue(json);
            recordPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
            recordPO.setUploadDate(LocalDateTimeUtil.toDateString(LocalDateTimeUtil.nowTimeStamp()));
            recordPO.setUploadTime(LocalDateTimeUtil.nowTimeStamp());
            dataUploadRecordMapper.insert(recordPO);
            //获取设备所属监控点
            List<WaterFacilityParameterPO>parameterList=new ArrayList<>();
            //循环上报数据
            for(CountVO vo:entry.getValue()){
                //保存数据
                WaterFacilityParameterPO parameterPO1 = parameterPOList.stream().filter(item -> item.getFacilityId().equals(facilityPO.getId()) && item.getColumnsCode().equals(vo.getCode())).findFirst().orElse(null);
                if(Objects.nonNull(parameterPO1)){
                    parameterPO1.setColumnValue(vo.getValue());
                    parameterPO1.setLastTime(asJsonObject);
                    parameterList.add(parameterPO1);
        String[] split1 = topic.split("/");
        if(split1[1].equals("data")){
            //处理心跳
            Random random=new Random();
            int i=random.nextInt(100);
            BigDecimal delayTime=BigDecimal.valueOf(1.0);
            List<HeartbeatDataPO> list = heartbeatDataMapper.getList();
            if(Objects.nonNull(list)){
                HeartbeatDataPO heartbeatUp = list.stream().filter(item -> item.getDataKey().equals("heartbeat_up")).findFirst().orElse(null);
                if(Objects.nonNull(heartbeatUp)){
                    heartbeatUp.setDataValue(i);
                    heartbeatDataMapper.updateById(heartbeatUp);
                }
                //判断报警
              //  AlarmHistoryPO alarmHistoryPO=null;
                HeartbeatDataPO alarmTime = list.stream().filter(item -> item.getDataKey().equals("alarm_time")).findFirst().orElse(null);
                if(Objects.nonNull(alarmTime)){
                    delayTime=new BigDecimal(alarmTime.getDataValue()).setScale(1, RoundingMode.DOWN);
                }
            }
            //获取延时报警时间
            Gson gson = new Gson();
            JsonObject object = gson.fromJson(msg, JsonObject.class);
            String asJsonObject = object.get("time").getAsString();
            Type listType = new TypeToken<List<CountVO>>() {
            }.getType();
            //获取data数据
            JsonArray data = object.getAsJsonArray("Data");
            List<CountVO> listVo = gson.fromJson(data, listType);
            for(CountVO vo:listVo){
                String[] split = vo.getName().split("-");
                vo.setName(split[0]);
                vo.setCode(split[1]);
            }
            List<WaterFacilityPO> facilityPOList = waterFacilityMapper.getList();
            List<WaterFacilityParameterPO> parameterPOList = waterFacilityParameterMapper.getList();
            //获取存入redis每天开始的启泵次数
            List<FacilityParameterDataVO> dataVOList=new ArrayList<>();
            String objects = redisTemplate.opsForValue().get("pump_start_number");
            if(StringUtil.isNotNullOrEmpty(objects)){
                dataVOList= SerializeUtil.toListObject(objects, FacilityParameterDataVO.class);
            }
            // List<AlarmHistoryPO> historyPOList = alarmHistoryMapper.getList();
            //根据设备编号分组
            Map<String, List<CountVO>> collect = listVo.stream().collect(Collectors.groupingBy(CountVO::getName));
            for (Map.Entry<String, List<CountVO>> entry : collect.entrySet()) {
                //获取设备信息
                WaterFacilityPO facilityPO = facilityPOList.stream().filter(x -> x.getFacilityCode().equals(entry.getKey())).findFirst().orElse(null);
                //历史记录上报
                DataUploadRecordPO recordPO=new DataUploadRecordPO();
                recordPO.setFacilityId(facilityPO.getId());
                recordPO.setFacilityName(facilityPO.getFacilityName());
                String json = gson.toJson(entry.getValue());
                recordPO.setColumnsValue(json);
                recordPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
                recordPO.setUploadDate(LocalDateTimeUtil.toDateString(LocalDateTimeUtil.nowTimeStamp()));
                recordPO.setUploadTime(LocalDateTimeUtil.nowTimeStamp());
                dataUploadRecordMapper.insert(recordPO);
                //获取设备所属监控点
                List<WaterFacilityParameterPO>parameterList=new ArrayList<>();
                //循环上报数据
                for(CountVO vo:entry.getValue()){
                    //保存数据
                    WaterFacilityParameterPO parameterPO1 = parameterPOList.stream().filter(item -> item.getFacilityId().equals(facilityPO.getId()) && item.getColumnsCode().equals(vo.getCode())).findFirst().orElse(null);
                    if(Objects.nonNull(parameterPO1)){
                        parameterPO1.setColumnValue(vo.getValue());
                        parameterPO1.setLastTime(asJsonObject);
                        parameterList.add(parameterPO1);
                    }
                    //判断报警
                    //  AlarmHistoryPO alarmHistoryPO=null;
//                if(ListUtil.isNotNullOrEmpty(historyPOList)){
//                 alarmHistoryPO = historyPOList.stream().filter(item -> item.getFacilityId().equals(parameterPO1.getFacilityId()) && item.getCode().equals(parameterPO1.getColumnsCode()) && item.getIsSolve() == 0).findFirst().orElse(null);
//                }
                switch (vo.getCode()){
                    case "QF1":
                        //1号空开吸合关闭报警
                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                        break;
                    case "QF2":
                        //2号空开吸合关闭报警
                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                        break;
                    case "QF3":
                        //3号空开吸合关闭报警
                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                        break;
                    case "QF4":
                        //4号空开吸合关闭报警
                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                        break;
                    case "B001Fault":
                        //1号泵综合故障
                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                        break;
                    case "B002Fault":
                        //2号泵综合故障
                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                        break;
                    case "SQ":
                        //水侵报警
                        SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                        break;
                    case "H1001Fault":
                        //高水位报警
                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                        break;
                    case "L1001Fault":
                        //低水位报警
                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                        break;
                    case "B001StartNumber":
                        //1号泵启泵次数
                        startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList);
                        break;
                    case "B002StartNumber":
                        //2号泵启泵次数
                        startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList);
                        break;
                    switch (vo.getCode()){
                        case "QF1":
                            //1号空开吸合关闭报警
                            QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                            break;
                        case "QF2":
                            //2号空开吸合关闭报警
                            QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                            break;
                        case "QF3":
                            //3号空开吸合关闭报警
                            QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                            break;
                        case "QF4":
                            //4号空开吸合关闭报警
                            QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                            break;
                        case "B001Fault":
                            //1号泵综合故障
                            faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                            break;
                        case "B002Fault":
                            //2号泵综合故障
                            faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                            break;
                        case "SQ":
                            //水侵报警
                            SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                            break;
                        case "H1001Fault":
                            //高水位报警
                            faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                            break;
                        case "L1001Fault":
                            //低水位报警
                            faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                            break;
                        case "B001StartNumber":
                            //1号泵启泵次数
                            startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList);
                            break;
                        case "B002StartNumber":
                            //2号泵启泵次数
                            startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList);
                            break;
                    }
                }
            }
            waterFacilityParameterMapper.updateById(parameterList);
                waterFacilityParameterMapper.updateById(parameterList);
            }
        }
    }
     public void QFCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException {