perf: 调整mqtt根据装置model动态处理topic

dev
huangfeng 1 month ago
parent 0aa2a7e704
commit e3f7a20c17

@ -28,7 +28,7 @@ public class MqttController extends BaseController {
@ApiOperation("下达命令")
@Log(title = "下达命令", type = "下达")
public ResponseReult<String> send(@RequestBody MessageSend msg) throws Exception {
mqttPublisherService.publish(msg);
mqttPublisherService.publish(msg, "N938XY12345678901");
return ResponseReult.success("OK");
}
}

@ -218,4 +218,11 @@ public class Constants {
public static ConcurrentHashMap<Integer, Integer> scheduleRequestMap = new ConcurrentHashMap<>();
public static HashMap<Integer, String> scheduleRequestDoneMap = new HashMap<>();
/**
* mqtt
*/
public static String TopicPrefix = "/v1/devices/";
public static String DataSuffix = "/datas";
public static String RespSuffix = "/commandResponse";
public static String CmdSuffix = "/command";
}

@ -36,14 +36,14 @@ public class MqttServiceImpl implements MqttService {
paras.setChannelNumber(channel);
paras.setPresetPosition(preset);
paras.setPassword("");
mqttPublisherService.publish(msg);
mqttPublisherService.publish(msg, cmdid);
}
@Override
public void setTermCamera(Map<String, String> ctrlBeanMap, String cmdid) throws Exception {
MessageSend msg = this.buildMessage(ctrlBeanMap);
msg.setDeviceId(cmdid);
mqttPublisherService.publish(msg);
mqttPublisherService.publish(msg, cmdid);
}
private MessageSend buildMessage(Map<String, String> map) {

@ -76,7 +76,7 @@ public class DataMessageListener implements IMqttMessageListener {
}
if (data.getDeviceRequestUploadImageDataEvent() != null) {
MessageUpload upload = photoHandler.allowUpload(data.getDeviceRequestUploadImageDataEvent(), deviceId);
mqttPublisherService.publish(upload);
mqttPublisherService.publish(upload, deviceId);
}
break;
case "WeatherMonitoring":

@ -1,28 +1,30 @@
package com.shxy.xymanager_service.mqtt;
import com.shxy.xymanager_common.entity.Terminals;
import com.shxy.xymanager_common.model.mqtt.MessageSend;
import com.shxy.xymanager_common.model.mqtt.MessageUpload;
import com.shxy.xymanager_common.util.JSONUtil;
import com.shxy.xymanager_service.service.TerminalExtService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import static com.shxy.xymanager_common.constant.Constants.*;
@Service
@Slf4j
public class MqttPublisherService {
@Value("${mqtt.cmdtopic}")
private String cmdtopic;
int mid = 0;
@Resource
private MqttClient mqttClient;
@Resource
TerminalExtService terminalExtService;
private synchronized int getNextMid() {
mid++;
@ -30,23 +32,27 @@ public class MqttPublisherService {
}
@Async
public void publish(MessageSend msg) throws Exception {
public void publish(MessageSend msg, String cmdid) throws Exception {
msg.setMid(this.getNextMid());
String json = JSONUtil.object2Json(msg);
MqttMessage message = new MqttMessage(json.getBytes());
message.setQos(1);
message.setRetained(true);
log.info("mqtt发送消息:" + json);
mqttClient.publish(cmdtopic, message);
this.publishMsg(msg, cmdid);
}
@Async
public void publish(MessageUpload msg) throws Exception {
public void publish(MessageUpload msg, String cmdid) throws Exception {
this.publishMsg(msg, cmdid);
}
private void publishMsg(Object msg, String cmdid) throws Exception {
Terminals term = terminalExtService.getByCmdid(cmdid);
if (term == null) {
log.error("mqtt无法发送消息该装置" + cmdid + "不存在");
return;
}
String json = JSONUtil.object2Json(msg);
MqttMessage message = new MqttMessage(json.getBytes());
message.setQos(1);
message.setRetained(true);
log.info("mqtt发送消息:" + json);
String cmdtopic = TopicPrefix + term.getModel() + CmdSuffix;
log.info("mqtt发送到" + cmdtopic + ", 消息:" + json);
mqttClient.publish(cmdtopic, message);
}
}

@ -1,35 +1,53 @@
package com.shxy.xymanager_service.mqtt;
import com.shxy.xymanager_common.entity.Terminals;
import com.shxy.xymanager_service.service.NewCacheService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static com.shxy.xymanager_common.constant.Constants.*;
@Service
@Slf4j
public class MqttSubscriberService {
@Value("${mqtt.resptopic}")
private String resptopic;
@Value("${mqtt.datatopic}")
private String datatopic;
@Resource
private MqttClient mqttClient;
@Resource
DataMessageListener dataMessageListener;
@Resource
RespMessageListener respMessageListener;
@Resource
NewCacheService newCacheService;
@PostConstruct
public void init() throws Exception {
List<String> modelList = new ArrayList<>();
Map<Integer, Terminals> terminalMap = newCacheService.getTerminalMap();
Iterator<Integer> it = terminalMap.keySet().iterator();
while (it.hasNext()) {
Integer termId = it.next();
Terminals term = terminalMap.get(termId);
if (StringUtils.isNotBlank(term.getModel())) {
if (!modelList.contains(term.getModel())) {
modelList.add(term.getModel());
}
}
}
// 订阅主题
mqttClient.subscribe(resptopic, respMessageListener);
mqttClient.subscribe(datatopic, dataMessageListener);
for (String model : modelList) {
mqttClient.subscribe(TopicPrefix + model + RespSuffix, respMessageListener);
mqttClient.subscribe(TopicPrefix + model + DataSuffix, dataMessageListener);
}
}
}

Loading…
Cancel
Save