package com.nanjing.water.host.mqtt; import com.nanjing.water.common.config.MqttConfig; import com.nanjing.water.common.enums.ELogger; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.util.Objects; /** * 发布端 * Title:Server * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题 */ @Component @Order(7) public class MQTTServer implements InitializingBean { private static final Logger LOGGER = LoggerFactory.getLogger(ELogger.DEBUG.getLogFileName()); @Autowired private MqttConfig mqttConfig; @Autowired private MQTTConnect mqttConnect; static MqttClient client; private void connect() throws MqttException { //防止重复创建MQTTClient实例 if (client == null) { //就是这里的clientId,服务器用来区分用户的,不能重复 String host = "tcp://" + mqttConfig.getHost() + ":" + mqttConfig.getPort(); client = new MqttClient(host, mqttConfig.getServerClientId(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存 //client.setCallback(new PushCallback()); } MqttConnectOptions options = mqttConnect.getOptions(); //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的 if (!client.isConnected()) { client.connect(options); LOGGER.info("---------------------mqtt连接成功"); } else {//这里的逻辑是如果连接成功就重新连接 client.disconnect(); client.connect(mqttConnect.getOptions(options)); LOGGER.info("---------------------mqtt连接成功"); } } public boolean publish(MqttTopic topic , MqttMessage message) throws MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); LOGGER.debug("消息发送成功! " + token.isComplete()); return token.isComplete(); } /** * mqtt发送消息 * @param topic 发布消息的主题 * @param data 消息内容 */ public boolean send(String topic, String data) throws MqttException { MqttConnectOptions options = mqttConnect.getOptions(); try { client.connect(mqttConnect.getOptions(options)); } catch (Exception e) {} MqttTopic mqttTopic = client.getTopic(topic); MqttMessage message = new MqttMessage(); //消息等级 //level 0:最多一次的传输 //level 1:至少一次的传输,(鸡肋) //level 2: 只有一次的传输 message.setQos(0); //如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false message.setRetained(false); message.setPayload(data.getBytes()); return this.publish(mqttTopic, message); } @Override public void afterPropertiesSet() throws Exception { // MemoryPersistence设置clientid的保存形式,默认为以内存保存 // client = new MqttClient(HOST, clientid, new MemoryPersistence()); if (Objects.equals(mqttConfig.getEnable(), Boolean.FALSE)) { return; } this.connect(); } }