|
|
|
@ -7,6 +7,7 @@ import com.xydl.util.MqttUtil;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
import java.util.*;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -27,14 +28,13 @@ public class MqttServiceImpl {
|
|
|
|
|
try {
|
|
|
|
|
processOneTable(tableName);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("processOneTable exception:",e);
|
|
|
|
|
log.error("processOneTable exception:", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//单个表数据发送
|
|
|
|
|
public void processOneTable(String tableName) throws Exception{
|
|
|
|
|
log.info("tableName:{}", tableName);
|
|
|
|
|
public void processOneTable(String tableName) throws Exception {
|
|
|
|
|
Map<String, String> fieldMap = new HashMap<>();
|
|
|
|
|
List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName);
|
|
|
|
|
for (Map<String, String> map : fieldMaps) {
|
|
|
|
@ -51,11 +51,13 @@ public class MqttServiceImpl {
|
|
|
|
|
devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2"));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
log.info("表{},共{}个设备", tableName, dataEqmids.size());
|
|
|
|
|
|
|
|
|
|
mqttUtil.connect();
|
|
|
|
|
for (int deviceID : dataEqmids) {
|
|
|
|
|
Object time = devIDLastTimeMap.get(deviceID);
|
|
|
|
|
if (time == null) {
|
|
|
|
|
time = "2000-01-01 01:00:00";
|
|
|
|
|
time = "2010-01-01 01:00:00";
|
|
|
|
|
operationDBMapper.addEarliestTime("10", tableName, String.valueOf(deviceID), time.toString());
|
|
|
|
|
}
|
|
|
|
|
publishData(String.valueOf(deviceID), time.toString(), sqlExecuting, tableName, fieldMap);
|
|
|
|
@ -69,21 +71,21 @@ public class MqttServiceImpl {
|
|
|
|
|
String devIdSQL = sqlExecuting.replace("%%DEVID%%", String.valueOf(deviceID));
|
|
|
|
|
String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'");
|
|
|
|
|
List<Map<String, Object>> dataOfoneDeviceID = operationDBMapper.getData(newSQL);
|
|
|
|
|
if(!dataOfoneDeviceID.isEmpty()){
|
|
|
|
|
if (!dataOfoneDeviceID.isEmpty()) {
|
|
|
|
|
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
|
|
|
|
|
try {
|
|
|
|
|
if (mqttUtil.publish2MQTT(jsonStringData)) {
|
|
|
|
|
String lastRecordTime = "";
|
|
|
|
|
if(dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("d_time") == null){
|
|
|
|
|
lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("capturetime").toString();
|
|
|
|
|
}else{
|
|
|
|
|
lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("d_time").toString();
|
|
|
|
|
if (dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time") == null) {
|
|
|
|
|
lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("capturetime").toString();
|
|
|
|
|
} else {
|
|
|
|
|
lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time").toString();
|
|
|
|
|
}
|
|
|
|
|
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime);
|
|
|
|
|
log.debug("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData);
|
|
|
|
|
log.debug("表{}设备{}推送成功", tableName, deviceID);
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("表{}设备{}推送异常:", tableName, deviceID, e);
|
|
|
|
|
log.error("表{}设备{}推送异常", tableName, deviceID, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|