feat: 增加订阅response

dev
huangfeng 3 months ago
parent 5c9ede915b
commit f88a5c6c22

@ -125,5 +125,6 @@ mqtt:
username: xymp
password: xymp
cmdtopic: /v1/devices/MSRDT-A/command
resptopic: /v1/devices/MSRDT-A/commandResponse
datatopic: /v1/devices/MSRDT-A/datas
photodir: /home/xymp/photos/

@ -2,7 +2,10 @@ package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.exception.ApiException;
import com.shxy.xymanager_common.model.mqtt.*;
import com.shxy.xymanager_common.util.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@ -10,7 +13,7 @@ import javax.annotation.Resource;
@Service
@Slf4j
public class MessageHandler {
public class DataMessageListener implements IMqttMessageListener {
@Resource
PhotoHandler photoHandler;
@ -25,6 +28,20 @@ public class MessageHandler {
@Resource
StatusHandler statusHandler;
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String json = new String(message.getPayload());
try {
log.info("mqtt的" + topic + "收到消息:" + json);
MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class);
this.process(msg);
} catch (ApiException aex) {
log.error("mqtt消息处理异常, " + aex.getMessage() + ", 消息体是:" + json);
} catch (Exception ex) {
log.error("mqtt消息处理异常, " + ex.getMessage() + ", 消息体是:" + json, ex);
}
}
public void process(MessageUpload msg) throws Exception {
if (!CollectionUtils.isEmpty(msg.getDevices())) {
for (Device device : msg.getDevices()) {

@ -1,50 +0,0 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.exception.ApiException;
import com.shxy.xymanager_common.model.mqtt.MessageUpload;
import com.shxy.xymanager_common.util.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Service
@Slf4j
public class MqttSubscriber implements IMqttMessageListener {
@Value("${mqtt.datatopic}")
private String datatopic;
@Resource
private MqttClient mqttClient;
@Resource
MessageHandler messageHandler;
@PostConstruct
public void init() throws Exception {
// 订阅默认主题
mqttClient.subscribe(datatopic, this);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String json = new String(message.getPayload());
try {
log.info("mqtt收到消息:" + json);
MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class);
messageHandler.process(msg);
} catch (ApiException aex) {
log.error("mqtt消息处理异常, " + aex.getMessage() + ", 消息体是:" + json);
} catch (Exception ex) {
log.error("mqtt消息处理异常, " + ex.getMessage() + ", 消息体是:" + json, ex);
}
}
}

@ -0,0 +1,35 @@
package com.shxy.xymanager_framework.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Service
@Slf4j
public class MqttSubscriberService {
@Value("${mqtt.resptopic}")
private String resptopic;
@Value("${mqtt.datatopic}")
private String datatopic;
@Resource
private MqttClient mqttClient;
@Resource
DataMessageListener dataMessageListener;
@Resource
RespMessageListener respMessageListener;
@PostConstruct
public void init() throws Exception {
// 订阅主题
mqttClient.subscribe(resptopic, respMessageListener);
mqttClient.subscribe(datatopic, dataMessageListener);
}
}

@ -0,0 +1,25 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.exception.ApiException;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class RespMessageListener implements IMqttMessageListener {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String json = new String(message.getPayload());
try {
log.info("mqtt的" + topic + "收到消息:" + json);
} catch (ApiException aex) {
log.error("mqtt消息处理异常, " + aex.getMessage() + ", 消息体是:" + json);
} catch (Exception ex) {
log.error("mqtt消息处理异常, " + ex.getMessage() + ", 消息体是:" + json, ex);
}
}
}
Loading…
Cancel
Save