package com.nanjing.water.host.mqtt; import com.nanjing.water.common.enums.ELogger; import com.nanjing.water.common.util.LoggerUtil; import com.nanjing.water.common.util.SpringUtil; import com.nanjing.water.common.util.ThreadPoolUtil;; import com.nanjing.water.service.DataUploadRecordService; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import java.nio.charset.StandardCharsets; /** * 发布消息的回调类 *

* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 * 在回调中,将它用来标识已经启动了该回调的哪个实例。 * 必须在回调类中实现三个方法: *

* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 *

* public void connectionLost(Throwable cause)在断开连接时调用。 *

* public void deliveryComplete(MqttDeliveryToken token)) * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 * 由 MqttClient.connect 激活此回调。 */ public class PushCallback implements MqttCallback { private static final Logger LOGGER_DEBUG = LoggerUtil.get(ELogger.DEBUG); private static final Logger LOGGER_ERROR = LoggerUtil.get(ELogger.SYS_ERROR); private static DataUploadRecordService dataUploadService = SpringUtil.getBean(DataUploadRecordService.class); private MQTTSubsribe mqttSubsribe; public PushCallback(MQTTSubsribe mqttSubsribe) { this.mqttSubsribe = mqttSubsribe; } public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 LOGGER_DEBUG.info("---------------------mqtt连接断开"); while (true) { try {//如果没有发生异常说明连接成功,如果发生异常,则死循环 mqttSubsribe.init(); LOGGER_DEBUG.info("---------------------mqtt重连成功"); break; } catch (Exception e) { LOGGER_ERROR.error("mqtt连接丢失", e); continue; } finally { try { Thread.sleep(1000); } catch (InterruptedException e) { // 重新设置中断状态 Thread.currentThread().interrupt(); } } } } public void deliveryComplete(IMqttDeliveryToken token) { LOGGER_DEBUG.info("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) { // subscribe后得到的消息会执行到这里面 String msg = new String(message.getPayload(), StandardCharsets.UTF_8); LOGGER_DEBUG.info("收到mqtt消息,主题: " + topic + ", Qos: " + message.getQos() + ", 消息内容: " + msg); //这里可以针对收到的消息做处理 ThreadPoolUtil.getDefaultPool().execute(() -> { try { //调用方法 dataUploadService.mqttReceived(topic, msg); } catch (Exception e) { LOGGER_ERROR.error("messageArrived", e); } }); } }