elkers
2025-04-07 2dba8dbb14af4616eec876fd9af744651e8beeda
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.nanjing.water.host.mqtt;
 
import com.nanjing.water.common.config.MqttConfig;
import com.nanjing.water.common.enums.ELogger;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
 
import java.util.Objects;
 
/**
 * 发布端
 * Title:Server
 * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
 */
@Component
@Order(7)
public class MQTTServer implements InitializingBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(ELogger.DEBUG.getLogFileName());
    @Autowired
    private MqttConfig mqttConfig;
    @Autowired
    private MQTTConnect mqttConnect;
    static MqttClient client;
 
    private void connect() throws MqttException {
        //防止重复创建MQTTClient实例
        if (client == null) {
            //就是这里的clientId,服务器用来区分用户的,不能重复
            String host = "tcp://" + mqttConfig.getHost() + ":" + mqttConfig.getPort();
            client = new MqttClient(host, mqttConfig.getServerClientId(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
            //client.setCallback(new PushCallback());
        }
        MqttConnectOptions options = mqttConnect.getOptions();
        //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
        if (!client.isConnected()) {
            client.connect(options);
            LOGGER.info("---------------------mqtt连接成功");
        } else {//这里的逻辑是如果连接成功就重新连接
            client.disconnect();
            client.connect(mqttConnect.getOptions(options));
            LOGGER.info("---------------------mqtt连接成功");
        }
    }
 
 
    public boolean publish(MqttTopic topic , MqttMessage message) throws MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        LOGGER.debug("消息发送成功! " + token.isComplete());
        return token.isComplete();
    }
 
    /**
     * mqtt发送消息
     * @param topic 发布消息的主题
     * @param data 消息内容
     */
    public boolean send(String topic, String data) throws MqttException {
        MqttConnectOptions options = mqttConnect.getOptions();
        try {
            client.connect(mqttConnect.getOptions(options));
        } catch (Exception e) {}
        MqttTopic mqttTopic = client.getTopic(topic);
 
        MqttMessage message = new MqttMessage();
        //消息等级
        //level 0:最多一次的传输
        //level 1:至少一次的传输,(鸡肋)
        //level 2: 只有一次的传输
        message.setQos(0);
        //如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
        message.setRetained(false);
 
        message.setPayload(data.getBytes());
 
        return this.publish(mqttTopic, message);
    }
 
    @Override
    public void afterPropertiesSet() throws Exception {
        // MemoryPersistence设置clientid的保存形式,默认为以内存保存
//        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        if (Objects.equals(mqttConfig.getEnable(), Boolean.FALSE)) {
            return;
        }
        this.connect();
    }
}