liulin
2025-06-19 4291b79617cace8d7c3628dd8deb23eeb2747a72
添加mqtt服务
已添加6个文件
已修改6个文件
542 ■■■■■ 文件已修改
pom.xml 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fengdu/gas/common/config/MqttConfig.java 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fengdu/gas/host/controller/admin/AdminUserController.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fengdu/gas/host/mqtt/MQTTConnect.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fengdu/gas/host/mqtt/MQTTServer.java 95 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fengdu/gas/host/mqtt/MQTTStart.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fengdu/gas/host/mqtt/MQTTSubsribe.java 101 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fengdu/gas/host/mqtt/PushCallback.java 84 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fengdu/gas/service/AdminService.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-dev.yml 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-prod.yml 27 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-test.yml 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pom.xml
@@ -89,6 +89,12 @@
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <!-- mqtt 依赖 -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <!-- mybatis-plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
@@ -236,7 +242,12 @@
            <artifactId>spring-boot-starter-quartz</artifactId>
            <version>2.0.3.RELEASE</version>
        </dependency>
        <!--生成验证码工具-->
        <dependency>
            <groupId>com.github.whvcse</groupId>
            <artifactId>easy-captcha</artifactId>
            <version>1.6.2</version>
        </dependency>
        <!--************************************************** 单元测试相关 **************************************************-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
src/main/java/com/fengdu/gas/common/config/MqttConfig.java
对比新文件
@@ -0,0 +1,61 @@
package com.fengdu.gas.common.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
@Order(10)
public class MqttConfig {
    /**
     * 是否启用
     */
    private Boolean enable;
    /**
     * 主机
     */
    private String host;
    /**
     * 端口
     */
    private Integer port;
    /**
     * 用户名
     */
    private String user;
    /**
     * 密码
     */
    private String password;
    /**
     * 订阅主题
     */
    private String topic;
    /**
     * 订阅消息的客户端id
     */
    private String clientId;
    /**
     * 连接超时时间
     */
    private Integer connectionTimeout;
    /**
     * 心跳
     */
    private Integer keepAliveInterval;
    /**
     * 发送消息的客户端id
     */
    private String serverClientId;
}
src/main/java/com/fengdu/gas/host/controller/admin/AdminUserController.java
@@ -4,6 +4,7 @@
import com.fengdu.gas.common.ExecutedResult;
import com.fengdu.gas.common.PagerResult;
import com.fengdu.gas.common.jwt.LoginUserDTO;
import com.fengdu.gas.common.util.IPUtils;
import com.fengdu.gas.common.util.ParameterUtil;
import com.fengdu.gas.common.validator.ParameterValidateResult;
import com.fengdu.gas.common.validator.ParameterValidator;
@@ -25,6 +26,8 @@
import com.fengdu.gas.repository.vo.AdminUserVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
/**
 * 40.管理员用户相关接口
@@ -158,7 +161,31 @@
        //#endregion
        return service.adminLogin(request);
    }
    /**
     * 获取登录设备ip地址
     *
     * @param request 获取登录设备ip地址
     * @author lin.liu
     * @date 2023/02/17
     */
    @GetMapping("/getIpAddress")
    @NonLogin
    public ExecutedResult<String> getIpAddress(HttpServletRequest request) {
        return ExecutedResult.success(IPUtils.getIpAddress(request));
    }
    /**
     * 保存前端登录验证码
     *
     * @param uuid 请求参数
     * @author lin.liu
     * @date 2023/02/17
     */
    @GetMapping("/generate")
    @NonLogin
    public ExecutedResult<String> generate(@RequestParam String uuid) {
        //#endregion
        return  service.generate(uuid);
    }
    /**
     * 获取管理员权限列表
src/main/java/com/fengdu/gas/host/mqtt/MQTTConnect.java
对比新文件
@@ -0,0 +1,37 @@
package com.fengdu.gas.host.mqtt;
import com.fengdu.gas.common.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Component
@Order(20)
public class MQTTConnect {
    @Autowired
    private MqttConfig mqttConfig;
    //生成配置对象,用户名,密码等
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(mqttConfig.getUser());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        options.setConnectionTimeout(mqttConfig.getConnectionTimeout());
        //设置心跳
        options.setKeepAliveInterval(mqttConfig.getKeepAliveInterval());
        return options;
    }
    public MqttConnectOptions getOptions(MqttConnectOptions options) {
        options.setCleanSession(false);
        options.setUserName(mqttConfig.getUser());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        options.setConnectionTimeout(mqttConfig.getConnectionTimeout());
        //设置心跳
        options.setKeepAliveInterval(mqttConfig.getKeepAliveInterval());
        return options;
    }
}
src/main/java/com/fengdu/gas/host/mqtt/MQTTServer.java
对比新文件
@@ -0,0 +1,95 @@
package com.fengdu.gas.host.mqtt;
import com.fengdu.gas.common.config.MqttConfig;
import com.fengdu.gas.common.enums.ELogger;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
 * 发布端
 * Title:Server
 * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
 */
@Component
@Order(7)
public class MQTTServer implements InitializingBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(ELogger.DEBUG.getLogFileName());
    @Autowired
    private MqttConfig mqttConfig;
    @Autowired
    private MQTTConnect mqttConnect;
    static MqttClient client;
    private void connect() throws MqttException {
        //防止重复创建MQTTClient实例
        if (client == null) {
            //就是这里的clientId,服务器用来区分用户的,不能重复
            String host = "tcp://" + mqttConfig.getHost() + ":" + mqttConfig.getPort();
            client = new MqttClient(host, mqttConfig.getServerClientId(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
            //client.setCallback(new PushCallback());
        }
        MqttConnectOptions options = mqttConnect.getOptions();
        //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
        if (!client.isConnected()) {
            client.connect(options);
            LOGGER.info("---------------------mqtt连接成功");
        } else {//这里的逻辑是如果连接成功就重新连接
            client.disconnect();
            client.connect(mqttConnect.getOptions(options));
            LOGGER.info("---------------------mqtt连接成功");
        }
    }
    public boolean publish(MqttTopic topic , MqttMessage message) throws MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        LOGGER.debug("消息发送成功! " + token.isComplete());
        return token.isComplete();
    }
    /**
     * mqtt发送消息
     * @param topic 发布消息的主题
     * @param data 消息内容
     */
    public boolean send(String topic, String data) throws MqttException {
        MqttConnectOptions options = mqttConnect.getOptions();
        try {
            client.connect(mqttConnect.getOptions(options));
        } catch (Exception e) {}
        MqttTopic mqttTopic = client.getTopic(topic);
        MqttMessage message = new MqttMessage();
        //消息等级
        //level 0:最多一次的传输
        //level 1:至少一次的传输,(鸡肋)
        //level 2: 只有一次的传输
        message.setQos(0);
        //如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
        message.setRetained(false);
        message.setPayload(data.getBytes());
        return this.publish(mqttTopic, message);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        // MemoryPersistence设置clientid的保存形式,默认为以内存保存
//        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        if (Objects.equals(mqttConfig.getEnable(), Boolean.FALSE)) {
            return;
        }
        this.connect();
    }
}
src/main/java/com/fengdu/gas/host/mqtt/MQTTStart.java
对比新文件
@@ -0,0 +1,27 @@
package com.fengdu.gas.host.mqtt;
import com.fengdu.gas.common.config.MqttConfig;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Component
@Order(40)
public class MQTTStart implements InitializingBean {
    @Autowired
    private MQTTSubsribe mqttSubsribe;
    @Autowired
    private MqttConfig mqttConfig;
    @Override
    public void afterPropertiesSet() throws Exception {
        if (Objects.equals(mqttConfig.getEnable(), Boolean.FALSE)) {
            return;
        }
        mqttSubsribe.init();
    }
}
src/main/java/com/fengdu/gas/host/mqtt/MQTTSubsribe.java
对比新文件
@@ -0,0 +1,101 @@
package com.fengdu.gas.host.mqtt;
import com.fengdu.gas.common.config.MqttConfig;
import com.fengdu.gas.common.enums.ELogger;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
 * 订阅端
 */
@Component
@Order(30)
public class MQTTSubsribe {
    private static final Logger LOGGER = LoggerFactory.getLogger(ELogger.DEBUG.getLogFileName());
    @Autowired
    private MqttConfig mqttConfig;
    @Autowired
    private MQTTConnect mqttConnect;
    /**
     * 测试和正式环境不要使用同样的clientId 和主题
     * 如果和正式环境一样,正式环境启动后,本地再次启动会频繁断开重连,订阅的主题一样的话,测试的数据正式环境也会消费这些数据
     */
      //private static final String clientid = "测试clients";//测试
//    private String topic = "lunhan";
    public MqttClient client;
    //方法实现说明 断线重连方法,如果是持久订阅,重连是不需要再次订阅,如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
    // true为非持久订阅
    public void connect() throws MqttException {
        //防止重复创建MQTTClient实例
        if (client == null) {
            //就是这里的clientId,服务器用来区分用户的,不能重复,clientId不能和发布的clientId一样,否则会出现频繁断开连接和重连的问题
            //不仅不能和发布的clientId一样,而且也不能和其他订阅的clientId一样,如果想要接收之前的离线数据,这就需要将client的 setCleanSession
            // 设置为false,这样服务器才能保留它的session,再次建立连接的时候,它就会继续使用这个session了。 这时此连接clientId 是不能更改的。
            //但是其实还有一个问题,就是使用热部署的时候还是会出现频繁断开连接和重连的问题,可能是因为刚启动时的连接没断开,然后热部署的时候又进行了重连,重启一下就可以了
            //+ System.currentTimeMillis()
            String host = "tcp://" + mqttConfig.getHost() + ":" + mqttConfig.getPort();
            client = new MqttClient(host, mqttConfig.getClientId(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
            //如果是订阅者则添加回调类,发布不需要
            client.setCallback(new PushCallback(MQTTSubsribe.this));
//            client.setCallback(new PushCallback());
        }
        MqttConnectOptions options = mqttConnect.getOptions();
        //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
        if (!client.isConnected()) {
            client.connect(options);
            LOGGER.info("----------mqtt连接成功");
        } else {//这里的逻辑是如果连接成功就重新连接
            client.disconnect();
            client.connect(mqttConnect.getOptions(options));
            LOGGER.info("----------mqtt连接成功");
        }
    }
    public void init() {
        try {
            this.connect();
            LOGGER.info("----------mqtt执行");
            this.subscribe(mqttConfig.getTopic());
            LOGGER.info("----------mqtt执行订阅");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 订阅某个主题,qos默认为0
     *
     * @param topic .
     */
    public void subscribe(String topic) {
        subscribe(topic,2);
    }
    /**
     * 订阅某个主题
     *
     * @param topic .
     * @param qos .
     */
    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic,qos);
            //MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
//            client.unsubscribe("需要解除订阅关系的主题");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/fengdu/gas/host/mqtt/PushCallback.java
对比新文件
@@ -0,0 +1,84 @@
package com.fengdu.gas.host.mqtt;
import com.fengdu.gas.common.enums.ELogger;
import com.fengdu.gas.common.util.LoggerUtil;
import com.fengdu.gas.common.util.ThreadPoolUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import java.nio.charset.StandardCharsets;
;
/**
 * 发布消息的回调类
 * <p>
 * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
 * 在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 必须在回调类中实现三个方法:
 * <p>
 * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
 * <p>
 * public void connectionLost(Throwable cause)在断开连接时调用。
 * <p>
 * public void deliveryComplete(MqttDeliveryToken token))
 * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
 * 由 MqttClient.connect 激活此回调。
 */
public class PushCallback implements MqttCallback {
    private static final Logger LOGGER_DEBUG = LoggerUtil.get(ELogger.DEBUG);
    private static final Logger LOGGER_ERROR = LoggerUtil.get(ELogger.SYS_ERROR);
    private MQTTSubsribe mqttSubsribe;
    public PushCallback(MQTTSubsribe mqttSubsribe) {
        this.mqttSubsribe = mqttSubsribe;
    }
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        LOGGER_DEBUG.info("---------------------mqtt连接断开");
        while (true) {
            try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
                mqttSubsribe.init();
                LOGGER_DEBUG.info("---------------------mqtt重连成功");
                break;
            } catch (Exception e) {
                LOGGER_ERROR.error("mqtt连接丢失", e);
                continue;
            } finally {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // 重新设置中断状态
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    public void deliveryComplete(IMqttDeliveryToken token) {
        LOGGER_DEBUG.info("deliveryComplete---------" + token.isComplete());
    }
    public void messageArrived(String topic, MqttMessage message) {
        // subscribe后得到的消息会执行到这里面
        String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
        LOGGER_DEBUG.info("收到mqtt消息,主题: " + topic + ", Qos: " + message.getQos() + ", 消息内容: " + msg);
        //这里可以针对收到的消息做处理
        ThreadPoolUtil.getDefaultPool().execute(() -> {
            try {
                //调用方法
                //dataUploadYwjRecordService.mqttReceived(topic, msg);
            } catch (Exception e) {
                LOGGER_ERROR.error("messageArrived", e);
            }
        });
    }
}
src/main/java/com/fengdu/gas/service/AdminService.java
@@ -28,11 +28,14 @@
import com.fengdu.gas.repository.po.*;
import com.fengdu.gas.repository.vo.AdminRoleVO;
import com.fengdu.gas.repository.vo.AdminUserVO;
import com.wf.captcha.SpecCaptcha;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
@@ -63,7 +66,8 @@
    private AdminPermissionService permissionService;
    @Autowired
    private AdminPowerService adminPowerService;
    @Autowired
    private StringRedisTemplate redisTemplate;
    // 添加后台管理员
@@ -159,7 +163,22 @@
        return ExecutedResult.success(result);
    }
    /**
     * 生成验证码图片并返回其 Base64 编码字符串
     *
     * @param uuid 用于标识验证码的唯一标识符
     * @return 包含验证码图片 Base64 编码的响应结果
     */
    public ExecutedResult<String> generate(String uuid) {
        SpecCaptcha specCaptcha = new SpecCaptcha(100, 30, 4);
        // captcha.setCharType(Captcha.TYPE_DEFAULT);
        String code = specCaptcha.text().toLowerCase();
        // 缓存验证码
        redisTemplate.opsForValue().set(uuid, code);
        // 设置验证码3分钟后过期
        redisTemplate.expire(uuid, 3, TimeUnit.MINUTES);
        return ExecutedResult.success(specCaptcha.toBase64());
    }
    // 获取管理员权限列表
    public ExecutedResult<ResAdminPower> getPower(Long adminId) {
        ResAdminPower result = new ResAdminPower();
src/main/resources/application-dev.yml
@@ -66,6 +66,9 @@
  keepAliveInterval: 20
  # 发送消息的客户端id
  serverClientId:  fengdu-gases-service-publish-dev
# quartz定时任务配置
quartz:
  # 是否启用
src/main/resources/application-prod.yml
@@ -1,12 +1,11 @@
server:
  port: 8088
  port: 8040
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
  datasource:
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://113.250.189.120:57654/test_db1
    url: jdbc:postgresql://113.250.189.120:57654/fengdu_gases_system
    username: lunhan
    password: lunhan.20240330
    druid:
@@ -36,7 +35,27 @@
    root: info
    com.fengdu.gas.host: debug
mqtt:
  # 是否启用
  enable: true
  host: 113.250.189.120
  port: 1883
  user: admin
  password: public
  # 订阅主题
  topic: fengdu/#
  # 订阅消息的客户端id
  clientId: fengdu-gases-service-dev
  # 连接超时时间
  connectionTimeout: 10
  # 心跳
  keepAliveInterval: 20
  # 发送消息的客户端id
  serverClientId:  fengdu-gases-service-publish-dev
# quartz定时任务配置
quartz:
  # 是否启用
  enable: true
  enable: false
src/main/resources/application-test.yml
@@ -1,12 +1,12 @@
server:
  port: 8088
  port: 8040
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
  datasource:
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://113.250.189.120:57654/test_db1
    url: jdbc:postgresql://113.250.189.120:57654/fengdu_gases_system
    username: lunhan
    password: lunhan.20240330
    druid:
@@ -34,10 +34,42 @@
    path: ./logs
  level:
    root: info
    com.fengdu.gas.host: DEBUG
    com.fengdu.gas.service: DEBUG
    com.fengdu.gas: info
file:
  uploadBasicPath: ./logs/upload/
  fileBasicUrl: http://localhost/file
  disturbStr: com.fengdu.gas
# mybatis-plus 配置:
#mybatis-plus:
#  configuration:
#    ### 开启打印sql配置
#    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
mqtt:
  # 是否启用
  enable: false
  host: 113.250.189.120
  port: 1885
  user: admin
  password: public
  # 订阅主题
  topic: fengdu/#
  # 订阅消息的客户端id
  clientId: fengdu-gases-service-dev
  # 连接超时时间
  connectionTimeout: 10
  # 心跳
  keepAliveInterval: 20
  # 发送消息的客户端id
  serverClientId:  fengdu-gases-service-publish-dev
# quartz定时任务配置
quartz:
  # 是否启用
  enable: true
  enable: false