From e3f7a20c17b7bafe3cb331eab48a19b35b3399a8 Mon Sep 17 00:00:00 2001 From: huangfeng Date: Fri, 9 May 2025 16:25:44 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E8=B0=83=E6=95=B4mqtt=E6=A0=B9?= =?UTF-8?q?=E6=8D=AE=E8=A3=85=E7=BD=AEmodel=E5=8A=A8=E6=80=81=E5=A4=84?= =?UTF-8?q?=E7=90=86topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/MqttController.java | 2 +- .../xymanager_common/constant/Constants.java | 7 ++++ .../impl/MqttServiceImpl.java | 4 +-- .../mqtt/DataMessageListener.java | 2 +- .../mqtt/MqttPublisherService.java | 34 ++++++++++-------- .../mqtt/MqttSubscriberService.java | 36 ++++++++++++++----- 6 files changed, 58 insertions(+), 27 deletions(-) diff --git a/xymanager_admin/src/main/java/com/shxy/xymanager_admin/controller/MqttController.java b/xymanager_admin/src/main/java/com/shxy/xymanager_admin/controller/MqttController.java index 866e7c7..f5ec1c5 100644 --- a/xymanager_admin/src/main/java/com/shxy/xymanager_admin/controller/MqttController.java +++ b/xymanager_admin/src/main/java/com/shxy/xymanager_admin/controller/MqttController.java @@ -28,7 +28,7 @@ public class MqttController extends BaseController { @ApiOperation("下达命令") @Log(title = "下达命令", type = "下达") public ResponseReult send(@RequestBody MessageSend msg) throws Exception { - mqttPublisherService.publish(msg); + mqttPublisherService.publish(msg, "N938XY12345678901"); return ResponseReult.success("OK"); } } diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/constant/Constants.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/constant/Constants.java index 8f9763f..1ed2eed 100644 --- a/xymanager_common/src/main/java/com/shxy/xymanager_common/constant/Constants.java +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/constant/Constants.java @@ -218,4 +218,11 @@ public class Constants { public static ConcurrentHashMap scheduleRequestMap = new ConcurrentHashMap<>(); public static HashMap scheduleRequestDoneMap = new HashMap<>(); + /** + * mqtt + */ + public static String TopicPrefix = "/v1/devices/"; + public static String DataSuffix = "/datas"; + public static String RespSuffix = "/commandResponse"; + public static String CmdSuffix = "/command"; } \ No newline at end of file diff --git a/xymanager_service/src/main/java/com/shxy/xymanager_service/impl/MqttServiceImpl.java b/xymanager_service/src/main/java/com/shxy/xymanager_service/impl/MqttServiceImpl.java index 0631c02..6477618 100644 --- a/xymanager_service/src/main/java/com/shxy/xymanager_service/impl/MqttServiceImpl.java +++ b/xymanager_service/src/main/java/com/shxy/xymanager_service/impl/MqttServiceImpl.java @@ -36,14 +36,14 @@ public class MqttServiceImpl implements MqttService { paras.setChannelNumber(channel); paras.setPresetPosition(preset); paras.setPassword(""); - mqttPublisherService.publish(msg); + mqttPublisherService.publish(msg, cmdid); } @Override public void setTermCamera(Map ctrlBeanMap, String cmdid) throws Exception { MessageSend msg = this.buildMessage(ctrlBeanMap); msg.setDeviceId(cmdid); - mqttPublisherService.publish(msg); + mqttPublisherService.publish(msg, cmdid); } private MessageSend buildMessage(Map map) { diff --git a/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/DataMessageListener.java b/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/DataMessageListener.java index ebd0bcc..764d4e7 100644 --- a/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/DataMessageListener.java +++ b/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/DataMessageListener.java @@ -76,7 +76,7 @@ public class DataMessageListener implements IMqttMessageListener { } if (data.getDeviceRequestUploadImageDataEvent() != null) { MessageUpload upload = photoHandler.allowUpload(data.getDeviceRequestUploadImageDataEvent(), deviceId); - mqttPublisherService.publish(upload); + mqttPublisherService.publish(upload, deviceId); } break; case "WeatherMonitoring": diff --git a/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/MqttPublisherService.java b/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/MqttPublisherService.java index c950721..b2f25fc 100644 --- a/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/MqttPublisherService.java +++ b/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/MqttPublisherService.java @@ -1,28 +1,30 @@ package com.shxy.xymanager_service.mqtt; +import com.shxy.xymanager_common.entity.Terminals; import com.shxy.xymanager_common.model.mqtt.MessageSend; import com.shxy.xymanager_common.model.mqtt.MessageUpload; import com.shxy.xymanager_common.util.JSONUtil; +import com.shxy.xymanager_service.service.TerminalExtService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import static com.shxy.xymanager_common.constant.Constants.*; + @Service @Slf4j public class MqttPublisherService { - @Value("${mqtt.cmdtopic}") - private String cmdtopic; - int mid = 0; @Resource private MqttClient mqttClient; + @Resource + TerminalExtService terminalExtService; private synchronized int getNextMid() { mid++; @@ -30,23 +32,27 @@ public class MqttPublisherService { } @Async - public void publish(MessageSend msg) throws Exception { + public void publish(MessageSend msg, String cmdid) throws Exception { msg.setMid(this.getNextMid()); - String json = JSONUtil.object2Json(msg); - MqttMessage message = new MqttMessage(json.getBytes()); - message.setQos(1); - message.setRetained(true); - log.info("mqtt发送消息:" + json); - mqttClient.publish(cmdtopic, message); + this.publishMsg(msg, cmdid); } @Async - public void publish(MessageUpload msg) throws Exception { + public void publish(MessageUpload msg, String cmdid) throws Exception { + this.publishMsg(msg, cmdid); + } + + private void publishMsg(Object msg, String cmdid) throws Exception { + Terminals term = terminalExtService.getByCmdid(cmdid); + if (term == null) { + log.error("mqtt无法发送消息,该装置" + cmdid + "不存在"); + return; + } String json = JSONUtil.object2Json(msg); MqttMessage message = new MqttMessage(json.getBytes()); message.setQos(1); - message.setRetained(true); - log.info("mqtt发送消息:" + json); + String cmdtopic = TopicPrefix + term.getModel() + CmdSuffix; + log.info("mqtt发送到" + cmdtopic + ", 消息:" + json); mqttClient.publish(cmdtopic, message); } } diff --git a/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/MqttSubscriberService.java b/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/MqttSubscriberService.java index ac9fcee..96ac06a 100644 --- a/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/MqttSubscriberService.java +++ b/xymanager_service/src/main/java/com/shxy/xymanager_service/mqtt/MqttSubscriberService.java @@ -1,35 +1,53 @@ package com.shxy.xymanager_service.mqtt; +import com.shxy.xymanager_common.entity.Terminals; +import com.shxy.xymanager_service.service.NewCacheService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.MqttClient; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static com.shxy.xymanager_common.constant.Constants.*; @Service @Slf4j public class MqttSubscriberService { - @Value("${mqtt.resptopic}") - private String resptopic; - @Value("${mqtt.datatopic}") - private String datatopic; - @Resource private MqttClient mqttClient; @Resource DataMessageListener dataMessageListener; @Resource RespMessageListener respMessageListener; + @Resource + NewCacheService newCacheService; @PostConstruct public void init() throws Exception { + List modelList = new ArrayList<>(); + Map terminalMap = newCacheService.getTerminalMap(); + Iterator it = terminalMap.keySet().iterator(); + while (it.hasNext()) { + Integer termId = it.next(); + Terminals term = terminalMap.get(termId); + if (StringUtils.isNotBlank(term.getModel())) { + if (!modelList.contains(term.getModel())) { + modelList.add(term.getModel()); + } + } + } // 订阅主题 - mqttClient.subscribe(resptopic, respMessageListener); - mqttClient.subscribe(datatopic, dataMessageListener); + for (String model : modelList) { + mqttClient.subscribe(TopicPrefix + model + RespSuffix, respMessageListener); + mqttClient.subscribe(TopicPrefix + model + DataSuffix, dataMessageListener); + } } - }