feat: 增加拉力和天气的数据结构和处理功能

dev
huangfeng 3 months ago
parent db48dee2c3
commit 7f04bad05d

@ -5,7 +5,7 @@ import lombok.Data;
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class GeneralDataMessageUpload {
public class CommonData {
DeviceUploadImageDataEvent deviceUploadImageDataEvent;
HeartbeatMessageUpload heartbeatMessageUpload;
PowerOnContactMessage powerOnContactMessage;
@ -20,4 +20,26 @@ public class GeneralDataMessageUpload {
String manufDate;
Integer communicationMode;
Integer communicationProtocol;
String time;
Float temperature;
Integer humidity;
Float airPressure;
Float instanWindSpeed;
Float aveWindSspeed1;
Float aveWindSspeed10;
Float maxWindSpeed10;
Float aveMaxWindSpeed10;
Float extremeWindSpeed;
Float instanWindDirection;
Float aveWindDirection1;
Integer aveWindDirection10;
Float extremeWindDirection;
Integer radiationIntensity;
Float precipitation1;
Float precipitation6;
Float precipitation12;
Float precipitation24;
Integer type;
Integer number;
PullAndAngleMonitoring dataList;
}

@ -4,7 +4,7 @@ import lombok.Data;
@Data
public class OneService {
GeneralDataMessageUpload data;
CommonData data;
String serviceId;
String eventTime;
}

@ -0,0 +1,16 @@
package com.shxy.xymanager_common.model.mqtt;
import lombok.Data;
@Data
public class PullAndAngleMonitoring {
String time;
Float maxPull;
Float maxPullOblique;
Float maxPullWind;
Float minLineAngle;
Float maxLineAngle;
Float minLineWind;
Float maxLineind;
Float equalIceThickness;
}

@ -1,5 +1,6 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.exception.ApiException;
import com.shxy.xymanager_common.model.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -15,6 +16,10 @@ public class MessageHandler {
PhotoHandler photoHandler;
@Resource
HeartbeatHandler heartbeatHandler;
@Resource
WeatherHandler weatherHandler;
@Resource
PullHandler pullHandler;
public void process(MessageUpload msg) throws Exception {
if (!CollectionUtils.isEmpty(msg.getDevices())) {
@ -33,14 +38,27 @@ public class MessageHandler {
}
private void processOneService(OneService service, String deviceId) throws Exception {
if (service.getData() != null) {
GeneralDataMessageUpload data = service.getData();
if (data.getHeartbeatMessageUpload() != null) {
heartbeatHandler.handleUpload(data.getHeartbeatMessageUpload(), deviceId);
}
if (data.getDeviceUploadImageDataEvent() != null) {
photoHandler.handleUpload(data.getDeviceUploadImageDataEvent(), deviceId);
}
CommonData data = service.getData();
if (data == null) {
throw new ApiException("消息data内容为空,无法处理");
}
switch (service.getServiceId()) {
case "GeneralDataMessageUpload":
if (data.getHeartbeatMessageUpload() != null) {
heartbeatHandler.handleUpload(data.getHeartbeatMessageUpload(), deviceId);
}
if (data.getDeviceUploadImageDataEvent() != null) {
photoHandler.handleUpload(data.getDeviceUploadImageDataEvent(), deviceId);
}
break;
case "WeatherMonitoring":
weatherHandler.handleUpload(data, deviceId);
break;
case "PullAndAngleMonitoring":
pullHandler.handleUpload(data, deviceId);
break;
default:
throw new ApiException(service.getServiceId() + "暂时无法处理");
}
}

@ -1,5 +1,6 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.exception.ApiException;
import com.shxy.xymanager_common.model.mqtt.MessageUpload;
import com.shxy.xymanager_common.util.JSONUtil;
import lombok.extern.slf4j.Slf4j;
@ -35,12 +36,13 @@ public class MqttSubscriber implements IMqttMessageListener {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String json = new String(message.getPayload());
try {
MessageUpload msg = JSONUtil.json2Object(json, MessageUpload.class);
messageHandler.process(msg);
} catch (ApiException aex) {
log.error("mqtt消息处理异常, " + aex.getMessage() + ", 消息体是:" + json);
} catch (Exception ex) {
log.error("mqtt收到消息处理异常, " + ex.getMessage() + "消息体是:" + json, ex);
log.error("mqtt消息处理异常, " + ex.getMessage() + ", 消息体是:" + json, ex);
}
}
}

@ -44,7 +44,6 @@ public class PhotoHandler {
return;
}
if (StringUtils.isBlank(upload.getData())) {
log.error("mqtt收到图片但是图片data内容是空的");
throw new ApiException("图片data内容是空的");
}
int length = deviceId.length();

@ -0,0 +1,93 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.entity.LeadPulls;
import com.shxy.xymanager_common.entity.LeadPullsExample;
import com.shxy.xymanager_common.entity.Terminals;
import com.shxy.xymanager_common.exception.ApiException;
import com.shxy.xymanager_common.model.mqtt.CommonData;
import com.shxy.xymanager_common.model.mqtt.PullAndAngleMonitoring;
import com.shxy.xymanager_common.util.DateUtil;
import com.shxy.xymanager_dao.dao.LeadPullsMapper;
import com.shxy.xymanager_service.service.TerminalExtService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
@Service
@Slf4j
public class PullHandler {
@Resource
TerminalExtService terminalExtService;
@Resource
LeadPullsMapper leadPullsMapper;
public void handleUpload(CommonData data, String deviceId) throws Exception {
Terminals term = terminalExtService.getByCmdid(deviceId);
if (term == null) {
log.error("mqtt收到拉力但是该装置" + deviceId + "不存在");
return;
}
PullAndAngleMonitoring dataList = data.getDataList();
if (dataList == null) {
throw new ApiException("拉力dataList内容是空的");
}
String time = dataList.getTime().replace("T", "").replace("Z", "");
Date dTime = DateUtil.parse(time, "yyyyMMddHHmmss");
long updateTime = dTime.getTime() / 1000;
LeadPulls record = new LeadPulls();
record.setTermId(term.getId());
record.setUpdateTime(updateTime);
record.settSensorNum(String.valueOf(data.getNumber()));
Integer funcCode = null;
switch (data.getType()) {
case 1:
case 6:
funcCode = 17;
break;
case 2:
case 7:
funcCode = 33;
break;
case 3:
funcCode = 49;
break;
case 4:
funcCode = 65;
break;
case 5:
funcCode = 66;
break;
case 8:
funcCode = 81;
break;
case 9:
funcCode = 82;
break;
}
record.setFuncCode(funcCode);
record.setMaxpullPull(dataList.getMaxPull());
record.setMaxpullWind(dataList.getMaxPullWind());
record.setMaxpullTilt(dataList.getMaxPullOblique());
record.setCreateTime(new Date());
LeadPullsExample example = new LeadPullsExample();
LeadPullsExample.Criteria criteria = example.createCriteria();
criteria.andTermIdEqualTo(term.getId());
criteria.andUpdateTimeEqualTo(updateTime);
criteria.andFuncCodeEqualTo(funcCode);
List<LeadPulls> list = leadPullsMapper.selectByExample(example);
if (CollectionUtils.isEmpty(list)) {
leadPullsMapper.insert(record);
} else {
LeadPulls old = list.get(0);
record.setId(old.getId());
leadPullsMapper.updateByPrimaryKey(record);
}
}
}

@ -0,0 +1,68 @@
package com.shxy.xymanager_framework.mqtt;
import com.shxy.xymanager_common.entity.Terminals;
import com.shxy.xymanager_common.entity.Weathers;
import com.shxy.xymanager_common.entity.WeathersExample;
import com.shxy.xymanager_common.model.mqtt.CommonData;
import com.shxy.xymanager_common.util.DateUtil;
import com.shxy.xymanager_dao.dao.WeathersMapper;
import com.shxy.xymanager_service.service.TerminalExtService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
@Service
@Slf4j
public class WeatherHandler {
@Resource
TerminalExtService terminalExtService;
@Resource
WeathersMapper weathersMapper;
public void handleUpload(CommonData data, String deviceId) throws Exception {
Terminals term = terminalExtService.getByCmdid(deviceId);
if (term == null) {
log.error("mqtt收到天气但是该装置" + deviceId + "不存在");
return;
}
String time = data.getTime().replace("T", "").replace("Z", "");
Date dTime = DateUtil.parse(time, "yyyyMMddHHmmss");
long updateTime = dTime.getTime() / 1000;
Weathers record = new Weathers();
record.setTermId(term.getId());
record.setUpdateTime(updateTime);
record.setAirTemperature(data.getTemperature());
record.setHumidity(data.getHumidity());
record.setAirPressure(data.getAirPressure());
record.setStandardWindSpeed(data.getInstanWindSpeed());
record.setWindDirection(data.getInstanWindDirection());
record.setExtremeWindSpeed(data.getExtremeWindSpeed());
record.setMaxWindSpeed(data.getMaxWindSpeed10());
record.setAvgWindSpeed1min(data.getAveWindSspeed1());
record.setAvgWindDir1min(data.getAveWindDirection1());
record.setAvgWindSpeed10min(data.getAveWindSspeed10());
record.setAvgWindDir10min(data.getAveWindDirection10());
record.setRadiationIntensity(data.getRadiationIntensity());
record.setPrecipitation(data.getPrecipitation1());
record.setCreateTime(new Date());
WeathersExample example = new WeathersExample();
WeathersExample.Criteria criteria = example.createCriteria();
criteria.andTermIdEqualTo(term.getId());
criteria.andUpdateTimeEqualTo(updateTime);
List<Weathers> list = weathersMapper.selectByExample(example);
if (CollectionUtils.isEmpty(list)) {
weathersMapper.insert(record);
} else {
Weathers old = list.get(0);
record.setId(old.getId());
weathersMapper.updateByPrimaryKey(record);
}
}
}
Loading…
Cancel
Save