pom.xml
@@ -89,6 +89,12 @@ <artifactId>druid-spring-boot-starter</artifactId> <version>${druid.version}</version> </dependency> <!-- mqtt 依赖 --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> <!-- mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> @@ -236,7 +242,12 @@ <artifactId>spring-boot-starter-quartz</artifactId> <version>2.0.3.RELEASE</version> </dependency> <!--生成验证码工具--> <dependency> <groupId>com.github.whvcse</groupId> <artifactId>easy-captcha</artifactId> <version>1.6.2</version> </dependency> <!--************************************************** 单元测试相关 **************************************************--> <dependency> <groupId>org.springframework.boot</groupId> src/main/java/com/fengdu/gas/common/config/MqttConfig.java
对比新文件 @@ -0,0 +1,61 @@ package com.fengdu.gas.common.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; @Data @Component @ConfigurationProperties(prefix = "mqtt") @Order(10) public class MqttConfig { /** * 是否启用 */ private Boolean enable; /** * 主机 */ private String host; /** * 端口 */ private Integer port; /** * 用户名 */ private String user; /** * 密码 */ private String password; /** * 订阅主题 */ private String topic; /** * 订阅消息的客户端id */ private String clientId; /** * 连接超时时间 */ private Integer connectionTimeout; /** * 心跳 */ private Integer keepAliveInterval; /** * 发送消息的客户端id */ private String serverClientId; } src/main/java/com/fengdu/gas/host/controller/admin/AdminUserController.java
@@ -4,6 +4,7 @@ import com.fengdu.gas.common.ExecutedResult; import com.fengdu.gas.common.PagerResult; import com.fengdu.gas.common.jwt.LoginUserDTO; import com.fengdu.gas.common.util.IPUtils; import com.fengdu.gas.common.util.ParameterUtil; import com.fengdu.gas.common.validator.ParameterValidateResult; import com.fengdu.gas.common.validator.ParameterValidator; @@ -25,6 +26,8 @@ import com.fengdu.gas.repository.vo.AdminUserVO; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; /** * 40.管理员用户相关接口 @@ -158,7 +161,31 @@ //#endregion return service.adminLogin(request); } /** * 获取登录设备ip地址 * * @param request 获取登录设备ip地址 * @author lin.liu * @date 2023/02/17 */ @GetMapping("/getIpAddress") @NonLogin public ExecutedResult<String> getIpAddress(HttpServletRequest request) { return ExecutedResult.success(IPUtils.getIpAddress(request)); } /** * 保存前端登录验证码 * * @param uuid 请求参数 * @author lin.liu * @date 2023/02/17 */ @GetMapping("/generate") @NonLogin public ExecutedResult<String> generate(@RequestParam String uuid) { //#endregion return service.generate(uuid); } /** * 获取管理员权限列表 src/main/java/com/fengdu/gas/host/mqtt/MQTTConnect.java
对比新文件 @@ -0,0 +1,37 @@ package com.fengdu.gas.host.mqtt; import com.fengdu.gas.common.config.MqttConfig; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; @Component @Order(20) public class MQTTConnect { @Autowired private MqttConfig mqttConfig; //生成配置对象,用户名,密码等 public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(mqttConfig.getUser()); options.setPassword(mqttConfig.getPassword().toCharArray()); options.setConnectionTimeout(mqttConfig.getConnectionTimeout()); //设置心跳 options.setKeepAliveInterval(mqttConfig.getKeepAliveInterval()); return options; } public MqttConnectOptions getOptions(MqttConnectOptions options) { options.setCleanSession(false); options.setUserName(mqttConfig.getUser()); options.setPassword(mqttConfig.getPassword().toCharArray()); options.setConnectionTimeout(mqttConfig.getConnectionTimeout()); //设置心跳 options.setKeepAliveInterval(mqttConfig.getKeepAliveInterval()); return options; } } src/main/java/com/fengdu/gas/host/mqtt/MQTTServer.java
对比新文件 @@ -0,0 +1,95 @@ package com.fengdu.gas.host.mqtt; import com.fengdu.gas.common.config.MqttConfig; import com.fengdu.gas.common.enums.ELogger; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.util.Objects; /** * 发布端 * Title:Server * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题 */ @Component @Order(7) public class MQTTServer implements InitializingBean { private static final Logger LOGGER = LoggerFactory.getLogger(ELogger.DEBUG.getLogFileName()); @Autowired private MqttConfig mqttConfig; @Autowired private MQTTConnect mqttConnect; static MqttClient client; private void connect() throws MqttException { //防止重复创建MQTTClient实例 if (client == null) { //就是这里的clientId,服务器用来区分用户的,不能重复 String host = "tcp://" + mqttConfig.getHost() + ":" + mqttConfig.getPort(); client = new MqttClient(host, mqttConfig.getServerClientId(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存 //client.setCallback(new PushCallback()); } MqttConnectOptions options = mqttConnect.getOptions(); //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的 if (!client.isConnected()) { client.connect(options); LOGGER.info("---------------------mqtt连接成功"); } else {//这里的逻辑是如果连接成功就重新连接 client.disconnect(); client.connect(mqttConnect.getOptions(options)); LOGGER.info("---------------------mqtt连接成功"); } } public boolean publish(MqttTopic topic , MqttMessage message) throws MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); LOGGER.debug("消息发送成功! " + token.isComplete()); return token.isComplete(); } /** * mqtt发送消息 * @param topic 发布消息的主题 * @param data 消息内容 */ public boolean send(String topic, String data) throws MqttException { MqttConnectOptions options = mqttConnect.getOptions(); try { client.connect(mqttConnect.getOptions(options)); } catch (Exception e) {} MqttTopic mqttTopic = client.getTopic(topic); MqttMessage message = new MqttMessage(); //消息等级 //level 0:最多一次的传输 //level 1:至少一次的传输,(鸡肋) //level 2: 只有一次的传输 message.setQos(0); //如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false message.setRetained(false); message.setPayload(data.getBytes()); return this.publish(mqttTopic, message); } @Override public void afterPropertiesSet() throws Exception { // MemoryPersistence设置clientid的保存形式,默认为以内存保存 // client = new MqttClient(HOST, clientid, new MemoryPersistence()); if (Objects.equals(mqttConfig.getEnable(), Boolean.FALSE)) { return; } this.connect(); } } src/main/java/com/fengdu/gas/host/mqtt/MQTTStart.java
对比新文件 @@ -0,0 +1,27 @@ package com.fengdu.gas.host.mqtt; import com.fengdu.gas.common.config.MqttConfig; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.util.Objects; @Component @Order(40) public class MQTTStart implements InitializingBean { @Autowired private MQTTSubsribe mqttSubsribe; @Autowired private MqttConfig mqttConfig; @Override public void afterPropertiesSet() throws Exception { if (Objects.equals(mqttConfig.getEnable(), Boolean.FALSE)) { return; } mqttSubsribe.init(); } } src/main/java/com/fengdu/gas/host/mqtt/MQTTSubsribe.java
对比新文件 @@ -0,0 +1,101 @@ package com.fengdu.gas.host.mqtt; import com.fengdu.gas.common.config.MqttConfig; import com.fengdu.gas.common.enums.ELogger; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; /** * 订阅端 */ @Component @Order(30) public class MQTTSubsribe { private static final Logger LOGGER = LoggerFactory.getLogger(ELogger.DEBUG.getLogFileName()); @Autowired private MqttConfig mqttConfig; @Autowired private MQTTConnect mqttConnect; /** * 测试和正式环境不要使用同样的clientId 和主题 * 如果和正式环境一样,正式环境启动后,本地再次启动会频繁断开重连,订阅的主题一样的话,测试的数据正式环境也会消费这些数据 */ //private static final String clientid = "测试clients";//测试 // private String topic = "lunhan"; public MqttClient client; //方法实现说明 断线重连方法,如果是持久订阅,重连是不需要再次订阅,如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true); // true为非持久订阅 public void connect() throws MqttException { //防止重复创建MQTTClient实例 if (client == null) { //就是这里的clientId,服务器用来区分用户的,不能重复,clientId不能和发布的clientId一样,否则会出现频繁断开连接和重连的问题 //不仅不能和发布的clientId一样,而且也不能和其他订阅的clientId一样,如果想要接收之前的离线数据,这就需要将client的 setCleanSession // 设置为false,这样服务器才能保留它的session,再次建立连接的时候,它就会继续使用这个session了。 这时此连接clientId 是不能更改的。 //但是其实还有一个问题,就是使用热部署的时候还是会出现频繁断开连接和重连的问题,可能是因为刚启动时的连接没断开,然后热部署的时候又进行了重连,重启一下就可以了 //+ System.currentTimeMillis() String host = "tcp://" + mqttConfig.getHost() + ":" + mqttConfig.getPort(); client = new MqttClient(host, mqttConfig.getClientId(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存 //如果是订阅者则添加回调类,发布不需要 client.setCallback(new PushCallback(MQTTSubsribe.this)); // client.setCallback(new PushCallback()); } MqttConnectOptions options = mqttConnect.getOptions(); //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的 if (!client.isConnected()) { client.connect(options); LOGGER.info("----------mqtt连接成功"); } else {//这里的逻辑是如果连接成功就重新连接 client.disconnect(); client.connect(mqttConnect.getOptions(options)); LOGGER.info("----------mqtt连接成功"); } } public void init() { try { this.connect(); LOGGER.info("----------mqtt执行"); this.subscribe(mqttConfig.getTopic()); LOGGER.info("----------mqtt执行订阅"); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅某个主题,qos默认为0 * * @param topic . */ public void subscribe(String topic) { subscribe(topic,2); } /** * 订阅某个主题 * * @param topic . * @param qos . */ public void subscribe(String topic, int qos) { try { client.subscribe(topic,qos); //MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。 // client.unsubscribe("需要解除订阅关系的主题"); } catch (MqttException e) { e.printStackTrace(); } } } src/main/java/com/fengdu/gas/host/mqtt/PushCallback.java
对比新文件 @@ -0,0 +1,84 @@ package com.fengdu.gas.host.mqtt; import com.fengdu.gas.common.enums.ELogger; import com.fengdu.gas.common.util.LoggerUtil; import com.fengdu.gas.common.util.ThreadPoolUtil; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import java.nio.charset.StandardCharsets; ; /** * 发布消息的回调类 * <p> * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 * 在回调中,将它用来标识已经启动了该回调的哪个实例。 * 必须在回调类中实现三个方法: * <p> * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 * <p> * public void connectionLost(Throwable cause)在断开连接时调用。 * <p> * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 * 由 MqttClient.connect 激活此回调。 */ public class PushCallback implements MqttCallback { private static final Logger LOGGER_DEBUG = LoggerUtil.get(ELogger.DEBUG); private static final Logger LOGGER_ERROR = LoggerUtil.get(ELogger.SYS_ERROR); private MQTTSubsribe mqttSubsribe; public PushCallback(MQTTSubsribe mqttSubsribe) { this.mqttSubsribe = mqttSubsribe; } public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 LOGGER_DEBUG.info("---------------------mqtt连接断开"); while (true) { try {//如果没有发生异常说明连接成功,如果发生异常,则死循环 mqttSubsribe.init(); LOGGER_DEBUG.info("---------------------mqtt重连成功"); break; } catch (Exception e) { LOGGER_ERROR.error("mqtt连接丢失", e); continue; } finally { try { Thread.sleep(1000); } catch (InterruptedException e) { // 重新设置中断状态 Thread.currentThread().interrupt(); } } } } public void deliveryComplete(IMqttDeliveryToken token) { LOGGER_DEBUG.info("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) { // subscribe后得到的消息会执行到这里面 String msg = new String(message.getPayload(), StandardCharsets.UTF_8); LOGGER_DEBUG.info("收到mqtt消息,主题: " + topic + ", Qos: " + message.getQos() + ", 消息内容: " + msg); //这里可以针对收到的消息做处理 ThreadPoolUtil.getDefaultPool().execute(() -> { try { //调用方法 //dataUploadYwjRecordService.mqttReceived(topic, msg); } catch (Exception e) { LOGGER_ERROR.error("messageArrived", e); } }); } } src/main/java/com/fengdu/gas/service/AdminService.java
@@ -28,11 +28,14 @@ import com.fengdu.gas.repository.po.*; import com.fengdu.gas.repository.vo.AdminRoleVO; import com.fengdu.gas.repository.vo.AdminUserVO; import com.wf.captcha.SpecCaptcha; import org.apache.commons.lang3.BooleanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Service @@ -63,7 +66,8 @@ private AdminPermissionService permissionService; @Autowired private AdminPowerService adminPowerService; @Autowired private StringRedisTemplate redisTemplate; // 添加后台管理员 @@ -159,7 +163,22 @@ return ExecutedResult.success(result); } /** * 生成验证码图片并返回其 Base64 编码字符串 * * @param uuid 用于标识验证码的唯一标识符 * @return 包含验证码图片 Base64 编码的响应结果 */ public ExecutedResult<String> generate(String uuid) { SpecCaptcha specCaptcha = new SpecCaptcha(100, 30, 4); // captcha.setCharType(Captcha.TYPE_DEFAULT); String code = specCaptcha.text().toLowerCase(); // 缓存验证码 redisTemplate.opsForValue().set(uuid, code); // 设置验证码3分钟后过期 redisTemplate.expire(uuid, 3, TimeUnit.MINUTES); return ExecutedResult.success(specCaptcha.toBase64()); } // 获取管理员权限列表 public ExecutedResult<ResAdminPower> getPower(Long adminId) { ResAdminPower result = new ResAdminPower(); src/main/resources/application-dev.yml
@@ -66,6 +66,9 @@ keepAliveInterval: 20 # 发送消息的客户端id serverClientId: fengdu-gases-service-publish-dev # quartz定时任务配置 quartz: # 是否启用 src/main/resources/application-prod.yml
@@ -1,12 +1,11 @@ server: port: 8088 port: 8040 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss datasource: driver-class-name: org.postgresql.Driver url: jdbc:postgresql://113.250.189.120:57654/test_db1 url: jdbc:postgresql://113.250.189.120:57654/fengdu_gases_system username: lunhan password: lunhan.20240330 druid: @@ -36,7 +35,27 @@ root: info com.fengdu.gas.host: debug mqtt: # 是否启用 enable: true host: 113.250.189.120 port: 1883 user: admin password: public # 订阅主题 topic: fengdu/# # 订阅消息的客户端id clientId: fengdu-gases-service-dev # 连接超时时间 connectionTimeout: 10 # 心跳 keepAliveInterval: 20 # 发送消息的客户端id serverClientId: fengdu-gases-service-publish-dev # quartz定时任务配置 quartz: # 是否启用 enable: true enable: false src/main/resources/application-test.yml
@@ -1,12 +1,12 @@ server: port: 8088 port: 8040 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss datasource: driver-class-name: org.postgresql.Driver url: jdbc:postgresql://113.250.189.120:57654/test_db1 url: jdbc:postgresql://113.250.189.120:57654/fengdu_gases_system username: lunhan password: lunhan.20240330 druid: @@ -34,10 +34,42 @@ path: ./logs level: root: info com.fengdu.gas.host: DEBUG com.fengdu.gas.service: DEBUG com.fengdu.gas: info file: uploadBasicPath: ./logs/upload/ fileBasicUrl: http://localhost/file disturbStr: com.fengdu.gas # mybatis-plus 配置: #mybatis-plus: # configuration: # ### 开启打印sql配置 # log-impl: org.apache.ibatis.logging.stdout.StdOutImpl mqtt: # 是否启用 enable: false host: 113.250.189.120 port: 1885 user: admin password: public # 订阅主题 topic: fengdu/# # 订阅消息的客户端id clientId: fengdu-gases-service-dev # 连接超时时间 connectionTimeout: 10 # 心跳 keepAliveInterval: 20 # 发送消息的客户端id serverClientId: fengdu-gases-service-publish-dev # quartz定时任务配置 quartz: # 是否启用 enable: true enable: false