From 29623f1fc040e0f0020d24a56fda8d68d8725a42 Mon Sep 17 00:00:00 2001 From: huangfeng Date: Tue, 18 Mar 2025 16:43:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=8F=91=E9=80=81?= =?UTF-8?q?=E5=92=8C=E6=8E=A5=E6=94=B6=E7=9A=84=E9=80=9A=E7=94=A8=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=92=8C=E6=95=B0=E6=8D=AE=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../xymanager_common/model/mqtt/Device.java | 11 +++ .../mqtt/DeviceUploadImageDataEvent.java | 18 +++++ .../model/mqtt/GeneralDataMessageUpload.java | 12 +++ .../model/mqtt/HeartbeatMessageUpload.java | 10 +++ .../model/mqtt/MessageSend.java | 13 ++++ .../model/mqtt/MessageUpload.java | 10 +++ .../model/mqtt/OneService.java | 10 +++ .../model/mqtt/ParamArray.java | 10 +++ .../xymanager_common/model/mqtt/Paras.java | 44 +++++++++++ .../model/mqtt/PowerOnContactMessage.java | 8 ++ .../model/mqtt/RestartTime.java | 10 +++ .../mqtt/MessageHandler.java | 76 +++++++++++++++++++ .../mqtt/MqttPublisherService.java | 7 +- .../mqtt/MqttSubscriber.java | 23 ++++-- 14 files changed, 255 insertions(+), 7 deletions(-) create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/Device.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/DeviceUploadImageDataEvent.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/HeartbeatMessageUpload.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/MessageSend.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/MessageUpload.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/OneService.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/ParamArray.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/Paras.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/PowerOnContactMessage.java create mode 100644 xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/RestartTime.java create mode 100644 xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/Device.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/Device.java new file mode 100644 index 0000000..87525d7 --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/Device.java @@ -0,0 +1,11 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +import java.util.List; + +@Data +public class Device { + String deviceId; + List services; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/DeviceUploadImageDataEvent.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/DeviceUploadImageDataEvent.java new file mode 100644 index 0000000..ad1c573 --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/DeviceUploadImageDataEvent.java @@ -0,0 +1,18 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +@Data +public class DeviceUploadImageDataEvent { + Integer channelNumber; + Integer presetPosition; + String fileName; + String fileFolder; + Integer checkMode; + String checkValue; + Integer packageNumber; + String time; + Integer encodeMode; + Integer dataLength; + String data; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java new file mode 100644 index 0000000..e8c8301 --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java @@ -0,0 +1,12 @@ +package com.shxy.xymanager_common.model.mqtt; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.Data; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +public class GeneralDataMessageUpload { + DeviceUploadImageDataEvent deviceUploadImageDataEvent; + HeartbeatMessageUpload heartbeatMessageUpload; + PowerOnContactMessage powerOnContactMessage; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/HeartbeatMessageUpload.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/HeartbeatMessageUpload.java new file mode 100644 index 0000000..82a7253 --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/HeartbeatMessageUpload.java @@ -0,0 +1,10 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +@Data +public class HeartbeatMessageUpload { + String time; + Integer signalIntensity; + Integer batteryVoltage; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/MessageSend.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/MessageSend.java new file mode 100644 index 0000000..ccb623a --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/MessageSend.java @@ -0,0 +1,13 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +@Data +public class MessageSend { + Integer mid; + String serviceId; + String deviceId; + String cmd; + Paras paras; + String msgType; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/MessageUpload.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/MessageUpload.java new file mode 100644 index 0000000..cb7ee8a --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/MessageUpload.java @@ -0,0 +1,10 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +import java.util.List; + +@Data +public class MessageUpload { + List devices; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/OneService.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/OneService.java new file mode 100644 index 0000000..6c09e80 --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/OneService.java @@ -0,0 +1,10 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +@Data +public class OneService { + GeneralDataMessageUpload data; + String serviceId; + String eventTime; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/ParamArray.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/ParamArray.java new file mode 100644 index 0000000..c3c7524 --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/ParamArray.java @@ -0,0 +1,10 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +@Data +public class ParamArray { + Integer presetPosition; + Integer hour; + Integer minute; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/Paras.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/Paras.java new file mode 100644 index 0000000..2d5de6e --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/Paras.java @@ -0,0 +1,44 @@ +package com.shxy.xymanager_common.model.mqtt; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.Data; + +import java.util.List; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +public class Paras { + String fileFolder; + String fileName; + String password; + Integer checkMode; + String checkValue; + Integer type; + String url; + String logFileStartTime; + String logFileEndTime; + String logFileName; + String dataType; + Integer amount; + Integer channel; + Integer channelNumber; + Integer timingType; + String time; + Integer level; + Integer precipitationAlarm; + Integer interval; + Integer windSpeedAlarm; + Integer heartbeatInterval; + Integer sampleInterval; + Integer port; + Integer onlineInterval; + Integer httpPort; + Integer sleepDuration; + RestartTime restartTime; + String ipAddr; + String httpIp; + List paramArray; + Integer groupNumber; + Integer presetPosition; + Integer sign; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/PowerOnContactMessage.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/PowerOnContactMessage.java new file mode 100644 index 0000000..59fc75d --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/PowerOnContactMessage.java @@ -0,0 +1,8 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +@Data +public class PowerOnContactMessage { + String standardVersionNumber; +} diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/RestartTime.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/RestartTime.java new file mode 100644 index 0000000..84615f2 --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/RestartTime.java @@ -0,0 +1,10 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +@Data +public class RestartTime { + Integer hour; + Integer day; + Integer minute; +} diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java new file mode 100644 index 0000000..2416803 --- /dev/null +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java @@ -0,0 +1,76 @@ +package com.shxy.xymanager_framework.mqtt; + +import com.shxy.xymanager_common.entity.Terminals; +import com.shxy.xymanager_common.model.mqtt.*; +import com.shxy.xymanager_service.service.TerminalExtService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.Base64Utils; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import java.io.File; +import java.io.FileOutputStream; + +@Service +@Slf4j +public class MessageHandler { + + @Resource + TerminalExtService terminalExtService; + + public void process(MessageUpload msg) throws Exception { + if (!CollectionUtils.isEmpty(msg.getDevices())) { + for (Device device : msg.getDevices()) { + this.processDevice(device); + } + } + } + + private void processDevice(Device device) throws Exception { + if (!CollectionUtils.isEmpty(device.getServices())) { + for (OneService service : device.getServices()) { + this.processOneService(service, device.getDeviceId()); + } + } + } + + private void processOneService(OneService service, String deviceId) throws Exception { + if (service.getData() != null) { + GeneralDataMessageUpload data = service.getData(); + this.handleHeartbeat(data.getHeartbeatMessageUpload(), deviceId); + this.handleUploadImage(data.getDeviceUploadImageDataEvent(), deviceId); + } + } + + private void handleHeartbeat(HeartbeatMessageUpload beat, String deviceId) { + if (beat == null) { + return; + } + Terminals term = terminalExtService.getByCmdid(deviceId); + if (term == null) { + log.error("mqtt收到心跳,但是该装置" + deviceId + "不存在"); + return; + } + } + + private void handleUploadImage(DeviceUploadImageDataEvent image, String deviceId) throws Exception { + if (image == null) { + return; + } + Terminals term = terminalExtService.getByCmdid(deviceId); + if (term == null) { + log.error("mqtt收到图片,但是该装置" + deviceId + "不存在"); + return; + } + String folder = ""; + File dir = new File(folder); + dir.mkdirs(); + String fullPath = folder; + + byte[] data = Base64Utils.decodeFromString(image.getData()); + try (FileOutputStream fos = new FileOutputStream(fullPath)) { + fos.write(data); + } + } +} diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttPublisherService.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttPublisherService.java index 4016ed1..129288d 100644 --- a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttPublisherService.java +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttPublisherService.java @@ -1,5 +1,7 @@ package com.shxy.xymanager_framework.mqtt; +import com.shxy.xymanager_common.model.mqtt.MessageSend; +import com.shxy.xymanager_common.util.JSONUtil; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Value; @@ -16,8 +18,9 @@ public class MqttPublisherService { @Resource private MqttClient mqttClient; - public void publish(String payload) throws Exception { - MqttMessage message = new MqttMessage(payload.getBytes()); + public void publish(MessageSend msg) throws Exception { + String json = JSONUtil.object2Json(msg); + MqttMessage message = new MqttMessage(json.getBytes()); message.setQos(1); message.setRetained(true); mqttClient.publish(cmdtopic, message); diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java index bfe0617..7379e9c 100644 --- a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java @@ -1,25 +1,31 @@ package com.shxy.xymanager_framework.mqtt; +import com.shxy.xymanager_common.model.mqtt.MessageUpload; +import com.shxy.xymanager_common.util.JSONUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import javax.annotation.Resource; @Service @Slf4j public class MqttSubscriber implements IMqttMessageListener { - @Autowired - private MqttClient mqttClient; @Value("${mqtt.datatopic}") private String datatopic; + @Resource + private MqttClient mqttClient; + + @Resource + MessageHandler messageHandler; + @PostConstruct public void init() throws Exception { // 订阅默认主题 @@ -28,8 +34,15 @@ public class MqttSubscriber implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - String payload = new String(message.getPayload()); - log.info("Received message [%s]: %s%n", topic, payload); + String json = new String(message.getPayload()); + + try { + MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class); + messageHandler.process(msg); + } catch (Exception ex) { + log.error("mqtt收到消息处理异常.", ex); + log.info("消息内容是" + json); + } } }