From 9b2654eaf1feb56543cf805b5f44a36b2907cff1 Mon Sep 17 00:00:00 2001 From: elkers <elkers@163.com> Date: 星期五, 18 四月 2025 10:09:05 +0800 Subject: [PATCH] 修改mqtt数据 --- src/main/java/com/nanjing/water/service/DataUploadRecordService.java | 234 +++++++++++++++++++++++++++++---------------------------- 1 files changed, 119 insertions(+), 115 deletions(-) diff --git a/src/main/java/com/nanjing/water/service/DataUploadRecordService.java b/src/main/java/com/nanjing/water/service/DataUploadRecordService.java index 3323e9d..71c95c4 100644 --- a/src/main/java/com/nanjing/water/service/DataUploadRecordService.java +++ b/src/main/java/com/nanjing/water/service/DataUploadRecordService.java @@ -296,128 +296,132 @@ ERROR_LOGGER.error(action, "消息不能为空"); return; } - //处理心跳 - Random random=new Random(); - int i=random.nextInt(100); - BigDecimal delayTime=BigDecimal.valueOf(1.0); - List<HeartbeatDataPO> list = heartbeatDataMapper.getList(); - if(Objects.nonNull(list)){ - HeartbeatDataPO heartbeatUp = list.stream().filter(item -> item.getDataKey().equals("heartbeat_up")).findFirst().orElse(null); - if(Objects.nonNull(heartbeatUp)){ - heartbeatUp.setDataValue(i); - heartbeatDataMapper.updateById(heartbeatUp); - } - HeartbeatDataPO alarmTime = list.stream().filter(item -> item.getDataKey().equals("alarm_time")).findFirst().orElse(null); - if(Objects.nonNull(alarmTime)){ - delayTime=new BigDecimal(alarmTime.getDataValue()).setScale(1, RoundingMode.DOWN); - } - } - //获取延时报警时间 - Gson gson = new Gson(); - JsonObject object = gson.fromJson(msg, JsonObject.class); - String asJsonObject = object.get("time").getAsString(); - Type listType = new TypeToken<List<CountVO>>() { - }.getType(); - //获取data数据 - JsonArray data = object.getAsJsonArray("Data"); - List<CountVO> listVo = gson.fromJson(data, listType); - for(CountVO vo:listVo){ - String[] split = vo.getName().split("-"); - vo.setName(split[0]); - vo.setCode(split[1]); - } - - List<WaterFacilityPO> facilityPOList = waterFacilityMapper.getList(); - List<WaterFacilityParameterPO> parameterPOList = waterFacilityParameterMapper.getList(); - //获取存入redis每天开始的启泵次数 - List<FacilityParameterDataVO> dataVOList=new ArrayList<>(); - String objects = redisTemplate.opsForValue().get("pump_start_number"); - if(StringUtil.isNotNullOrEmpty(objects)){ - dataVOList= SerializeUtil.toListObject(objects, FacilityParameterDataVO.class); - } - - // List<AlarmHistoryPO> historyPOList = alarmHistoryMapper.getList(); - //根据设备编号分组 - Map<String, List<CountVO>> collect = listVo.stream().collect(Collectors.groupingBy(CountVO::getName)); - for (Map.Entry<String, List<CountVO>> entry : collect.entrySet()) { - //获取设备信息 - WaterFacilityPO facilityPO = facilityPOList.stream().filter(x -> x.getFacilityCode().equals(entry.getKey())).findFirst().orElse(null); - //历史记录上报 - DataUploadRecordPO recordPO=new DataUploadRecordPO(); - recordPO.setFacilityId(facilityPO.getId()); - recordPO.setFacilityName(facilityPO.getFacilityName()); - String json = gson.toJson(entry.getValue()); - recordPO.setColumnsValue(json); - recordPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); - recordPO.setUploadDate(LocalDateTimeUtil.toDateString(LocalDateTimeUtil.nowTimeStamp())); - recordPO.setUploadTime(LocalDateTimeUtil.nowTimeStamp()); - dataUploadRecordMapper.insert(recordPO); - //获取设备所属监控点 - List<WaterFacilityParameterPO>parameterList=new ArrayList<>(); - //循环上报数据 - for(CountVO vo:entry.getValue()){ - //保存数据 - WaterFacilityParameterPO parameterPO1 = parameterPOList.stream().filter(item -> item.getFacilityId().equals(facilityPO.getId()) && item.getColumnsCode().equals(vo.getCode())).findFirst().orElse(null); - if(Objects.nonNull(parameterPO1)){ - parameterPO1.setColumnValue(vo.getValue()); - parameterPO1.setLastTime(asJsonObject); - parameterList.add(parameterPO1); + String[] split1 = topic.split("/"); + if(split1[1].equals("data")){ + //处理心跳 + Random random=new Random(); + int i=random.nextInt(100); + BigDecimal delayTime=BigDecimal.valueOf(1.0); + List<HeartbeatDataPO> list = heartbeatDataMapper.getList(); + if(Objects.nonNull(list)){ + HeartbeatDataPO heartbeatUp = list.stream().filter(item -> item.getDataKey().equals("heartbeat_up")).findFirst().orElse(null); + if(Objects.nonNull(heartbeatUp)){ + heartbeatUp.setDataValue(i); + heartbeatDataMapper.updateById(heartbeatUp); } - //判断报警 - // AlarmHistoryPO alarmHistoryPO=null; + HeartbeatDataPO alarmTime = list.stream().filter(item -> item.getDataKey().equals("alarm_time")).findFirst().orElse(null); + if(Objects.nonNull(alarmTime)){ + delayTime=new BigDecimal(alarmTime.getDataValue()).setScale(1, RoundingMode.DOWN); + } + } + //获取延时报警时间 + Gson gson = new Gson(); + JsonObject object = gson.fromJson(msg, JsonObject.class); + String asJsonObject = object.get("time").getAsString(); + Type listType = new TypeToken<List<CountVO>>() { + }.getType(); + //获取data数据 + JsonArray data = object.getAsJsonArray("Data"); + List<CountVO> listVo = gson.fromJson(data, listType); + for(CountVO vo:listVo){ + String[] split = vo.getName().split("-"); + vo.setName(split[0]); + vo.setCode(split[1]); + } + + List<WaterFacilityPO> facilityPOList = waterFacilityMapper.getList(); + List<WaterFacilityParameterPO> parameterPOList = waterFacilityParameterMapper.getList(); + //获取存入redis每天开始的启泵次数 + List<FacilityParameterDataVO> dataVOList=new ArrayList<>(); + String objects = redisTemplate.opsForValue().get("pump_start_number"); + if(StringUtil.isNotNullOrEmpty(objects)){ + dataVOList= SerializeUtil.toListObject(objects, FacilityParameterDataVO.class); + } + + // List<AlarmHistoryPO> historyPOList = alarmHistoryMapper.getList(); + //根据设备编号分组 + Map<String, List<CountVO>> collect = listVo.stream().collect(Collectors.groupingBy(CountVO::getName)); + for (Map.Entry<String, List<CountVO>> entry : collect.entrySet()) { + //获取设备信息 + WaterFacilityPO facilityPO = facilityPOList.stream().filter(x -> x.getFacilityCode().equals(entry.getKey())).findFirst().orElse(null); + //历史记录上报 + DataUploadRecordPO recordPO=new DataUploadRecordPO(); + recordPO.setFacilityId(facilityPO.getId()); + recordPO.setFacilityName(facilityPO.getFacilityName()); + String json = gson.toJson(entry.getValue()); + recordPO.setColumnsValue(json); + recordPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); + recordPO.setUploadDate(LocalDateTimeUtil.toDateString(LocalDateTimeUtil.nowTimeStamp())); + recordPO.setUploadTime(LocalDateTimeUtil.nowTimeStamp()); + dataUploadRecordMapper.insert(recordPO); + //获取设备所属监控点 + List<WaterFacilityParameterPO>parameterList=new ArrayList<>(); + //循环上报数据 + for(CountVO vo:entry.getValue()){ + //保存数据 + WaterFacilityParameterPO parameterPO1 = parameterPOList.stream().filter(item -> item.getFacilityId().equals(facilityPO.getId()) && item.getColumnsCode().equals(vo.getCode())).findFirst().orElse(null); + if(Objects.nonNull(parameterPO1)){ + parameterPO1.setColumnValue(vo.getValue()); + parameterPO1.setLastTime(asJsonObject); + parameterList.add(parameterPO1); + } + //判断报警 + // AlarmHistoryPO alarmHistoryPO=null; // if(ListUtil.isNotNullOrEmpty(historyPOList)){ // alarmHistoryPO = historyPOList.stream().filter(item -> item.getFacilityId().equals(parameterPO1.getFacilityId()) && item.getCode().equals(parameterPO1.getColumnsCode()) && item.getIsSolve() == 0).findFirst().orElse(null); // } - switch (vo.getCode()){ - case "QF1": - //1号空开吸合关闭报警 - QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); - break; - case "QF2": - //2号空开吸合关闭报警 - QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); - break; - case "QF3": - //3号空开吸合关闭报警 - QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); - break; - case "QF4": - //4号空开吸合关闭报警 - QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); - break; - case "B001Fault": - //1号泵综合故障 - faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); - break; - case "B002Fault": - //2号泵综合故障 - faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); - break; - case "SQ": - //水侵报警 - SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); - break; - case "H1001Fault": - //高水位报警 - faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); - break; - case "L1001Fault": - //低水位报警 - faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); - break; - case "B001StartNumber": - //1号泵启泵次数 - startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList); - break; - case "B002StartNumber": - //2号泵启泵次数 - startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList); - break; + switch (vo.getCode()){ + case "QF1": + //1号空开吸合关闭报警 + QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "QF2": + //2号空开吸合关闭报警 + QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "QF3": + //3号空开吸合关闭报警 + QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "QF4": + //4号空开吸合关闭报警 + QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "B001Fault": + //1号泵综合故障 + faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "B002Fault": + //2号泵综合故障 + faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "SQ": + //水侵报警 + SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "H1001Fault": + //高水位报警 + faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "L1001Fault": + //低水位报警 + faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "B001StartNumber": + //1号泵启泵次数 + startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList); + break; + case "B002StartNumber": + //2号泵启泵次数 + startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList); + break; + } } - } - waterFacilityParameterMapper.updateById(parameterList); + waterFacilityParameterMapper.updateById(parameterList); + } } + } public void QFCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException { -- Gitblit v1.9.3