elkers
2025-04-07 2dba8dbb14af4616eec876fd9af744651e8beeda
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.nanjing.water.host.mqtt;
 
import com.nanjing.water.common.config.MqttConfig;
import com.nanjing.water.common.enums.ELogger;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
 
/**
 * 订阅端
 */
@Component
@Order(30)
public class MQTTSubsribe {
    private static final Logger LOGGER = LoggerFactory.getLogger(ELogger.DEBUG.getLogFileName());
    @Autowired
    private MqttConfig mqttConfig;
    @Autowired
    private MQTTConnect mqttConnect;
 
    /**
     * 测试和正式环境不要使用同样的clientId 和主题
     * 如果和正式环境一样,正式环境启动后,本地再次启动会频繁断开重连,订阅的主题一样的话,测试的数据正式环境也会消费这些数据
     */
      //private static final String clientid = "测试clients";//测试
//    private String topic = "lunhan";
 
    public MqttClient client;
 
    //方法实现说明 断线重连方法,如果是持久订阅,重连是不需要再次订阅,如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
    // true为非持久订阅
    public void connect() throws MqttException {
        //防止重复创建MQTTClient实例
        if (client == null) {
            //就是这里的clientId,服务器用来区分用户的,不能重复,clientId不能和发布的clientId一样,否则会出现频繁断开连接和重连的问题
            //不仅不能和发布的clientId一样,而且也不能和其他订阅的clientId一样,如果想要接收之前的离线数据,这就需要将client的 setCleanSession
            // 设置为false,这样服务器才能保留它的session,再次建立连接的时候,它就会继续使用这个session了。 这时此连接clientId 是不能更改的。
            //但是其实还有一个问题,就是使用热部署的时候还是会出现频繁断开连接和重连的问题,可能是因为刚启动时的连接没断开,然后热部署的时候又进行了重连,重启一下就可以了
            //+ System.currentTimeMillis()
            String host = "tcp://" + mqttConfig.getHost() + ":" + mqttConfig.getPort();
            client = new MqttClient(host, mqttConfig.getClientId(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
            //如果是订阅者则添加回调类,发布不需要
            client.setCallback(new PushCallback(MQTTSubsribe.this));
//            client.setCallback(new PushCallback());
        }
        MqttConnectOptions options = mqttConnect.getOptions();
        //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
        if (!client.isConnected()) {
            client.connect(options);
            LOGGER.info("----------mqtt连接成功");
        } else {//这里的逻辑是如果连接成功就重新连接
            client.disconnect();
            client.connect(mqttConnect.getOptions(options));
            LOGGER.info("----------mqtt连接成功");
        }
    }
 
    public void init() {
        try {
            this.connect();
            LOGGER.info("----------mqtt执行");
            this.subscribe(mqttConfig.getTopic());
            LOGGER.info("----------mqtt执行订阅");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 订阅某个主题,qos默认为0
     *
     * @param topic .
     */
    public void subscribe(String topic) {
        subscribe(topic,2);
    }
 
    /**
     * 订阅某个主题
     *
     * @param topic .
     * @param qos .
     */
    public void subscribe(String topic, int qos) {
 
        try {
            client.subscribe(topic,qos);
            //MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
//            client.unsubscribe("需要解除订阅关系的主题");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}