| | |
| | | package com.nanjing.water.service; |
| | | |
| | | import com.baomidou.mybatisplus.core.toolkit.Sequence; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.google.gson.Gson; |
| | | import com.google.gson.JsonArray; |
| | | import com.google.gson.JsonObject; |
| | |
| | | import com.nanjing.water.common.model.Tuple; |
| | | import com.nanjing.water.common.util.*; |
| | | import com.nanjing.water.host.mqtt.CountVO; |
| | | import com.nanjing.water.host.mqtt.MQTTServer; |
| | | import com.nanjing.water.repository.impl.*; |
| | | import com.nanjing.water.repository.po.*; |
| | | import com.nanjing.water.repository.vo.FacilityParameterDataVO; |
| | | import org.apache.commons.lang3.BooleanUtils; |
| | | import org.eclipse.paho.client.mqttv3.MqttException; |
| | | import org.slf4j.Logger; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.data.redis.core.StringRedisTemplate; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import java.lang.reflect.Type; |
| | | import java.math.BigDecimal; |
| | | import java.math.RoundingMode; |
| | | import java.util.*; |
| | | import java.util.regex.Matcher; |
| | | import java.util.regex.Pattern; |
| | |
| | | private WaterMonitoryPointMapperImpl waterMonitoryPointMapper; |
| | | @Autowired |
| | | private AlarmHistoryMapperImpl alarmHistoryMapper; |
| | | @Autowired |
| | | private HeartbeatDataMapperImpl heartbeatDataMapper; |
| | | @Autowired |
| | | private DataUploadRecordMapperImpl dataUploadRecordMapper; |
| | | @Autowired |
| | | private MQTTServer mqttServer; |
| | | @Autowired |
| | | private StringRedisTemplate redisTemplate; |
| | | @Autowired |
| | | private ObjectMapper objectMapper; |
| | | |
| | | public ExecutedResult<Long> create(ReqCreateDataUploadRecord request) { |
| | | // 转换po |
| | |
| | | * @param topic 主题 |
| | | * @param msg 消息内容 |
| | | */ |
| | | public void mqttReceived(String topic, String msg) { |
| | | public void mqttReceived(String topic, String msg) throws MqttException { |
| | | String action = "mqtt收到消息"; |
| | | if (StringUtil.isNullOrEmpty(topic)) { |
| | | ERROR_LOGGER.error(action, "主题不能为空"); |
| | |
| | | ERROR_LOGGER.error(action, "消息不能为空"); |
| | | return; |
| | | } |
| | | //处理心跳 |
| | | Random random=new Random(); |
| | | int i=random.nextInt(100); |
| | | BigDecimal delayTime=BigDecimal.valueOf(1.0); |
| | | List<HeartbeatDataPO> list = heartbeatDataMapper.getList(); |
| | | if(Objects.nonNull(list)){ |
| | | HeartbeatDataPO heartbeatUp = list.stream().filter(item -> item.getDataKey().equals("heartbeat_up")).findFirst().orElse(null); |
| | | if(Objects.nonNull(heartbeatUp)){ |
| | | heartbeatUp.setDataValue(i); |
| | | heartbeatDataMapper.updateById(heartbeatUp); |
| | | } |
| | | HeartbeatDataPO alarmTime = list.stream().filter(item -> item.getDataKey().equals("alarm_time")).findFirst().orElse(null); |
| | | if(Objects.nonNull(alarmTime)){ |
| | | delayTime=new BigDecimal(alarmTime.getDataValue()).setScale(1, RoundingMode.DOWN); |
| | | } |
| | | } |
| | | //获取延时报警时间 |
| | | Gson gson = new Gson(); |
| | | JsonObject object = gson.fromJson(msg, JsonObject.class); |
| | | String asJsonObject = object.get("time").getAsString(); |
| | |
| | | vo.setName(split[0]); |
| | | vo.setCode(split[1]); |
| | | } |
| | | |
| | | List<WaterFacilityPO> facilityPOList = waterFacilityMapper.getList(); |
| | | List<WaterFacilityParameterPO> parameterPOList = waterFacilityParameterMapper.getList(); |
| | | //获取存入redis每天开始的启泵次数 |
| | | List<FacilityParameterDataVO> dataVOList=new ArrayList<>(); |
| | | String objects = redisTemplate.opsForValue().get("pump_start_number"); |
| | | if(StringUtil.isNotNullOrEmpty(objects)){ |
| | | dataVOList= SerializeUtil.toListObject(objects, FacilityParameterDataVO.class); |
| | | } |
| | | |
| | | // List<AlarmHistoryPO> historyPOList = alarmHistoryMapper.getList(); |
| | | //根据设备编号分组 |
| | | Map<String, List<CountVO>> collect = listVo.stream().collect(Collectors.groupingBy(CountVO::getName)); |
| | | for (Map.Entry<String, List<CountVO>> entry : collect.entrySet()) { |
| | | //获取设备信息 |
| | | WaterFacilityPO facilityPO = facilityPOList.stream().filter(x -> x.getFacilityCode().equals(entry.getKey())).findFirst().orElse(null); |
| | | //历史记录上报 |
| | | DataUploadRecordPO recordPO=new DataUploadRecordPO(); |
| | | recordPO.setFacilityId(facilityPO.getId()); |
| | | recordPO.setFacilityName(facilityPO.getFacilityName()); |
| | | String json = gson.toJson(entry.getValue()); |
| | | recordPO.setColumnsValue(json); |
| | | recordPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | recordPO.setUploadDate(LocalDateTimeUtil.toDateString(LocalDateTimeUtil.nowTimeStamp())); |
| | | recordPO.setUploadTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | dataUploadRecordMapper.insert(recordPO); |
| | | //获取设备所属监控点 |
| | | List<WaterFacilityParameterPO>parameterList=new ArrayList<>(); |
| | | //循环上报数据 |
| | |
| | | switch (vo.getCode()){ |
| | | case "QF1": |
| | | //1号空开吸合关闭报警 |
| | | QFCreateAlarm(facilityPO,parameterPO1, vo.getValue() ); |
| | | QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); |
| | | break; |
| | | case "QF2": |
| | | //2号空开吸合关闭报警 |
| | | QFCreateAlarm(facilityPO,parameterPO1, vo.getValue()); |
| | | QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); |
| | | break; |
| | | case "QF3": |
| | | //3号空开吸合关闭报警 |
| | | QFCreateAlarm(facilityPO,parameterPO1, vo.getValue()); |
| | | QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); |
| | | break; |
| | | case "QF4": |
| | | //4号空开吸合关闭报警 |
| | | QFCreateAlarm(facilityPO,parameterPO1, vo.getValue()); |
| | | QFCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); |
| | | break; |
| | | case "B001Fault": |
| | | //1号泵综合故障 |
| | | faultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); |
| | | faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); |
| | | break; |
| | | case "B002Fault": |
| | | //2号泵综合故障 |
| | | faultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); |
| | | faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); |
| | | break; |
| | | case "SQ": |
| | | //水侵报警 |
| | | SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); |
| | | SQFaultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); |
| | | break; |
| | | case "H1001Fault": |
| | | //高水位报警 |
| | | faultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); |
| | | faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); |
| | | break; |
| | | case "L1001Fault": |
| | | //低水位报警 |
| | | faultCreateAlarm(facilityPO,parameterPO1, vo.getValue()); |
| | | faultCreateAlarm(facilityPO,parameterPO1, vo.getValue(),gson,delayTime); |
| | | break; |
| | | case "B001StartNumber": |
| | | //1号泵启泵次数 |
| | | startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList); |
| | | break; |
| | | case "B002StartNumber": |
| | | //2号泵启泵次数 |
| | | startNumber(facilityPO,parameterPO1, vo.getValue(),dataVOList); |
| | | break; |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | public void QFCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value){ |
| | | public void QFCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException { |
| | | if(value.equals("false")){ |
| | | alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); |
| | | AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO(); |
| | |
| | | alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | alarmHistoryMapper.insert(alarmHistoryPO); |
| | | |
| | | Map<String, Object> map = new HashMap<>(); |
| | | map.put("alarm","true"); |
| | | map.put("alarm_time",delayTime); |
| | | String dataValue = gson.toJson(map); |
| | | mqttServer.send("nanjing/alarm",dataValue); |
| | | |
| | | } |
| | | } |
| | | public void faultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value){ |
| | | public void faultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException { |
| | | if(value.equals("true")){ |
| | | alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); |
| | | AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO(); |
| | |
| | | alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | alarmHistoryMapper.insert(alarmHistoryPO); |
| | | Map<String, Object> map = new HashMap<>(); |
| | | map.put("alarm","true"); |
| | | map.put("alarm_time",delayTime); |
| | | String dataValue = gson.toJson(map); |
| | | mqttServer.send("nanjing/alarm",dataValue); |
| | | |
| | | |
| | | } |
| | | } |
| | | public void SQFaultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value){ |
| | | public void SQFaultCreateAlarm(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value, Gson gson,BigDecimal delayTime) throws MqttException { |
| | | if(value.equals("1")){ |
| | | alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); |
| | | AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO(); |
| | |
| | | alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | alarmHistoryMapper.insert(alarmHistoryPO); |
| | | Map<String, Object> map = new HashMap<>(); |
| | | map.put("alarm","true"); |
| | | map.put("alarm_time",delayTime); |
| | | String dataValue = gson.toJson(map); |
| | | mqttServer.send("nanjing/alarm",dataValue); |
| | | |
| | | } |
| | | } |
| | | |
| | | public void startNumber(WaterFacilityPO facilityPO,WaterFacilityParameterPO parameterPO,String value,List<FacilityParameterDataVO> dataVOList){ |
| | | if(ListUtil.isNotNullOrEmpty(dataVOList)){ |
| | | FacilityParameterDataVO facilityParameterDataVO = dataVOList.stream().filter(item -> item.getFacilityCode().equals(facilityPO.getFacilityCode()) && item.getColumnsCode().equals(parameterPO.getColumnsCode())).findFirst().orElse(null); |
| | | if(Objects.nonNull(facilityParameterDataVO)){ |
| | | Integer columnsValue = Integer.valueOf(facilityParameterDataVO.getColumnValue()); |
| | | Integer value1 = Integer.valueOf(value); |
| | | Integer dataNumber=value1-columnsValue; |
| | | alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); |
| | | if(dataNumber>5){ |
| | | alarmHistoryMapper.deleteList(parameterPO.getFacilityId(),parameterPO.getColumnsCode()); |
| | | AlarmHistoryPO alarmHistoryPO=new AlarmHistoryPO(); |
| | | alarmHistoryPO.setFacilityId(facilityPO.getId()); |
| | | alarmHistoryPO.setFacilityName(facilityPO.getFacilityName()); |
| | | alarmHistoryPO.setCode(parameterPO.getColumnsCode()); |
| | | alarmHistoryPO.setDescription(parameterPO.getColumnsShow()+"超过5次"); |
| | | alarmHistoryPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | alarmHistoryPO.setLastTime(LocalDateTimeUtil.nowTimeStamp()); |
| | | alarmHistoryMapper.insert(alarmHistoryPO); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |