diff --git a/xymanager_admin/src/main/resources/application-test.yml b/xymanager_admin/src/main/resources/application-test.yml index a2f15d7..7e1ca1b 100644 --- a/xymanager_admin/src/main/resources/application-test.yml +++ b/xymanager_admin/src/main/resources/application-test.yml @@ -125,4 +125,5 @@ mqtt: username: xymp password: xymp cmdtopic: /v1/devices/MSRDT-A/command - datatopic: /v1/devices/MSRDT-A/datas \ No newline at end of file + datatopic: /v1/devices/MSRDT-A/datas + photodir: /home/xymp/photos/ \ No newline at end of file diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java index e8c8301..820ef15 100644 --- a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java @@ -9,4 +9,15 @@ public class GeneralDataMessageUpload { DeviceUploadImageDataEvent deviceUploadImageDataEvent; HeartbeatMessageUpload heartbeatMessageUpload; PowerOnContactMessage powerOnContactMessage; + String equipmentIdentity; + String physicalCode; + String equipmentName; + Integer deviceType; + String manufacture; + String supplier; + String equipmentType; + String manufNo; + String manufDate; + Integer communicationMode; + Integer communicationProtocol; } diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/HeartbeatHandler.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/HeartbeatHandler.java new file mode 100644 index 0000000..fbf7695 --- /dev/null +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/HeartbeatHandler.java @@ -0,0 +1,40 @@ +package com.shxy.xymanager_framework.mqtt; + +import com.shxy.xymanager_common.entity.TerminalStatus; +import com.shxy.xymanager_common.entity.Terminals; +import com.shxy.xymanager_common.model.mqtt.HeartbeatMessageUpload; +import com.shxy.xymanager_common.util.DateUtil; +import com.shxy.xymanager_dao.dao.TerminalStatusDao; +import com.shxy.xymanager_service.service.TerminalExtService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.math.BigInteger; +import java.util.Date; + +@Service +@Slf4j +public class HeartbeatHandler { + + @Resource + TerminalExtService terminalExtService; + @Resource + TerminalStatusDao terminalStatusDao; + + public void handleUpload(HeartbeatMessageUpload upload, String deviceId) throws Exception { + Terminals term = terminalExtService.getByCmdid(deviceId); + if (term == null) { + log.error("mqtt收到心跳,但是该装置" + deviceId + "不存在"); + return; + } + String time = upload.getTime().replace("T", "").replace("Z", ""); + log.info("mqtt消息心跳" + deviceId + " " + time); + Date beatTime = DateUtil.parse(time, "yyyyMMddHHmmss"); + TerminalStatus record = terminalStatusDao.selectByPrimaryKey(term.getId()); + if (record != null) { + record.setLastHeartbeat(BigInteger.valueOf(beatTime.getTime() / 1000)); + terminalStatusDao.updateByPrimaryKeySelective(record); + } + } +} diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java index 2416803..9b6f83c 100644 --- a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java @@ -1,23 +1,20 @@ package com.shxy.xymanager_framework.mqtt; -import com.shxy.xymanager_common.entity.Terminals; import com.shxy.xymanager_common.model.mqtt.*; -import com.shxy.xymanager_service.service.TerminalExtService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import org.springframework.util.Base64Utils; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; -import java.io.File; -import java.io.FileOutputStream; @Service @Slf4j public class MessageHandler { @Resource - TerminalExtService terminalExtService; + PhotoHandler photoHandler; + @Resource + HeartbeatHandler heartbeatHandler; public void process(MessageUpload msg) throws Exception { if (!CollectionUtils.isEmpty(msg.getDevices())) { @@ -38,39 +35,13 @@ public class MessageHandler { private void processOneService(OneService service, String deviceId) throws Exception { if (service.getData() != null) { GeneralDataMessageUpload data = service.getData(); - this.handleHeartbeat(data.getHeartbeatMessageUpload(), deviceId); - this.handleUploadImage(data.getDeviceUploadImageDataEvent(), deviceId); - } - } - - private void handleHeartbeat(HeartbeatMessageUpload beat, String deviceId) { - if (beat == null) { - return; - } - Terminals term = terminalExtService.getByCmdid(deviceId); - if (term == null) { - log.error("mqtt收到心跳,但是该装置" + deviceId + "不存在"); - return; + if (data.getHeartbeatMessageUpload() != null) { + heartbeatHandler.handleUpload(data.getHeartbeatMessageUpload(), deviceId); + } + if (data.getDeviceUploadImageDataEvent() != null) { + photoHandler.handleUpload(data.getDeviceUploadImageDataEvent(), deviceId); + } } } - private void handleUploadImage(DeviceUploadImageDataEvent image, String deviceId) throws Exception { - if (image == null) { - return; - } - Terminals term = terminalExtService.getByCmdid(deviceId); - if (term == null) { - log.error("mqtt收到图片,但是该装置" + deviceId + "不存在"); - return; - } - String folder = ""; - File dir = new File(folder); - dir.mkdirs(); - String fullPath = folder; - - byte[] data = Base64Utils.decodeFromString(image.getData()); - try (FileOutputStream fos = new FileOutputStream(fullPath)) { - fos.write(data); - } - } } diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java index 7379e9c..a933a24 100644 --- a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java @@ -40,8 +40,7 @@ public class MqttSubscriber implements IMqttMessageListener { MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class); messageHandler.process(msg); } catch (Exception ex) { - log.error("mqtt收到消息处理异常.", ex); - log.info("消息内容是" + json); + log.error("mqtt收到消息处理异常, 消息是:" + json, ex); } } } diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PhotoHandler.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PhotoHandler.java new file mode 100644 index 0000000..5121820 --- /dev/null +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PhotoHandler.java @@ -0,0 +1,90 @@ +package com.shxy.xymanager_framework.mqtt; + +import com.shxy.xymanager_common.entity.TerminalPhoto; +import com.shxy.xymanager_common.entity.Terminals; +import com.shxy.xymanager_common.model.mqtt.DeviceUploadImageDataEvent; +import com.shxy.xymanager_common.util.DateUtil; +import com.shxy.xymanager_dao.dao.TerminalPhotoDao; +import com.shxy.xymanager_service.service.TerminalExtService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.util.Base64Utils; + +import javax.annotation.Resource; +import javax.imageio.ImageIO; +import java.awt.image.BufferedImage; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Date; + +@Service +@Slf4j +public class PhotoHandler { + + @Value("${mqtt.photodir}") + private String photodir; + + @Resource + TerminalExtService terminalExtService; + @Resource + TerminalPhotoDao terminalPhotoDao; + + public void handleUpload(DeviceUploadImageDataEvent upload, String deviceId) throws Exception { + Terminals term = terminalExtService.getByCmdid(deviceId); + if (term == null) { + log.error("mqtt收到图片,但是该装置" + deviceId + "不存在"); + return; + } + int length = deviceId.length(); + Date now = new Date(); + String folder = DateUtil.format(now, "yyyy/MM/dd") + "/" + deviceId.substring(length - 2); + File dir = new File(photodir + folder); + dir.mkdirs(); + String hexC = String.format("%02X", upload.getPresetPosition()); + String time = upload.getTime().replace("T", "").replace("Z", ""); + Date photoTime = DateUtil.parse(time, "yyyyMMddHHmmss"); + String filename = String.join("_", + deviceId, String.valueOf(upload.getChannelNumber()), hexC, time) + ".jpg"; + String filePath = folder + "/" + filename; + + byte[] data = Base64Utils.decodeFromString(upload.getData()); + try (FileOutputStream fos = new FileOutputStream(photodir + filePath)) { + fos.write(data); + } + log.info("mqtt消息保存图片到" + filePath); + + TerminalPhoto record = new TerminalPhoto(); + record.setTermId(term.getId()); + record.setChannelId(upload.getChannelNumber()); + record.setPresetId(upload.getPresetPosition()); +// record.setOrginalId(BigInteger.valueOf(upload.getPackageNumber())); + record.setPhotoTime(BigInteger.valueOf(photoTime.getTime() / 1000)); + record.setMediaType(0); + record.setPath(filePath); + record.setRecvTime(BigInteger.valueOf(now.getTime() / 1000)); + record.setRecvEndTime(BigInteger.valueOf(now.getTime() / 1000)); + record.setCreateTime(now); + record.setFlags(0); + record.setIsMark(0); + try (ByteArrayInputStream bis = new ByteArrayInputStream(data)) { + BufferedImage image = ImageIO.read(bis); + if (image != null) { + int width = image.getWidth(); + int height = image.getHeight(); + record.setWidth(width); + record.setHeight(height); + } + } catch (IOException ignore) { + } + long fileSize = Files.size(Paths.get(photodir + filePath)); + record.setFileSize((int) fileSize); + + terminalPhotoDao.insert(record); + } +}