diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/CommonData.java similarity index 50% rename from xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java rename to xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/CommonData.java index 820ef15..bcc083e 100644 --- a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/GeneralDataMessageUpload.java +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/CommonData.java @@ -5,7 +5,7 @@ import lombok.Data; @JsonInclude(JsonInclude.Include.NON_NULL) @Data -public class GeneralDataMessageUpload { +public class CommonData { DeviceUploadImageDataEvent deviceUploadImageDataEvent; HeartbeatMessageUpload heartbeatMessageUpload; PowerOnContactMessage powerOnContactMessage; @@ -20,4 +20,26 @@ public class GeneralDataMessageUpload { String manufDate; Integer communicationMode; Integer communicationProtocol; + String time; + Float temperature; + Integer humidity; + Float airPressure; + Float instanWindSpeed; + Float aveWindSspeed1; + Float aveWindSspeed10; + Float maxWindSpeed10; + Float aveMaxWindSpeed10; + Float extremeWindSpeed; + Float instanWindDirection; + Float aveWindDirection1; + Integer aveWindDirection10; + Float extremeWindDirection; + Integer radiationIntensity; + Float precipitation1; + Float precipitation6; + Float precipitation12; + Float precipitation24; + Integer type; + Integer number; + PullAndAngleMonitoring dataList; } diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/OneService.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/OneService.java index 6c09e80..97d4f52 100644 --- a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/OneService.java +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/OneService.java @@ -4,7 +4,7 @@ import lombok.Data; @Data public class OneService { - GeneralDataMessageUpload data; + CommonData data; String serviceId; String eventTime; } diff --git a/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/PullAndAngleMonitoring.java b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/PullAndAngleMonitoring.java new file mode 100644 index 0000000..ab78ec6 --- /dev/null +++ b/xymanager_common/src/main/java/com/shxy/xymanager_common/model/mqtt/PullAndAngleMonitoring.java @@ -0,0 +1,16 @@ +package com.shxy.xymanager_common.model.mqtt; + +import lombok.Data; + +@Data +public class PullAndAngleMonitoring { + String time; + Float maxPull; + Float maxPullOblique; + Float maxPullWind; + Float minLineAngle; + Float maxLineAngle; + Float minLineWind; + Float maxLineind; + Float equalIceThickness; +} diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java index 9b6f83c..c5df0de 100644 --- a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MessageHandler.java @@ -1,5 +1,6 @@ package com.shxy.xymanager_framework.mqtt; +import com.shxy.xymanager_common.exception.ApiException; import com.shxy.xymanager_common.model.mqtt.*; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -15,6 +16,10 @@ public class MessageHandler { PhotoHandler photoHandler; @Resource HeartbeatHandler heartbeatHandler; + @Resource + WeatherHandler weatherHandler; + @Resource + PullHandler pullHandler; public void process(MessageUpload msg) throws Exception { if (!CollectionUtils.isEmpty(msg.getDevices())) { @@ -33,14 +38,27 @@ public class MessageHandler { } private void processOneService(OneService service, String deviceId) throws Exception { - if (service.getData() != null) { - GeneralDataMessageUpload data = service.getData(); - if (data.getHeartbeatMessageUpload() != null) { - heartbeatHandler.handleUpload(data.getHeartbeatMessageUpload(), deviceId); - } - if (data.getDeviceUploadImageDataEvent() != null) { - photoHandler.handleUpload(data.getDeviceUploadImageDataEvent(), deviceId); - } + CommonData data = service.getData(); + if (data == null) { + throw new ApiException("消息data内容为空,无法处理"); + } + switch (service.getServiceId()) { + case "GeneralDataMessageUpload": + if (data.getHeartbeatMessageUpload() != null) { + heartbeatHandler.handleUpload(data.getHeartbeatMessageUpload(), deviceId); + } + if (data.getDeviceUploadImageDataEvent() != null) { + photoHandler.handleUpload(data.getDeviceUploadImageDataEvent(), deviceId); + } + break; + case "WeatherMonitoring": + weatherHandler.handleUpload(data, deviceId); + break; + case "PullAndAngleMonitoring": + pullHandler.handleUpload(data, deviceId); + break; + default: + throw new ApiException(service.getServiceId() + "暂时无法处理"); } } diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java index 2ab40ab..805f860 100644 --- a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/MqttSubscriber.java @@ -1,5 +1,6 @@ package com.shxy.xymanager_framework.mqtt; +import com.shxy.xymanager_common.exception.ApiException; import com.shxy.xymanager_common.model.mqtt.MessageUpload; import com.shxy.xymanager_common.util.JSONUtil; import lombok.extern.slf4j.Slf4j; @@ -35,12 +36,13 @@ public class MqttSubscriber implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String json = new String(message.getPayload()); - try { MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class); messageHandler.process(msg); + } catch (ApiException aex) { + log.error("mqtt消息处理异常, " + aex.getMessage() + ", 消息体是:" + json); } catch (Exception ex) { - log.error("mqtt收到消息处理异常, " + ex.getMessage() + "消息体是:" + json, ex); + log.error("mqtt消息处理异常, " + ex.getMessage() + ", 消息体是:" + json, ex); } } } diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PhotoHandler.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PhotoHandler.java index d6ce2d7..39e794d 100644 --- a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PhotoHandler.java +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PhotoHandler.java @@ -44,7 +44,6 @@ public class PhotoHandler { return; } if (StringUtils.isBlank(upload.getData())) { - log.error("mqtt收到图片,但是图片data内容是空的"); throw new ApiException("图片data内容是空的"); } int length = deviceId.length(); diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PullHandler.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PullHandler.java new file mode 100644 index 0000000..4fa6e98 --- /dev/null +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/PullHandler.java @@ -0,0 +1,93 @@ +package com.shxy.xymanager_framework.mqtt; + +import com.shxy.xymanager_common.entity.LeadPulls; +import com.shxy.xymanager_common.entity.LeadPullsExample; +import com.shxy.xymanager_common.entity.Terminals; +import com.shxy.xymanager_common.exception.ApiException; +import com.shxy.xymanager_common.model.mqtt.CommonData; +import com.shxy.xymanager_common.model.mqtt.PullAndAngleMonitoring; +import com.shxy.xymanager_common.util.DateUtil; +import com.shxy.xymanager_dao.dao.LeadPullsMapper; +import com.shxy.xymanager_service.service.TerminalExtService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.List; + +@Service +@Slf4j +public class PullHandler { + + @Resource + TerminalExtService terminalExtService; + @Resource + LeadPullsMapper leadPullsMapper; + + public void handleUpload(CommonData data, String deviceId) throws Exception { + Terminals term = terminalExtService.getByCmdid(deviceId); + if (term == null) { + log.error("mqtt收到拉力,但是该装置" + deviceId + "不存在"); + return; + } + PullAndAngleMonitoring dataList = data.getDataList(); + if (dataList == null) { + throw new ApiException("拉力dataList内容是空的"); + } + String time = dataList.getTime().replace("T", "").replace("Z", ""); + Date dTime = DateUtil.parse(time, "yyyyMMddHHmmss"); + long updateTime = dTime.getTime() / 1000; + + LeadPulls record = new LeadPulls(); + record.setTermId(term.getId()); + record.setUpdateTime(updateTime); + record.settSensorNum(String.valueOf(data.getNumber())); + Integer funcCode = null; + switch (data.getType()) { + case 1: + case 6: + funcCode = 17; + break; + case 2: + case 7: + funcCode = 33; + break; + case 3: + funcCode = 49; + break; + case 4: + funcCode = 65; + break; + case 5: + funcCode = 66; + break; + case 8: + funcCode = 81; + break; + case 9: + funcCode = 82; + break; + } + record.setFuncCode(funcCode); + record.setMaxpullPull(dataList.getMaxPull()); + record.setMaxpullWind(dataList.getMaxPullWind()); + record.setMaxpullTilt(dataList.getMaxPullOblique()); + record.setCreateTime(new Date()); + + LeadPullsExample example = new LeadPullsExample(); + LeadPullsExample.Criteria criteria = example.createCriteria(); + criteria.andTermIdEqualTo(term.getId()); + criteria.andUpdateTimeEqualTo(updateTime); + criteria.andFuncCodeEqualTo(funcCode); + List list = leadPullsMapper.selectByExample(example); + if (CollectionUtils.isEmpty(list)) { + leadPullsMapper.insert(record); + } else { + LeadPulls old = list.get(0); + record.setId(old.getId()); + leadPullsMapper.updateByPrimaryKey(record); + } + } +} diff --git a/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/WeatherHandler.java b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/WeatherHandler.java new file mode 100644 index 0000000..416d4c9 --- /dev/null +++ b/xymanager_framework/src/main/java/com/shxy/xymanager_framework/mqtt/WeatherHandler.java @@ -0,0 +1,68 @@ +package com.shxy.xymanager_framework.mqtt; + +import com.shxy.xymanager_common.entity.Terminals; +import com.shxy.xymanager_common.entity.Weathers; +import com.shxy.xymanager_common.entity.WeathersExample; +import com.shxy.xymanager_common.model.mqtt.CommonData; +import com.shxy.xymanager_common.util.DateUtil; +import com.shxy.xymanager_dao.dao.WeathersMapper; +import com.shxy.xymanager_service.service.TerminalExtService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.List; + +@Service +@Slf4j +public class WeatherHandler { + + @Resource + TerminalExtService terminalExtService; + @Resource + WeathersMapper weathersMapper; + + public void handleUpload(CommonData data, String deviceId) throws Exception { + Terminals term = terminalExtService.getByCmdid(deviceId); + if (term == null) { + log.error("mqtt收到天气,但是该装置" + deviceId + "不存在"); + return; + } + String time = data.getTime().replace("T", "").replace("Z", ""); + Date dTime = DateUtil.parse(time, "yyyyMMddHHmmss"); + long updateTime = dTime.getTime() / 1000; + + Weathers record = new Weathers(); + record.setTermId(term.getId()); + record.setUpdateTime(updateTime); + record.setAirTemperature(data.getTemperature()); + record.setHumidity(data.getHumidity()); + record.setAirPressure(data.getAirPressure()); + record.setStandardWindSpeed(data.getInstanWindSpeed()); + record.setWindDirection(data.getInstanWindDirection()); + record.setExtremeWindSpeed(data.getExtremeWindSpeed()); + record.setMaxWindSpeed(data.getMaxWindSpeed10()); + record.setAvgWindSpeed1min(data.getAveWindSspeed1()); + record.setAvgWindDir1min(data.getAveWindDirection1()); + record.setAvgWindSpeed10min(data.getAveWindSspeed10()); + record.setAvgWindDir10min(data.getAveWindDirection10()); + record.setRadiationIntensity(data.getRadiationIntensity()); + record.setPrecipitation(data.getPrecipitation1()); + record.setCreateTime(new Date()); + + WeathersExample example = new WeathersExample(); + WeathersExample.Criteria criteria = example.createCriteria(); + criteria.andTermIdEqualTo(term.getId()); + criteria.andUpdateTimeEqualTo(updateTime); + List list = weathersMapper.selectByExample(example); + if (CollectionUtils.isEmpty(list)) { + weathersMapper.insert(record); + } else { + Weathers old = list.get(0); + record.setId(old.getId()); + weathersMapper.updateByPrimaryKey(record); + } + } +}