package com.nanjing.water.host.mqtt;
|
|
import com.nanjing.water.common.enums.ELogger;
|
import com.nanjing.water.common.util.LoggerUtil;
|
import com.nanjing.water.common.util.SpringUtil;
|
import com.nanjing.water.common.util.ThreadPoolUtil;;
|
import com.nanjing.water.service.DataUploadRecordService;
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.slf4j.Logger;
|
|
import java.nio.charset.StandardCharsets;
|
|
/**
|
* 发布消息的回调类
|
* <p>
|
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
|
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
|
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
|
* 必须在回调类中实现三个方法:
|
* <p>
|
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
|
* <p>
|
* public void connectionLost(Throwable cause)在断开连接时调用。
|
* <p>
|
* public void deliveryComplete(MqttDeliveryToken token))
|
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
|
* 由 MqttClient.connect 激活此回调。
|
*/
|
public class PushCallback implements MqttCallback {
|
private static final Logger LOGGER_DEBUG = LoggerUtil.get(ELogger.DEBUG);
|
private static final Logger LOGGER_ERROR = LoggerUtil.get(ELogger.SYS_ERROR);
|
|
private static DataUploadRecordService dataUploadService = SpringUtil.getBean(DataUploadRecordService.class);
|
|
private MQTTSubsribe mqttSubsribe;
|
|
public PushCallback(MQTTSubsribe mqttSubsribe) {
|
this.mqttSubsribe = mqttSubsribe;
|
}
|
|
public void connectionLost(Throwable cause) {
|
// 连接丢失后,一般在这里面进行重连
|
LOGGER_DEBUG.info("---------------------mqtt连接断开");
|
|
while (true) {
|
try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
|
mqttSubsribe.init();
|
LOGGER_DEBUG.info("---------------------mqtt重连成功");
|
break;
|
} catch (Exception e) {
|
LOGGER_ERROR.error("mqtt连接丢失", e);
|
continue;
|
} finally {
|
try {
|
Thread.sleep(1000);
|
} catch (InterruptedException e) {
|
// 重新设置中断状态
|
Thread.currentThread().interrupt();
|
}
|
}
|
}
|
}
|
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
LOGGER_DEBUG.info("deliveryComplete---------" + token.isComplete());
|
}
|
|
public void messageArrived(String topic, MqttMessage message) {
|
// subscribe后得到的消息会执行到这里面
|
String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
|
|
LOGGER_DEBUG.info("收到mqtt消息,主题: " + topic + ", Qos: " + message.getQos() + ", 消息内容: " + msg);
|
//这里可以针对收到的消息做处理
|
ThreadPoolUtil.getDefaultPool().execute(() -> {
|
try {
|
//调用方法
|
dataUploadService.mqttReceived(topic, msg);
|
} catch (Exception e) {
|
LOGGER_ERROR.error("messageArrived", e);
|
}
|
});
|
}
|
}
|