From fb2f11d7d502ceacbe7fbed176bea4ab0f152f69 Mon Sep 17 00:00:00 2001
From: liulin <lin.liu@88.com>
Date: 星期四, 03 七月 2025 18:17:59 +0800
Subject: [PATCH] 添加mqtt

---
 src/main/java/com/lunhan/water/service/PaymentRecordsService.java |  284 ++++++++++++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 251 insertions(+), 33 deletions(-)

diff --git a/src/main/java/com/lunhan/water/service/PaymentRecordsService.java b/src/main/java/com/lunhan/water/service/PaymentRecordsService.java
index 366160b..e3bd5c6 100644
--- a/src/main/java/com/lunhan/water/service/PaymentRecordsService.java
+++ b/src/main/java/com/lunhan/water/service/PaymentRecordsService.java
@@ -1,38 +1,54 @@
 /**
-#                                                    __----~~~~~~~~~~~------___
-#                                   .  .   ~~//====......          __--~ ~~
-#                   -.            \_|//     |||\\  ~~~~~~::::... /~
-#                ___-==_       _-~o~  \/    |||  \\            _/~~-
-#        __---~~~.==~||\=_    -_--~/_-~|-   |\\   \\        _/~
-#    _-~~     .=~    |  \\-_    '-~7  /-   /  ||    \      /
-#  .~       .~       |   \\ -_    /  /-   /   ||      \   /
-# /  ____  /         |     \\ ~-_/  /|- _/   .||       \ /
-# |~~    ~~|--~~~~--_ \     ~==-/   | \~--===~~        .\
-#          '         ~-|      /|    |-~\~~       __--~~
-#                      |-~~-_/ |    |   ~\_   _-~            /\
-#                           /  \     \__   \/~                \__
-#                       _--~ _/ | .-~~____--~-/                  ~~==.
-#                      ((->/~   '.|||' -_|    ~~-/ ,              . _||
-#                                 -_     ~\      ~~---l__i__i__i--~~_/
-#                                 _-~-__   ~)  \--______________--~~
-#                               //.-~~~-~_--~- |-------~~~~~~~~
-#                                      //.-~~~--\
-#                  神兽保佑
-#                  永无BUG!
-*/
+ #                                                    __----~~~~~~~~~~~------___
+ #                                   .  .   ~~//====......          __--~ ~~
+ #                   -.            \_|//     |||\\  ~~~~~~::::... /~
+ #                ___-==_       _-~o~  \/    |||  \\            _/~~-
+ #        __---~~~.==~||\=_    -_--~/_-~|-   |\\   \\        _/~
+ #    _-~~     .=~    |  \\-_    '-~7  /-   /  ||    \      /
+ #  .~       .~       |   \\ -_    /  /-   /   ||      \   /
+ # /  ____  /         |     \\ ~-_/  /|- _/   .||       \ /
+ # |~~    ~~|--~~~~--_ \     ~==-/   | \~--===~~        .\
+ #          '         ~-|      /|    |-~\~~       __--~~
+ #                      |-~~-_/ |    |   ~\_   _-~            /\
+ #                           /  \     \__   \/~                \__
+ #                       _--~ _/ | .-~~____--~-/                  ~~==.
+ #                      ((->/~   '.|||' -_|    ~~-/ ,              . _||
+ #                                 -_     ~\      ~~---l__i__i__i--~~_/
+ #                                 _-~-__   ~)  \--______________--~~
+ #                               //.-~~~-~_--~- |-------~~~~~~~~
+ #                                      //.-~~~--\
+ #                  神兽保佑
+ #                  永无BUG!
+ */
 package com.lunhan.water.service;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.reflect.TypeToken;
 import com.lunhan.water.common.*;
 import com.lunhan.water.common.enums.*;
+import com.lunhan.water.common.jwt.LoginUserDTO;
 import com.lunhan.water.common.model.Tuple;
 import com.lunhan.water.common.util.*;
+import com.lunhan.water.entity.enums.EBillPayStatus;
+import com.lunhan.water.entity.enums.ECapitalChange;
+import com.lunhan.water.entity.request.paymentrecords.ReqBuyWater;
+import com.lunhan.water.host.mqtt.CountVO;
+import com.lunhan.water.host.mqtt.MQTTServer;
+import com.lunhan.water.repository.impl.*;
+import com.lunhan.water.repository.po.*;
+import com.lunhan.water.repository.vo.WaterFacilityVO;
 import org.apache.commons.lang3.BooleanUtils;
+import org.slf4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
 import java.util.*;
 import java.util.stream.Collectors;
-import com.lunhan.water.repository.impl.PaymentRecordsMapperImpl;
-import com.lunhan.water.repository.po.PaymentRecordsPO;
+
 import com.lunhan.water.entity.request.paymentrecords.ReqCreatePaymentRecords;
 import com.lunhan.water.entity.request.paymentrecords.ReqModifyPaymentRecords;
 import com.lunhan.water.entity.search.SearchPaymentRecords;
@@ -45,8 +61,22 @@
  */
 @Service
 public class PaymentRecordsService extends BaseService {
+    private static final Logger DEBUG_LOGGER = LoggerUtil.get(ELogger.DEBUG);
+    private static final Logger ERROR_LOGGER = LoggerUtil.get(ELogger.SYS_ERROR);
     @Autowired
     private PaymentRecordsMapperImpl mapper;
+    @Autowired
+    private UserLoginMapperImpl userLoginMapper;
+    @Autowired
+    private WaterFacilityMapperImpl waterFacilityMapper;
+    @Autowired
+    private MQTTServer mqttServer;
+    @Autowired
+    private UserCapitalChangeMapperImpl userCapitalChangeMapper;
+    @Autowired
+    private HeartbeatDataMapperImpl heartbeatDataMapper;
+    @Autowired
+    private FacilityAlarmRecordMapperImpl facilityAlarmRecordMapper;
 
     public ExecutedResult<Long> create(ReqCreatePaymentRecords request) {
         // 转换po
@@ -92,7 +122,7 @@
         return ExecutedResult.success(result);
     }
 
-//    public ExecutedResult<String> stop(Long id) {
+    //    public ExecutedResult<String> stop(Long id) {
 //        // 验证记录是否存在
 //        ExecutedResult<PaymentRecordsPO> checkExists = this.check4Id(id);
 //        if (checkExists.isFailed()) {
@@ -183,6 +213,163 @@
 //        return ExecutedResult.success();
 //    }
 
+    /**
+     * mqtt发送取水指令
+     * @param request 消息内容
+     */
+    public ExecutedResult<String> userWaterInTaking(LoginUserDTO loginUser, ReqBuyWater request) {
+        WaterFacilityPO facilityPO = waterFacilityMapper.getById(request.getFacilityId());
+        if (Objects.isNull(facilityPO)) {
+            return ExecutedResult.failed("未查询到取水设备!");
+        }
+        UserLoginPO userLoginPO = userLoginMapper.get4Openid(loginUser.getUserId());
+        if (Objects.isNull(userLoginPO)) {
+            return ExecutedResult.failed("用户数据不存在!");
+        }
+        if (userLoginPO.getBalance().compareTo(request.getAmount()) < 0) {
+            return ExecutedResult.failed("余额水量不足!");
+        }
+        //发送mqtt取水指令
+        String topic = "zundong/" + facilityPO.getFacilityCode() + "/switch";
+        JsonObject data = new JsonObject();
+        data.addProperty(facilityPO.getFacilityCode() + "_amount", request.getAmount().multiply(BigDecimal.valueOf(100)));
+        data.addProperty(facilityPO.getFacilityCode() + "_user", userLoginPO.getUserCode());
+        data.addProperty(facilityPO.getFacilityCode() + "_state", 1);
+        if (Objects.nonNull(mqttServer)) {
+            mqttServer.send(topic, data.toString());
+        } else {
+            try {
+                System.out.println("mqttServer bean尚未初始化...");
+                mqttServer = SpringUtil.getBean(MQTTServer.class);
+                System.out.println("mqttServer 初始化bean成功!");
+            } catch (Exception ignored) {
+            }
+            if (Objects.nonNull(mqttServer)) {
+                mqttServer.send(topic, data.toString());
+            }
+        }
+        return ExecutedResult.success("请点击设备取水按钮取水");
+    }
+
+    /**
+     * mqtt收到消息
+     * @param topic 主题
+     * @param msg 消息内容
+     */
+    public void mqttReceived(String topic, String msg) {
+        if (StringUtil.isNullOrEmpty(topic)) {
+            ERROR_LOGGER.error("主题不能为空");
+            return;
+        }
+        if (StringUtil.isNullOrEmpty(msg)) {
+            ERROR_LOGGER.error("消息不能为空");
+            return;
+        }
+        String[] array = StringUtil.split(topic, "/");
+        if (array.length != 3) {
+            ERROR_LOGGER.error("主题未能解析, " + topic);
+            return;
+        }
+        WaterFacilityPO facilityPO = waterFacilityMapper.getCode(array[1]);
+        if (Objects.isNull(facilityPO)) {
+            DEBUG_LOGGER.error("设备数据未找到!");
+            return;
+        }
+
+//        数据上报格式:
+//        主题格式:zundong/QS001/data
+//        数据格式:{"Data":
+//          [
+//              {"name":"QS001_state","value":1}, //状态
+//              {"name":"QS001_fault","value":0}, //是否故障
+//              {"name":"QS001_count","value":10000} //总水量
+//          ],"time":"2025-07-02 15:19:17"
+//        }
+//        心跳上报格式:
+//        主题格式:zundong/QS001/state
+//        数据格式:{"Data":
+//              [
+//               {"name":"QS001_heartbeat","value":1} //心跳
+//              ],"time":"2025-07-02 15:19:17"
+//          }
+        //消息解析
+        Gson gson = new Gson();
+        JsonObject object = gson.fromJson(msg, JsonObject.class);
+        String time = object.get("time").getAsString();
+        //long time = LocalDateTimeUtil.getTimeStamp(asJsonObject).getTime();
+        Type listType = new TypeToken<List<CountVO>>() {
+        }.getType();
+        JsonArray data = object.getAsJsonArray("Data");
+        List<CountVO> listVo = gson.fromJson(data, listType);
+        switch (array[2]) {
+            case "data":
+                uploadData(time,facilityPO,listVo);
+                break;
+            case "state":
+                uploadState(time,facilityPO,listVo);
+                break;
+        }
+    }
+
+    public void uploadData(String time,WaterFacilityPO facilityPO, List<CountVO> listVo) {
+        CountVO stateVo = listVo.stream().filter(x -> x.getName().equals(facilityPO.getFacilityCode() + "_state")).findFirst().orElse(null);
+        CountVO faultVo = listVo.stream().filter(x -> x.getName().equals(facilityPO.getFacilityCode() + "_fault")).findFirst().orElse(null);
+        CountVO countVO = listVo.stream().filter(x -> x.getName().equals(facilityPO.getFacilityCode() + "_count")).findFirst().orElse(null);
+        CountVO userVo = listVo.stream().filter(x -> x.getName().equals(facilityPO.getFacilityCode() + "_user")).findFirst().orElse(null);
+        switch (stateVo.getValue()) {
+            case "0"://取水完成
+                //扣除余额
+                UserLoginPO user = userLoginMapper.get4UserCode(userVo.getValue());
+               //创建消费记录
+                PaymentRecordsPO recordsPO=new PaymentRecordsPO();
+                recordsPO.setUserId(user.getId());
+                recordsPO.setUserName(user.getUserName());
+                recordsPO.setFacilityCode(facilityPO.getFacilityCode());
+                BigDecimal bigDecimal = new BigDecimal(countVO.getValue()).divide(new BigDecimal(100)).setScale(2, BigDecimal.ROUND_DOWN);
+                recordsPO.setPaymentAmount(bigDecimal);
+                recordsPO.setWaterAmount(bigDecimal);
+                recordsPO.setPayTime(time);
+                recordsPO.setPayStatus(EBillPayStatus.PAID.getValue());
+                recordsPO.setComment("取水");
+                recordsPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
+                mapper.insert(recordsPO);
+                //修改余额
+                BigDecimal beforeMoney=user.getBalance();
+                BigDecimal subtract = beforeMoney.subtract(bigDecimal);
+                user.setBalance(subtract);
+                userLoginMapper.updateById(user);
+                //生成余额变动记录
+                UserCapitalChangePO changePO=new UserCapitalChangePO();
+                changePO.setUserId(user.getId());
+                changePO.setBusiness(ECapitalChange.WATER_BILL_COUNTER_PAY.getValue());
+                changePO.setBusinessName("取水");
+                changePO.setBusinessCode("QS"+SnowFlakeUtil.getId());
+                changePO.setChangeMoney(bigDecimal);
+                changePO.setBeforeMoney(beforeMoney);
+                changePO.setAfterMoney(subtract);
+                changePO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
+                userCapitalChangeMapper.insert(changePO);
+                break;
+            case "1"://运行
+                break;
+            case "2"://停止
+                break;
+            case "3"://故障
+                break;
+        }
+    }
+    public void uploadState(String time,WaterFacilityPO facilityPO, List<CountVO> listVo){
+        CountVO heartbeatVO = listVo.stream().filter(x -> x.getName().equals(facilityPO.getFacilityCode() + "_heartbeat")).findFirst().orElse(null);
+        HeartbeatDataPO heartbeatDataPO = heartbeatDataMapper.getByKey(facilityPO.getFacilityCode());
+        if(Objects.isNull(heartbeatDataPO)){
+            HeartbeatDataPO dataPO=new HeartbeatDataPO();
+            dataPO.setDataKey(facilityPO.getFacilityCode());
+            dataPO.setDataValue(Integer.valueOf(heartbeatVO.getValue()));
+            dataPO.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
+            dataPO.setRemark(LocalDateTimeUtil.nowDateTimeStr());
+            heartbeatDataMapper.insert(dataPO);
+        }
+    }
     public ExecutedResult<List<PaymentRecordsVO>> getList(List<Long> listId) {
         List<PaymentRecordsVO> result = new ArrayList<>();
 
@@ -196,21 +383,50 @@
 
     public ExecutedResult<PagerResult<PaymentRecordsVO>> search(SearchPaymentRecords search) {
         // 处理创建时间范围-查询参数
-        Tuple<String, String> createTimeRange = ParameterUtil.getTimeRange(search.getCreateTimeRange());
-        if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem1())) {
-            search.setCreateTimeStart(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem1()).getTime());
+        if (search.getDateType() != null && search.getDateType() > 0) {
+            Long beginTime = 0L;
+            Long endTime = 0L;
+            switch (search.getDateType()) {
+                case 1:
+                    beginTime = LocalDateTimeUtil.getTimeStamp(LocalDateTimeUtils.todayStartTime()).getTime();
+                    endTime = LocalDateTimeUtil.getTimeStamp(LocalDateTimeUtils.todayEndTime()).getTime();
+                    break;
+                case 2:
+                    beginTime = LocalDateTimeUtil.getTimeStamp(LocalDateTimeUtils.weekStartTime()).getTime();
+                    endTime = LocalDateTimeUtil.getTimeStamp(LocalDateTimeUtils.weekEndTime()).getTime();
+                    break;
+                case 3:
+                    beginTime = LocalDateTimeUtil.getTimeStamp(LocalDateTimeUtils.monthStartTime()).getTime();
+                    endTime = LocalDateTimeUtil.getTimeStamp(LocalDateTimeUtils.monthEndTime()).getTime();
+                    break;
+                case 4:
+                    beginTime = LocalDateTimeUtil.getTimeStamp(LocalDateTimeUtils.yearStartTime()).getTime();
+                    endTime = LocalDateTimeUtil.getTimeStamp(LocalDateTimeUtils.yearEndTime()).getTime();
+                    break;
+            }
+            if (beginTime > 0) {
+                search.setCreateTimeStart(beginTime);
+            }
+            if (endTime > 0) {
+                search.setCreateTimeEnd(endTime);
+            }
         }
-        if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem2())) {
-            search.setCreateTimeEnd(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem2()).getTime());
-        }
-
         PagerResult<PaymentRecordsPO> pageList = mapper.search(search);
         List<PaymentRecordsVO> listVo = new ArrayList<>();
         List<PaymentRecordsPO> list = pageList.getList();
         if (ListUtil.isNotNullOrEmpty(list)) {
             pageList.setLastId(list.get(list.size() - 1).getId());
             // 转换vo
-            listVo = PaymentRecordsConvert.INSTANCE.toVo(list);
+            listVo = CopierUtil.mapTo(list, PaymentRecordsVO.class);
+            //获取列表所有用户id
+            List<Long> idList = listVo.stream().map(PaymentRecordsVO::getUserId).distinct().collect(Collectors.toList());
+            List<UserLoginPO> userList = userLoginMapper.getListById(idList);
+            for (PaymentRecordsVO vo : listVo) {
+                UserLoginPO userLoginPO = userList.stream().filter(x -> x.getId().equals(vo.getUserId())).findFirst().orElse(null);
+                if (Objects.nonNull(userLoginPO)) {
+                    vo.setBalance(userLoginPO.getBalance());
+                }
+            }
         }
         PagerResult<PaymentRecordsVO> result = new PagerResult<>(pageList.getLimit(), pageList.getPage(), pageList.getTotal(), listVo);
         result.setLastId(pageList.getLastId());
@@ -224,6 +440,7 @@
         }
         return ExecutedResult.success(exists);
     }
+
     protected ExecutedResult<List<PaymentRecordsPO>> check4Id(List<Long> listId) {
         // 从数据库查找null
         List<PaymentRecordsPO> list = mapper.getList(listId);
@@ -241,4 +458,5 @@
             }
         }
         return ExecutedResult.success(list);
-    }}
+    }
+}

--
Gitblit v1.9.3