package com.nanjing.water.host.mqtt; import com.nanjing.water.common.config.MqttConfig; import com.nanjing.water.common.enums.ELogger; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; /** * 订阅端 */ @Component @Order(30) public class MQTTSubsribe { private static final Logger LOGGER = LoggerFactory.getLogger(ELogger.DEBUG.getLogFileName()); @Autowired private MqttConfig mqttConfig; @Autowired private MQTTConnect mqttConnect; /** * 测试和正式环境不要使用同样的clientId 和主题 * 如果和正式环境一样,正式环境启动后,本地再次启动会频繁断开重连,订阅的主题一样的话,测试的数据正式环境也会消费这些数据 */ //private static final String clientid = "测试clients";//测试 // private String topic = "lunhan"; public MqttClient client; //方法实现说明 断线重连方法,如果是持久订阅,重连是不需要再次订阅,如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true); // true为非持久订阅 public void connect() throws MqttException { //防止重复创建MQTTClient实例 if (client == null) { //就是这里的clientId,服务器用来区分用户的,不能重复,clientId不能和发布的clientId一样,否则会出现频繁断开连接和重连的问题 //不仅不能和发布的clientId一样,而且也不能和其他订阅的clientId一样,如果想要接收之前的离线数据,这就需要将client的 setCleanSession // 设置为false,这样服务器才能保留它的session,再次建立连接的时候,它就会继续使用这个session了。 这时此连接clientId 是不能更改的。 //但是其实还有一个问题,就是使用热部署的时候还是会出现频繁断开连接和重连的问题,可能是因为刚启动时的连接没断开,然后热部署的时候又进行了重连,重启一下就可以了 //+ System.currentTimeMillis() String host = "tcp://" + mqttConfig.getHost() + ":" + mqttConfig.getPort(); client = new MqttClient(host, mqttConfig.getClientId(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存 //如果是订阅者则添加回调类,发布不需要 client.setCallback(new PushCallback(MQTTSubsribe.this)); // client.setCallback(new PushCallback()); } MqttConnectOptions options = mqttConnect.getOptions(); //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的 if (!client.isConnected()) { client.connect(options); LOGGER.info("----------mqtt连接成功"); } else {//这里的逻辑是如果连接成功就重新连接 client.disconnect(); client.connect(mqttConnect.getOptions(options)); LOGGER.info("----------mqtt连接成功"); } } public void init() { try { this.connect(); LOGGER.info("----------mqtt执行"); this.subscribe(mqttConfig.getTopic()); LOGGER.info("----------mqtt执行订阅"); } catch (MqttException e) { e.printStackTrace(); } } /** * 订阅某个主题,qos默认为0 * * @param topic . */ public void subscribe(String topic) { subscribe(topic,2); } /** * 订阅某个主题 * * @param topic . * @param qos . */ public void subscribe(String topic, int qos) { try { client.subscribe(topic,qos); //MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。 // client.unsubscribe("需要解除订阅关系的主题"); } catch (MqttException e) { e.printStackTrace(); } } }