feat: 增加发送和接收的通用处理和数据结构

dev
huangfeng 3 months ago
parent a486aa1668
commit 29623f1fc0

@ -0,0 +1,11 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
import java.util.List;
@Data
public class Device {
String deviceId;
List<OneService> services;
}

@ -0,0 +1,18 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
@Data
public class DeviceUploadImageDataEvent {
Integer channelNumber;
Integer presetPosition;
String fileName;
String fileFolder;
Integer checkMode;
String checkValue;
Integer packageNumber;
String time;
Integer encodeMode;
Integer dataLength;
String data;
}

@ -0,0 +1,12 @@
package com.shxy.xymanager_common.model.mqtt;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class GeneralDataMessageUpload {
DeviceUploadImageDataEvent deviceUploadImageDataEvent;
HeartbeatMessageUpload heartbeatMessageUpload;
PowerOnContactMessage powerOnContactMessage;
}

@ -0,0 +1,10 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
@Data
public class HeartbeatMessageUpload {
String time;
Integer signalIntensity;
Integer batteryVoltage;
}

@ -0,0 +1,13 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
@Data
public class MessageSend {
Integer mid;
String serviceId;
String deviceId;
String cmd;
Paras paras;
String msgType;
}

@ -0,0 +1,10 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
import java.util.List;
@Data
public class MessageUpload {
List<Device> devices;
}

@ -0,0 +1,10 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
@Data
public class OneService {
GeneralDataMessageUpload data;
String serviceId;
String eventTime;
}

@ -0,0 +1,10 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
@Data
public class ParamArray {
Integer presetPosition;
Integer hour;
Integer minute;
}

@ -0,0 +1,44 @@
package com.shxy.xymanager_common.model.mqtt;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
import java.util.List;
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class Paras {
String fileFolder;
String fileName;
String password;
Integer checkMode;
String checkValue;
Integer type;
String url;
String logFileStartTime;
String logFileEndTime;
String logFileName;
String dataType;
Integer amount;
Integer channel;
Integer channelNumber;
Integer timingType;
String time;
Integer level;
Integer precipitationAlarm;
Integer interval;
Integer windSpeedAlarm;
Integer heartbeatInterval;
Integer sampleInterval;
Integer port;
Integer onlineInterval;
Integer httpPort;
Integer sleepDuration;
RestartTime restartTime;
String ipAddr;
String httpIp;
List<ParamArray> paramArray;
Integer groupNumber;
Integer presetPosition;
Integer sign;
}

@ -0,0 +1,8 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
@Data
public class PowerOnContactMessage {
String standardVersionNumber;
}

@ -0,0 +1,10 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
@Data
public class RestartTime {
Integer hour;
Integer day;
Integer minute;
}

@ -0,0 +1,76 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.entity.Terminals;
import com.shxy.xymanager_common.model.mqtt.*;
import com.shxy.xymanager_service.service.TerminalExtService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.Base64Utils;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.io.File;
import java.io.FileOutputStream;
@Service
@Slf4j
public class MessageHandler {
@Resource
TerminalExtService terminalExtService;
public void process(MessageUpload msg) throws Exception {
if (!CollectionUtils.isEmpty(msg.getDevices())) {
for (Device device : msg.getDevices()) {
this.processDevice(device);
}
}
}
private void processDevice(Device device) throws Exception {
if (!CollectionUtils.isEmpty(device.getServices())) {
for (OneService service : device.getServices()) {
this.processOneService(service, device.getDeviceId());
}
}
}
private void processOneService(OneService service, String deviceId) throws Exception {
if (service.getData() != null) {
GeneralDataMessageUpload data = service.getData();
this.handleHeartbeat(data.getHeartbeatMessageUpload(), deviceId);
this.handleUploadImage(data.getDeviceUploadImageDataEvent(), deviceId);
}
}
private void handleHeartbeat(HeartbeatMessageUpload beat, String deviceId) {
if (beat == null) {
return;
}
Terminals term = terminalExtService.getByCmdid(deviceId);
if (term == null) {
log.error("mqtt收到心跳但是该装置" + deviceId + "不存在");
return;
}
}
private void handleUploadImage(DeviceUploadImageDataEvent image, String deviceId) throws Exception {
if (image == null) {
return;
}
Terminals term = terminalExtService.getByCmdid(deviceId);
if (term == null) {
log.error("mqtt收到图片但是该装置" + deviceId + "不存在");
return;
}
String folder = "";
File dir = new File(folder);
dir.mkdirs();
String fullPath = folder;
byte[] data = Base64Utils.decodeFromString(image.getData());
try (FileOutputStream fos = new FileOutputStream(fullPath)) {
fos.write(data);
}
}
}

@ -1,5 +1,7 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.model.mqtt.MessageSend;
import com.shxy.xymanager_common.util.JSONUtil;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
@ -16,8 +18,9 @@ public class MqttPublisherService {
@Resource
private MqttClient mqttClient;
public void publish(String payload) throws Exception {
MqttMessage message = new MqttMessage(payload.getBytes());
public void publish(MessageSend msg) throws Exception {
String json = JSONUtil.object2Json(msg);
MqttMessage message = new MqttMessage(json.getBytes());
message.setQos(1);
message.setRetained(true);
mqttClient.publish(cmdtopic, message);

@ -1,25 +1,31 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.model.mqtt.MessageUpload;
import com.shxy.xymanager_common.util.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Service
@Slf4j
public class MqttSubscriber implements IMqttMessageListener {
@Autowired
private MqttClient mqttClient;
@Value("${mqtt.datatopic}")
private String datatopic;
@Resource
private MqttClient mqttClient;
@Resource
MessageHandler messageHandler;
@PostConstruct
public void init() throws Exception {
// 订阅默认主题
@ -28,8 +34,15 @@ public class MqttSubscriber implements IMqttMessageListener {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
log.info("Received message [%s]: %s%n", topic, payload);
String json = new String(message.getPayload());
try {
MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class);
messageHandler.process(msg);
} catch (Exception ex) {
log.error("mqtt收到消息处理异常.", ex);
log.info("消息内容是" + json);
}
}
}

Loading…
Cancel
Save