From 60041d6ee9bd2fcd8b6bcb827bf46e0727941665 Mon Sep 17 00:00:00 2001
From: elkers <elkers@163.com>
Date: 星期六, 12 四月 2025 11:24:28 +0800
Subject: [PATCH] 添加报警记录

---
 src/main/java/com/nanjing/water/service/DataUploadRecordService.java |  124 ++++++++++++++++++++++++++++++++++++----
 1 files changed, 110 insertions(+), 14 deletions(-)

diff --git a/src/main/java/com/nanjing/water/service/DataUploadRecordService.java b/src/main/java/com/nanjing/water/service/DataUploadRecordService.java
index 7e5e689..138dba4 100644
--- a/src/main/java/com/nanjing/water/service/DataUploadRecordService.java
+++ b/src/main/java/com/nanjing/water/service/DataUploadRecordService.java
@@ -23,6 +23,7 @@
 package com.nanjing.water.service;
 
 import com.baomidou.mybatisplus.core.toolkit.Sequence;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
@@ -32,14 +33,21 @@
 import com.nanjing.water.common.model.Tuple;
 import com.nanjing.water.common.util.*;
 import com.nanjing.water.host.mqtt.CountVO;
+import com.nanjing.water.host.mqtt.MQTTServer;
 import com.nanjing.water.repository.impl.*;
 import com.nanjing.water.repository.po.*;
+import com.nanjing.water.repository.vo.FacilityParameterDataVO;
 import org.apache.commons.lang3.BooleanUtils;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.slf4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.stereotype.Service;
 
 import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -68,6 +76,16 @@
     private WaterMonitoryPointMapperImpl waterMonitoryPointMapper;
     @Autowired
     private AlarmHistoryMapperImpl alarmHistoryMapper;
+    @Autowired
+    private HeartbeatDataMapperImpl heartbeatDataMapper;
+    @Autowired
+    private DataUploadRecordMapperImpl dataUploadRecordMapper;
+    @Autowired
+    private MQTTServer mqttServer;
+    @Autowired
+    private StringRedisTemplate redisTemplate;
+    @Autowired
+    private ObjectMapper objectMapper;
 
     public ExecutedResult<Long> create(ReqCreateDataUploadRecord request) {
         // 转换po
@@ -268,7 +286,7 @@
      * @param topic 主题
      * @param msg 消息内容
      */
-    public void mqttReceived(String topic, String msg) {
+    public void mqttReceived(String topic, String msg) throws MqttException {
         String action = "mqtt收到消息";
         if (StringUtil.isNullOrEmpty(topic)) {
             ERROR_LOGGER.error(action, "主题不能为空");
@@ -278,6 +296,23 @@
             ERROR_LOGGER.error(action, "消息不能为空");
             return;
         }
+        //处理心跳
+        Random random=new Random();
+        int i=random.nextInt(100);
+        BigDecimal delayTime=BigDecimal.valueOf(1.0);
+        List<HeartbeatDataPO> list = heartbeatDataMapper.getList();
+        if(Objects.nonNull(list)){
+            HeartbeatDataPO heartbeatUp = list.stream().filter(item -> item.getDataKey().equals("heartbeat_up")).findFirst().orElse(null);
+            if(Objects.nonNull(heartbeatUp)){
+                heartbeatUp.setDataValue(i);
+                heartbeatDataMapper.updateById(heartbeatUp);
+            }
+            HeartbeatDataPO alarmTime = list.stream().filter(item -> item.getDataKey().equals("alarm_time")).findFirst().orElse(null);
+           if(Objects.nonNull(alarmTime)){
+               delayTime=new BigDecimal(alarmTime.getDataValue()).setScale(1, RoundingMode.DOWN);
+           }
+        }
+        //获取延时报警时间
         Gson gson = new Gson();
         JsonObject object = gson.fromJson(msg, JsonObject.class);
         String asJsonObject = object.get("time").getAsString();
@@ -291,14 +326,32 @@
             vo.setName(split[0]);
             vo.setCode(split[1]);
         }
+
         List<WaterFacilityPO> facilityPOList = waterFacilityMapper.getList();
         List<WaterFacilityParameterPO> parameterPOList = waterFacilityParameterMapper.getList();
+        //获取存入redis每天开始的启泵次数
+        List<FacilityParameterDataVO> dataVOList=new ArrayList<>();
+        String objects = redisTemplate.opsForValue().get("pump_start_number");
+        if(StringUtil.isNotNullOrEmpty(objects)){
+                dataVOList= SerializeUtil.toListObject(objects, FacilityParameterDataVO.class);
+        }
+
        // List<AlarmHistoryPO> historyPOList = alarmHistoryMapper.getList();
         //根据设备编号分组
         Map<String, List<CountVO>> collect = listVo.stream().collect(Collectors.groupingBy(CountVO::getName));
         for (Map.Entry<String, List<CountVO>> entry : collect.entrySet()) {
             //获取设备信息
             WaterFacilityPO facilityPO = facilityPOList.stream().filter(x -> x.getFacilityCode().equals(entry.getKey())).findFirst().orElse(null);
+            //历史记录上报
+            DataUploadRecordPO recordPO=new DataUploadRecordPO();
+            recordPO.setFacilityId(facilityPO.getId());
+            recordPO.setFacilityName(facilityPO.getFacilityName());
+            String json = gson.toJson(entry.getValue());
+            recordPO.setColumnsValue(json);
+            recordPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
+            recordPO.setUploadDate(LocalDateTimeUtil.toDateString(LocalDateTimeUtil.nowTimeStamp()));
+            recordPO.setUploadTime(LocalDateTimeUtil.nowTimeStamp());
+            dataUploadRecordMapper.insert(recordPO);
             //获取设备所属监控点
             List<WaterFacilityParameterPO>parameterList=new ArrayList<>();
             //循环上报数据
@@ -318,39 +371,47 @@
                 switch (vo.getCode()){
                     case "QF1":
                         //1号空开吸合关闭报警
-                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue() );
+                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                         break;
                     case "QF2":
                         //2号空开吸合关闭报警
-                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue());
+                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                         break;
                     case "QF3":
                         //3号空开吸合关闭报警
-                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue());
+                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                         break;
                     case "QF4":
                         //4号空开吸合关闭报警
-                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue());
+                        QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                         break;
                     case "B001Fault":
                         //1号泵综合故障
-                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue());
+                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                         break;
                     case "B002Fault":
                         //2号泵综合故障
-                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue());
+                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                         break;
                     case "SQ":
                         //水侵报警
-                        SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue());
+                        SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                         break;
                     case "H1001Fault":
                         //高水位报警
-                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue());
+                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
                         break;
                     case "L1001Fault":
                         //低水位报警
-                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue());
+                        faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime);
+                        break;
+                    case "B001StartNumber":
+                        //1号泵启泵次数
+                        startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList);
+                        break;
+                    case "B002StartNumber":
+                        //2号泵启泵次数
+                        startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList);
                         break;
                 }
             }
@@ -359,7 +420,7 @@
         }
     }
 
-     public void QFCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value){
+     public void QFCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException {
         if(value.equals("false")){
                 alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode());
                 AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO();
@@ -370,11 +431,15 @@
                 alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
                 alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp());
                 alarmHistoryMapper.insert(alarmHistoryPO);
-
+                Map<String, Object> map = new HashMap<>();
+                map.put("alarm","true");
+                map.put("alarm_time",delayTime);
+                String dataValue = gson.toJson(map);
+                mqttServer.send("nanjing/alarm",dataValue);
 
         }
      }
-    public void faultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value){
+    public void faultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException {
         if(value.equals("true")){
             alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode());
             AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO();
@@ -385,11 +450,16 @@
             alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
             alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp());
             alarmHistoryMapper.insert(alarmHistoryPO);
+            Map<String, Object> map = new HashMap<>();
+            map.put("alarm","true");
+            map.put("alarm_time",delayTime);
+            String dataValue = gson.toJson(map);
+            mqttServer.send("nanjing/alarm",dataValue);
 
 
         }
     }
-    public void SQFaultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value){
+    public void SQFaultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException {
         if(value.equals("1")){
             alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode());
             AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO();
@@ -400,8 +470,34 @@
             alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
             alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp());
             alarmHistoryMapper.insert(alarmHistoryPO);
+            Map<String, Object> map = new HashMap<>();
+            map.put("alarm","true");
+            map.put("alarm_time",delayTime);
+            String dataValue = gson.toJson(map);
+            mqttServer.send("nanjing/alarm",dataValue);
 
+        }
+    }
 
+    public void startNumber(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value,List<FacilityParameterDataVO> dataVOList){
+        if(ListUtil.isNotNullOrEmpty(dataVOList)){
+            FacilityParameterDataVO facilityParameterDataVO = dataVOList.stream().filter(item -> item.getFacilityCode().equals(facilityPO.getFacilityCode()) && item.getColumnsCode().equals(parameterPO.getColumnsCode())).findFirst().orElse(null);
+            if(Objects.nonNull(facilityParameterDataVO)){
+                Integer columnsValue = Integer.valueOf(facilityParameterDataVO.getColumnValue());
+                Integer value1 = Integer.valueOf(value);
+                Integer dataNumber=value1-columnsValue;
+                if(dataNumber>5){
+                    alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode());
+                    AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO();
+                    alarmHistoryPO.setFacilityId(facilityPO.getId());
+                    alarmHistoryPO.setFacilityName(facilityPO.getFacilityName());
+                    alarmHistoryPO.setCode(parameterPO.getColumnsCode());
+                    alarmHistoryPO.setDescription(parameterPO.getColumnsShow()+"超过5次");
+                    alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
+                    alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp());
+                    alarmHistoryMapper.insert(alarmHistoryPO);
+                }
+            }
         }
     }
 }

--
Gitblit v1.9.3