diff --git a/xymanager_admin/src/main/resources/application-test.yml b/xymanager_admin/src/main/resources/application-test.yml index 7e1ca1b..fb8689c 100644 --- a/xymanager_admin/src/main/resources/application-test.yml +++ b/xymanager_admin/src/main/resources/application-test.yml @@ -125,5 +125,6 @@ mqtt: username: xymp password: xymp cmdtopic: /v1/devices/MSRDT-A/command + resptopic: /v1/devices/MSRDT-A/commandResponse datatopic: /v1/devices/MSRDT-A/datas photodir: /home/xymp/photos/ \ No newline at end of file diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/DataMessageListener.java similarity index 75% rename from xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java rename to xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/DataMessageListener.java index aa8bd27..009cdd1 100644 --- a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/DataMessageListener.java @@ -2,7 +2,10 @@ package com.shxy.xymanager_framework.mqtt; import com.shxy.xymanager_common.exception.ApiException; import com.shxy.xymanager_common.model.mqtt.*; +import com.shxy.xymanager_common.util.JSONUtil; import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -10,7 +13,7 @@ import javax.annotation.Resource; @Service @Slf4j -public class MessageHandler { +public class DataMessageListener implements IMqttMessageListener { @Resource PhotoHandler photoHandler; @@ -25,6 +28,20 @@ public class MessageHandler { @Resource StatusHandler statusHandler; + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String json = new String(message.getPayload()); + try { + log.info("mqtt的" + topic + "收到消息:" + json); + MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class); + this.process(msg); + } catch (ApiException aex) { + log.error("mqtt消息处理异常, " + aex.getMessage() + ", 消息体是:" + json); + } catch (Exception ex) { + log.error("mqtt消息处理异常, " + ex.getMessage() + ", 消息体是:" + json, ex); + } + } + public void process(MessageUpload msg) throws Exception { if (!CollectionUtils.isEmpty(msg.getDevices())) { for (Device device : msg.getDevices()) { diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java deleted file mode 100644 index 0b70f87..0000000 --- a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.shxy.xymanager_framework.mqtt; - -import com.shxy.xymanager_common.exception.ApiException; -import com.shxy.xymanager_common.model.mqtt.MessageUpload; -import com.shxy.xymanager_common.util.JSONUtil; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.IMqttMessageListener; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; - -@Service -@Slf4j -public class MqttSubscriber implements IMqttMessageListener { - - - @Value("${mqtt.datatopic}") - private String datatopic; - - @Resource - private MqttClient mqttClient; - - @Resource - MessageHandler messageHandler; - - @PostConstruct - public void init() throws Exception { - // 订阅默认主题 - mqttClient.subscribe(datatopic, this); - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - String json = new String(message.getPayload()); - try { - log.info("mqtt收到消息:" + json); - MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class); - messageHandler.process(msg); - } catch (ApiException aex) { - log.error("mqtt消息处理异常, " + aex.getMessage() + ", 消息体是:" + json); - } catch (Exception ex) { - log.error("mqtt消息处理异常, " + ex.getMessage() + ", 消息体是:" + json, ex); - } - } -} - diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriberService.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriberService.java new file mode 100644 index 0000000..7d6f43c --- /dev/null +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriberService.java @@ -0,0 +1,35 @@ +package com.shxy.xymanager_framework.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +@Service +@Slf4j +public class MqttSubscriberService { + + @Value("${mqtt.resptopic}") + private String resptopic; + @Value("${mqtt.datatopic}") + private String datatopic; + + @Resource + private MqttClient mqttClient; + @Resource + DataMessageListener dataMessageListener; + @Resource + RespMessageListener respMessageListener; + + @PostConstruct + public void init() throws Exception { + // 订阅主题 + mqttClient.subscribe(resptopic, respMessageListener); + mqttClient.subscribe(datatopic, dataMessageListener); + } + +} + diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/RespMessageListener.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/RespMessageListener.java new file mode 100644 index 0000000..7451bb8 --- /dev/null +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/RespMessageListener.java @@ -0,0 +1,25 @@ +package com.shxy.xymanager_framework.mqtt; + +import com.shxy.xymanager_common.exception.ApiException; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class RespMessageListener implements IMqttMessageListener { + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String json = new String(message.getPayload()); + try { + log.info("mqtt的" + topic + "收到消息:" + json); + } catch (ApiException aex) { + log.error("mqtt消息处理异常, " + aex.getMessage() + ", 消息体是:" + json); + } catch (Exception ex) { + log.error("mqtt消息处理异常, " + ex.getMessage() + ", 消息体是:" + json, ex); + } + } + +}