From 9b2654eaf1feb56543cf805b5f44a36b2907cff1 Mon Sep 17 00:00:00 2001 From: elkers <elkers@163.com> Date: 星期五, 18 四月 2025 10:09:05 +0800 Subject: [PATCH] 修改mqtt数据 --- src/main/java/com/nanjing/water/service/DataUploadRecordService.java | 255 +++++++++++++++++++++++++++++++++++--------------- 1 files changed, 178 insertions(+), 77 deletions(-) diff --git a/src/main/java/com/nanjing/water/service/DataUploadRecordService.java b/src/main/java/com/nanjing/water/service/DataUploadRecordService.java index 7e5e689..71c95c4 100644 --- a/src/main/java/com/nanjing/water/service/DataUploadRecordService.java +++ b/src/main/java/com/nanjing/water/service/DataUploadRecordService.java @@ -23,6 +23,7 @@ package com.nanjing.water.service; import com.baomidou.mybatisplus.core.toolkit.Sequence; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; @@ -32,14 +33,21 @@ import com.nanjing.water.common.model.Tuple; import com.nanjing.water.common.util.*; import com.nanjing.water.host.mqtt.CountVO; +import com.nanjing.water.host.mqtt.MQTTServer; import com.nanjing.water.repository.impl.*; import com.nanjing.water.repository.po.*; +import com.nanjing.water.repository.vo.FacilityParameterDataVO; import org.apache.commons.lang3.BooleanUtils; +import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.lang.reflect.Type; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -68,6 +76,16 @@ private WaterMonitoryPointMapperImpl waterMonitoryPointMapper; @Autowired private AlarmHistoryMapperImpl alarmHistoryMapper; + @Autowired + private HeartbeatDataMapperImpl heartbeatDataMapper; + @Autowired + private DataUploadRecordMapperImpl dataUploadRecordMapper; + @Autowired + private MQTTServer mqttServer; + @Autowired + private StringRedisTemplate redisTemplate; + @Autowired + private ObjectMapper objectMapper; public ExecutedResult<Long> create(ReqCreateDataUploadRecord request) { // 转换po @@ -268,7 +286,7 @@ * @param topic 主题 * @param msg 消息内容 */ - public void mqttReceived(String topic, String msg) { + public void mqttReceived(String topic, String msg) throws MqttException { String action = "mqtt收到消息"; if (StringUtil.isNullOrEmpty(topic)) { ERROR_LOGGER.error(action, "主题不能为空"); @@ -278,88 +296,135 @@ ERROR_LOGGER.error(action, "消息不能为空"); return; } - Gson gson = new Gson(); - JsonObject object = gson.fromJson(msg, JsonObject.class); - String asJsonObject = object.get("time").getAsString(); - Type listType = new TypeToken<List<CountVO>>() { - }.getType(); - //获取data数据 - JsonArray data = object.getAsJsonArray("Data"); - List<CountVO> listVo = gson.fromJson(data, listType); - for(CountVO vo:listVo){ - String[] split = vo.getName().split("-"); - vo.setName(split[0]); - vo.setCode(split[1]); - } - List<WaterFacilityPO> facilityPOList = waterFacilityMapper.getList(); - List<WaterFacilityParameterPO> parameterPOList = waterFacilityParameterMapper.getList(); - // List<AlarmHistoryPO> historyPOList = alarmHistoryMapper.getList(); - //根据设备编号分组 - Map<String, List<CountVO>> collect = listVo.stream().collect(Collectors.groupingBy(CountVO::getName)); - for (Map.Entry<String, List<CountVO>> entry : collect.entrySet()) { - //获取设备信息 - WaterFacilityPO facilityPO = facilityPOList.stream().filter(x -> x.getFacilityCode().equals(entry.getKey())).findFirst().orElse(null); - //获取设备所属监控点 - List<WaterFacilityParameterPO>parameterList=new ArrayList<>(); - //循环上报数据 - for(CountVO vo:entry.getValue()){ - //保存数据 - WaterFacilityParameterPO parameterPO1 = parameterPOList.stream().filter(item -> item.getFacilityId().equals(facilityPO.getId()) && item.getColumnsCode().equals(vo.getCode())).findFirst().orElse(null); - if(Objects.nonNull(parameterPO1)){ - parameterPO1.setColumnValue(vo.getValue()); - parameterPO1.setLastTime(asJsonObject); - parameterList.add(parameterPO1); + String[] split1 = topic.split("/"); + if(split1[1].equals("data")){ + //处理心跳 + Random random=new Random(); + int i=random.nextInt(100); + BigDecimal delayTime=BigDecimal.valueOf(1.0); + List<HeartbeatDataPO> list = heartbeatDataMapper.getList(); + if(Objects.nonNull(list)){ + HeartbeatDataPO heartbeatUp = list.stream().filter(item -> item.getDataKey().equals("heartbeat_up")).findFirst().orElse(null); + if(Objects.nonNull(heartbeatUp)){ + heartbeatUp.setDataValue(i); + heartbeatDataMapper.updateById(heartbeatUp); } - //判断报警 - // AlarmHistoryPO alarmHistoryPO=null; + HeartbeatDataPO alarmTime = list.stream().filter(item -> item.getDataKey().equals("alarm_time")).findFirst().orElse(null); + if(Objects.nonNull(alarmTime)){ + delayTime=new BigDecimal(alarmTime.getDataValue()).setScale(1, RoundingMode.DOWN); + } + } + //获取延时报警时间 + Gson gson = new Gson(); + JsonObject object = gson.fromJson(msg, JsonObject.class); + String asJsonObject = object.get("time").getAsString(); + Type listType = new TypeToken<List<CountVO>>() { + }.getType(); + //获取data数据 + JsonArray data = object.getAsJsonArray("Data"); + List<CountVO> listVo = gson.fromJson(data, listType); + for(CountVO vo:listVo){ + String[] split = vo.getName().split("-"); + vo.setName(split[0]); + vo.setCode(split[1]); + } + + List<WaterFacilityPO> facilityPOList = waterFacilityMapper.getList(); + List<WaterFacilityParameterPO> parameterPOList = waterFacilityParameterMapper.getList(); + //获取存入redis每天开始的启泵次数 + List<FacilityParameterDataVO> dataVOList=new ArrayList<>(); + String objects = redisTemplate.opsForValue().get("pump_start_number"); + if(StringUtil.isNotNullOrEmpty(objects)){ + dataVOList= SerializeUtil.toListObject(objects, FacilityParameterDataVO.class); + } + + // List<AlarmHistoryPO> historyPOList = alarmHistoryMapper.getList(); + //根据设备编号分组 + Map<String, List<CountVO>> collect = listVo.stream().collect(Collectors.groupingBy(CountVO::getName)); + for (Map.Entry<String, List<CountVO>> entry : collect.entrySet()) { + //获取设备信息 + WaterFacilityPO facilityPO = facilityPOList.stream().filter(x -> x.getFacilityCode().equals(entry.getKey())).findFirst().orElse(null); + //历史记录上报 + DataUploadRecordPO recordPO=new DataUploadRecordPO(); + recordPO.setFacilityId(facilityPO.getId()); + recordPO.setFacilityName(facilityPO.getFacilityName()); + String json = gson.toJson(entry.getValue()); + recordPO.setColumnsValue(json); + recordPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); + recordPO.setUploadDate(LocalDateTimeUtil.toDateString(LocalDateTimeUtil.nowTimeStamp())); + recordPO.setUploadTime(LocalDateTimeUtil.nowTimeStamp()); + dataUploadRecordMapper.insert(recordPO); + //获取设备所属监控点 + List<WaterFacilityParameterPO>parameterList=new ArrayList<>(); + //循环上报数据 + for(CountVO vo:entry.getValue()){ + //保存数据 + WaterFacilityParameterPO parameterPO1 = parameterPOList.stream().filter(item -> item.getFacilityId().equals(facilityPO.getId()) && item.getColumnsCode().equals(vo.getCode())).findFirst().orElse(null); + if(Objects.nonNull(parameterPO1)){ + parameterPO1.setColumnValue(vo.getValue()); + parameterPO1.setLastTime(asJsonObject); + parameterList.add(parameterPO1); + } + //判断报警 + // AlarmHistoryPO alarmHistoryPO=null; // if(ListUtil.isNotNullOrEmpty(historyPOList)){ // alarmHistoryPO = historyPOList.stream().filter(item -> item.getFacilityId().equals(parameterPO1.getFacilityId()) && item.getCode().equals(parameterPO1.getColumnsCode()) && item.getIsSolve() == 0).findFirst().orElse(null); // } - switch (vo.getCode()){ - case "QF1": - //1号空开吸合关闭报警 - QFCreateAlarm(facilityPO,parameterPO1, vo.getValue() ); - break; - case "QF2": - //2号空开吸合关闭报警 - QFCreateAlarm(facilityPO,parameterPO1, vo.getValue()); - break; - case "QF3": - //3号空开吸合关闭报警 - QFCreateAlarm(facilityPO,parameterPO1, vo.getValue()); - break; - case "QF4": - //4号空开吸合关闭报警 - QFCreateAlarm(facilityPO,parameterPO1, vo.getValue()); - break; - case "B001Fault": - //1号泵综合故障 - faultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); - break; - case "B002Fault": - //2号泵综合故障 - faultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); - break; - case "SQ": - //水侵报警 - SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); - break; - case "H1001Fault": - //高水位报警 - faultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); - break; - case "L1001Fault": - //低水位报警 - faultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); - break; + switch (vo.getCode()){ + case "QF1": + //1号空开吸合关闭报警 + QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "QF2": + //2号空开吸合关闭报警 + QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "QF3": + //3号空开吸合关闭报警 + QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "QF4": + //4号空开吸合关闭报警 + QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "B001Fault": + //1号泵综合故障 + faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "B002Fault": + //2号泵综合故障 + faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "SQ": + //水侵报警 + SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "H1001Fault": + //高水位报警 + faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "L1001Fault": + //低水位报警 + faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); + break; + case "B001StartNumber": + //1号泵启泵次数 + startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList); + break; + case "B002StartNumber": + //2号泵启泵次数 + startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList); + break; + } } - } - waterFacilityParameterMapper.updateById(parameterList); + waterFacilityParameterMapper.updateById(parameterList); + } } + } - public void QFCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value){ + public void QFCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException { if(value.equals("false")){ alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO(); @@ -370,11 +435,15 @@ alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp()); alarmHistoryMapper.insert(alarmHistoryPO); - + Map<String, Object> map = new HashMap<>(); + map.put("alarm","true"); + map.put("alarm_time",delayTime); + String dataValue = gson.toJson(map); + mqttServer.send("nanjing/alarm",dataValue); } } - public void faultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value){ + public void faultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException { if(value.equals("true")){ alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO(); @@ -385,11 +454,16 @@ alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp()); alarmHistoryMapper.insert(alarmHistoryPO); + Map<String, Object> map = new HashMap<>(); + map.put("alarm","true"); + map.put("alarm_time",delayTime); + String dataValue = gson.toJson(map); + mqttServer.send("nanjing/alarm",dataValue); } } - public void SQFaultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value){ + public void SQFaultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException { if(value.equals("1")){ alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO(); @@ -400,8 +474,35 @@ alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp()); alarmHistoryMapper.insert(alarmHistoryPO); + Map<String, Object> map = new HashMap<>(); + map.put("alarm","true"); + map.put("alarm_time",delayTime); + String dataValue = gson.toJson(map); + mqttServer.send("nanjing/alarm",dataValue); + } + } + public void startNumber(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value,List<FacilityParameterDataVO> dataVOList){ + if(ListUtil.isNotNullOrEmpty(dataVOList)){ + FacilityParameterDataVO facilityParameterDataVO = dataVOList.stream().filter(item -> item.getFacilityCode().equals(facilityPO.getFacilityCode()) && item.getColumnsCode().equals(parameterPO.getColumnsCode())).findFirst().orElse(null); + if(Objects.nonNull(facilityParameterDataVO)){ + Integer columnsValue = Integer.valueOf(facilityParameterDataVO.getColumnValue()); + Integer value1 = Integer.valueOf(value); + Integer dataNumber=value1-columnsValue; + alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); + if(dataNumber>5){ + alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); + AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO(); + alarmHistoryPO.setFacilityId(facilityPO.getId()); + alarmHistoryPO.setFacilityName(facilityPO.getFacilityName()); + alarmHistoryPO.setCode(parameterPO.getColumnsCode()); + alarmHistoryPO.setDescription(parameterPO.getColumnsShow()+"超过5次"); + alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); + alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp()); + alarmHistoryMapper.insert(alarmHistoryPO); + } + } } } } -- Gitblit v1.9.3