elkers
2025-04-07 2dba8dbb14af4616eec876fd9af744651e8beeda
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.nanjing.water.host.mqtt;
 
import com.nanjing.water.common.enums.ELogger;
import com.nanjing.water.common.util.LoggerUtil;
import com.nanjing.water.common.util.SpringUtil;
import com.nanjing.water.common.util.ThreadPoolUtil;;
import com.nanjing.water.service.DataUploadRecordService;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
 
import java.nio.charset.StandardCharsets;
 
/**
 * 发布消息的回调类
 * <p>
 * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
 * 在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 必须在回调类中实现三个方法:
 * <p>
 * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
 * <p>
 * public void connectionLost(Throwable cause)在断开连接时调用。
 * <p>
 * public void deliveryComplete(MqttDeliveryToken token))
 * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
 * 由 MqttClient.connect 激活此回调。
 */
public class PushCallback implements MqttCallback {
    private static final Logger LOGGER_DEBUG = LoggerUtil.get(ELogger.DEBUG);
    private static final Logger LOGGER_ERROR = LoggerUtil.get(ELogger.SYS_ERROR);
 
    private static DataUploadRecordService dataUploadService = SpringUtil.getBean(DataUploadRecordService.class);
 
    private MQTTSubsribe mqttSubsribe;
 
    public PushCallback(MQTTSubsribe mqttSubsribe) {
        this.mqttSubsribe = mqttSubsribe;
    }
 
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        LOGGER_DEBUG.info("---------------------mqtt连接断开");
 
        while (true) {
            try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
                mqttSubsribe.init();
                LOGGER_DEBUG.info("---------------------mqtt重连成功");
                break;
            } catch (Exception e) {
                LOGGER_ERROR.error("mqtt连接丢失", e);
                continue;
            } finally {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // 重新设置中断状态
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
 
    public void deliveryComplete(IMqttDeliveryToken token) {
        LOGGER_DEBUG.info("deliveryComplete---------" + token.isComplete());
    }
 
    public void messageArrived(String topic, MqttMessage message) {
        // subscribe后得到的消息会执行到这里面
        String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
 
        LOGGER_DEBUG.info("收到mqtt消息,主题: " + topic + ", Qos: " + message.getQos() + ", 消息内容: " + msg);
        //这里可以针对收到的消息做处理
        ThreadPoolUtil.getDefaultPool().execute(() -> {
            try {
                //调用方法
                dataUploadService.mqttReceived(topic, msg);
            } catch (Exception e) {
                LOGGER_ERROR.error("messageArrived", e);
            }
        });
    }
}