package com.lunhan.water.host.mqtt; import com.lunhan.water.common.enums.ELogger; import com.lunhan.water.common.util.LoggerUtil; import com.lunhan.water.common.util.SpringUtil; import com.lunhan.water.common.util.ThreadPoolUtil; import com.lunhan.water.service.PaymentRecordsService; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import java.nio.charset.StandardCharsets; import java.util.Objects; ; /** * 发布消息的回调类 *
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 * 在回调中,将它用来标识已经启动了该回调的哪个实例。 * 必须在回调类中实现三个方法: *
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 *
* public void connectionLost(Throwable cause)在断开连接时调用。 *
* public void deliveryComplete(MqttDeliveryToken token)) * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 * 由 MqttClient.connect 激活此回调。 */ public class PushCallback implements MqttCallback { private static final Logger LOGGER_DEBUG = LoggerUtil.get(ELogger.DEBUG); private static final Logger LOGGER_ERROR = LoggerUtil.get(ELogger.SYS_ERROR); @Autowired PaymentRecordsService paymentRecordsService; private MQTTSubsribe mqttSubsribe; public PushCallback(MQTTSubsribe mqttSubsribe) { this.mqttSubsribe = mqttSubsribe; } public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 LOGGER_DEBUG.info("---------------------mqtt连接断开"); while (true) { try {//如果没有发生异常说明连接成功,如果发生异常,则死循环 mqttSubsribe.init(); LOGGER_DEBUG.info("---------------------mqtt重连成功"); break; } catch (Exception e) { LOGGER_ERROR.error("mqtt连接丢失", e); continue; } finally { try { Thread.sleep(1000); } catch (InterruptedException e) { // 重新设置中断状态 Thread.currentThread().interrupt(); } } } } public void deliveryComplete(IMqttDeliveryToken token) { LOGGER_DEBUG.info("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) { // subscribe后得到的消息会执行到这里面 String msg = new String(message.getPayload(), StandardCharsets.UTF_8); LOGGER_DEBUG.info("收到mqtt消息,主题: " + topic + ", Qos: " + message.getQos() + ", 消息内容: " + msg); //这里可以针对收到的消息做处理 ThreadPoolUtil.getDefaultPool().execute(() -> { try { if (Objects.nonNull(paymentRecordsService)) { //调用方法 paymentRecordsService.mqttReceived(topic, msg); } else { try { System.out.println("paymentRecordsService bean尚未初始化..."); paymentRecordsService = SpringUtil.getBean(PaymentRecordsService.class); System.out.println("paymentRecordsService 初始化bean成功!"); } catch (Exception ignored) { } if (Objects.nonNull(paymentRecordsService)) { //调用方法 paymentRecordsService.mqttReceived(topic, msg); } } } catch (Exception e) { LOGGER_ERROR.error("messageArrived", e); } }); } }