feat: 增加mqtt心跳数据和图片数据的处理

dev
huangfeng 3 months ago
parent 29623f1fc0
commit ff765f4bf4

@ -125,4 +125,5 @@ mqtt:
username: xymp
password: xymp
cmdtopic: /v1/devices/MSRDT-A/command
datatopic: /v1/devices/MSRDT-A/datas
datatopic: /v1/devices/MSRDT-A/datas
photodir: /home/xymp/photos/

@ -9,4 +9,15 @@ public class GeneralDataMessageUpload {
DeviceUploadImageDataEvent deviceUploadImageDataEvent;
HeartbeatMessageUpload heartbeatMessageUpload;
PowerOnContactMessage powerOnContactMessage;
String equipmentIdentity;
String physicalCode;
String equipmentName;
Integer deviceType;
String manufacture;
String supplier;
String equipmentType;
String manufNo;
String manufDate;
Integer communicationMode;
Integer communicationProtocol;
}

@ -0,0 +1,40 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.entity.TerminalStatus;
import com.shxy.xymanager_common.entity.Terminals;
import com.shxy.xymanager_common.model.mqtt.HeartbeatMessageUpload;
import com.shxy.xymanager_common.util.DateUtil;
import com.shxy.xymanager_dao.dao.TerminalStatusDao;
import com.shxy.xymanager_service.service.TerminalExtService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.math.BigInteger;
import java.util.Date;
@Service
@Slf4j
public class HeartbeatHandler {
@Resource
TerminalExtService terminalExtService;
@Resource
TerminalStatusDao terminalStatusDao;
public void handleUpload(HeartbeatMessageUpload upload, String deviceId) throws Exception {
Terminals term = terminalExtService.getByCmdid(deviceId);
if (term == null) {
log.error("mqtt收到心跳但是该装置" + deviceId + "不存在");
return;
}
String time = upload.getTime().replace("T", "").replace("Z", "");
log.info("mqtt消息心跳" + deviceId + " " + time);
Date beatTime = DateUtil.parse(time, "yyyyMMddHHmmss");
TerminalStatus record = terminalStatusDao.selectByPrimaryKey(term.getId());
if (record != null) {
record.setLastHeartbeat(BigInteger.valueOf(beatTime.getTime() / 1000));
terminalStatusDao.updateByPrimaryKeySelective(record);
}
}
}

@ -1,23 +1,20 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.entity.Terminals;
import com.shxy.xymanager_common.model.mqtt.*;
import com.shxy.xymanager_service.service.TerminalExtService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.Base64Utils;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.io.File;
import java.io.FileOutputStream;
@Service
@Slf4j
public class MessageHandler {
@Resource
TerminalExtService terminalExtService;
PhotoHandler photoHandler;
@Resource
HeartbeatHandler heartbeatHandler;
public void process(MessageUpload msg) throws Exception {
if (!CollectionUtils.isEmpty(msg.getDevices())) {
@ -38,39 +35,13 @@ public class MessageHandler {
private void processOneService(OneService service, String deviceId) throws Exception {
if (service.getData() != null) {
GeneralDataMessageUpload data = service.getData();
this.handleHeartbeat(data.getHeartbeatMessageUpload(), deviceId);
this.handleUploadImage(data.getDeviceUploadImageDataEvent(), deviceId);
}
}
private void handleHeartbeat(HeartbeatMessageUpload beat, String deviceId) {
if (beat == null) {
return;
}
Terminals term = terminalExtService.getByCmdid(deviceId);
if (term == null) {
log.error("mqtt收到心跳但是该装置" + deviceId + "不存在");
return;
if (data.getHeartbeatMessageUpload() != null) {
heartbeatHandler.handleUpload(data.getHeartbeatMessageUpload(), deviceId);
}
if (data.getDeviceUploadImageDataEvent() != null) {
photoHandler.handleUpload(data.getDeviceUploadImageDataEvent(), deviceId);
}
}
}
private void handleUploadImage(DeviceUploadImageDataEvent image, String deviceId) throws Exception {
if (image == null) {
return;
}
Terminals term = terminalExtService.getByCmdid(deviceId);
if (term == null) {
log.error("mqtt收到图片但是该装置" + deviceId + "不存在");
return;
}
String folder = "";
File dir = new File(folder);
dir.mkdirs();
String fullPath = folder;
byte[] data = Base64Utils.decodeFromString(image.getData());
try (FileOutputStream fos = new FileOutputStream(fullPath)) {
fos.write(data);
}
}
}

@ -40,8 +40,7 @@ public class MqttSubscriber implements IMqttMessageListener {
MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class);
messageHandler.process(msg);
} catch (Exception ex) {
log.error("mqtt收到消息处理异常.", ex);
log.info("消息内容是" + json);
log.error("mqtt收到消息处理异常, 消息是:" + json, ex);
}
}
}

@ -0,0 +1,90 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.entity.TerminalPhoto;
import com.shxy.xymanager_common.entity.Terminals;
import com.shxy.xymanager_common.model.mqtt.DeviceUploadImageDataEvent;
import com.shxy.xymanager_common.util.DateUtil;
import com.shxy.xymanager_dao.dao.TerminalPhotoDao;
import com.shxy.xymanager_service.service.TerminalExtService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.Base64Utils;
import javax.annotation.Resource;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;
@Service
@Slf4j
public class PhotoHandler {
@Value("${mqtt.photodir}")
private String photodir;
@Resource
TerminalExtService terminalExtService;
@Resource
TerminalPhotoDao terminalPhotoDao;
public void handleUpload(DeviceUploadImageDataEvent upload, String deviceId) throws Exception {
Terminals term = terminalExtService.getByCmdid(deviceId);
if (term == null) {
log.error("mqtt收到图片但是该装置" + deviceId + "不存在");
return;
}
int length = deviceId.length();
Date now = new Date();
String folder = DateUtil.format(now, "yyyy/MM/dd") + "/" + deviceId.substring(length - 2);
File dir = new File(photodir + folder);
dir.mkdirs();
String hexC = String.format("%02X", upload.getPresetPosition());
String time = upload.getTime().replace("T", "").replace("Z", "");
Date photoTime = DateUtil.parse(time, "yyyyMMddHHmmss");
String filename = String.join("_",
deviceId, String.valueOf(upload.getChannelNumber()), hexC, time) + ".jpg";
String filePath = folder + "/" + filename;
byte[] data = Base64Utils.decodeFromString(upload.getData());
try (FileOutputStream fos = new FileOutputStream(photodir + filePath)) {
fos.write(data);
}
log.info("mqtt消息保存图片到" + filePath);
TerminalPhoto record = new TerminalPhoto();
record.setTermId(term.getId());
record.setChannelId(upload.getChannelNumber());
record.setPresetId(upload.getPresetPosition());
// record.setOrginalId(BigInteger.valueOf(upload.getPackageNumber()));
record.setPhotoTime(BigInteger.valueOf(photoTime.getTime() / 1000));
record.setMediaType(0);
record.setPath(filePath);
record.setRecvTime(BigInteger.valueOf(now.getTime() / 1000));
record.setRecvEndTime(BigInteger.valueOf(now.getTime() / 1000));
record.setCreateTime(now);
record.setFlags(0);
record.setIsMark(0);
try (ByteArrayInputStream bis = new ByteArrayInputStream(data)) {
BufferedImage image = ImageIO.read(bis);
if (image != null) {
int width = image.getWidth();
int height = image.getHeight();
record.setWidth(width);
record.setHeight(height);
}
} catch (IOException ignore) {
}
long fileSize = Files.size(Paths.get(photodir + filePath));
record.setFileSize((int) fileSize);
terminalPhotoDao.insert(record);
}
}
Loading…
Cancel
Save