package com.nanjing.water.host.mqtt;
|
|
import com.nanjing.water.common.config.MqttConfig;
|
import com.nanjing.water.common.enums.ELogger;
|
import org.eclipse.paho.client.mqttv3.*;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.core.annotation.Order;
|
import org.springframework.stereotype.Component;
|
|
import java.util.Objects;
|
|
/**
|
* 发布端
|
* Title:Server
|
* Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
|
*/
|
@Component
|
@Order(7)
|
public class MQTTServer implements InitializingBean {
|
private static final Logger LOGGER = LoggerFactory.getLogger(ELogger.DEBUG.getLogFileName());
|
@Autowired
|
private MqttConfig mqttConfig;
|
@Autowired
|
private MQTTConnect mqttConnect;
|
static MqttClient client;
|
|
private void connect() throws MqttException {
|
//防止重复创建MQTTClient实例
|
if (client == null) {
|
//就是这里的clientId,服务器用来区分用户的,不能重复
|
String host = "tcp://" + mqttConfig.getHost() + ":" + mqttConfig.getPort();
|
client = new MqttClient(host, mqttConfig.getServerClientId(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
|
//client.setCallback(new PushCallback());
|
}
|
MqttConnectOptions options = mqttConnect.getOptions();
|
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
|
if (!client.isConnected()) {
|
client.connect(options);
|
LOGGER.info("---------------------mqtt连接成功");
|
} else {//这里的逻辑是如果连接成功就重新连接
|
client.disconnect();
|
client.connect(mqttConnect.getOptions(options));
|
LOGGER.info("---------------------mqtt连接成功");
|
}
|
}
|
|
|
public boolean publish(MqttTopic topic , MqttMessage message) throws MqttException {
|
MqttDeliveryToken token = topic.publish(message);
|
token.waitForCompletion();
|
LOGGER.debug("消息发送成功! " + token.isComplete());
|
return token.isComplete();
|
}
|
|
/**
|
* mqtt发送消息
|
* @param topic 发布消息的主题
|
* @param data 消息内容
|
*/
|
public boolean send(String topic, String data) throws MqttException {
|
MqttConnectOptions options = mqttConnect.getOptions();
|
try {
|
client.connect(mqttConnect.getOptions(options));
|
} catch (Exception e) {}
|
MqttTopic mqttTopic = client.getTopic(topic);
|
|
MqttMessage message = new MqttMessage();
|
//消息等级
|
//level 0:最多一次的传输
|
//level 1:至少一次的传输,(鸡肋)
|
//level 2: 只有一次的传输
|
message.setQos(0);
|
//如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
|
message.setRetained(false);
|
|
message.setPayload(data.getBytes());
|
|
return this.publish(mqttTopic, message);
|
}
|
|
@Override
|
public void afterPropertiesSet() throws Exception {
|
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
|
// client = new MqttClient(HOST, clientid, new MemoryPersistence());
|
if (Objects.equals(mqttConfig.getEnable(), Boolean.FALSE)) {
|
return;
|
}
|
this.connect();
|
}
|
}
|