You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

107 lines
4.2 KiB
Java

2 years ago
package com.xydl.service.impl;
import com.xydl.mapper.OperationDB;
import com.xydl.util.FormatUtil;
import com.xydl.util.MqttUtil;
2 years ago
import lombok.extern.slf4j.Slf4j;
2 years ago
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
2 years ago
2 years ago
import java.util.*;
2 years ago
import java.util.stream.Collectors;
2 years ago
@Service
2 years ago
@Slf4j
2 years ago
public class MqttServiceImpl {
@Autowired
OperationDB operationDBMapper;
2 years ago
@Autowired
MqttUtil mqttUtil;
2 years ago
public void reportRecord() {
List<String> allTableNames = operationDBMapper.getAllTable();
for (String tableName : allTableNames) {
2 years ago
send(tableName);
// String sqlExecuting = operationDBMapper.getSQL(tableName);
// List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName);
// Map<Integer, Object> devIDLastTimeMap = new HashMap<>();
// List<Map<String, Object>> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName);
// for (Map<String, Object> map : devIDLastTimeMaps) {
// for (String devId : map.keySet()) {
// devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2"));
// }
// }
// for (int deviceID : dataEqmids) {
// String time = "";
// if (devIDLastTimeMap.get(deviceID) != null) {
// time = devIDLastTimeMap.get(deviceID).toString();
// } else {
// time = "2000-01-01 01:00:00";
// operationDBMapper.addEarliestTime(10, tableName, String.valueOf(deviceID), time);
// }
// publishData(deviceID, time, sqlExecuting, tableName, fieldMap);
// }
2 years ago
}
}
2 years ago
2 years ago
//单个表数据发送
public void send(String tableName) {
2 years ago
Map<String, String> fieldMap = new HashMap<>();
List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName);
for (Map<String, String> map : fieldMaps) {
for (String key : map.keySet()) {
fieldMap.put(map.get("field_name"), map.get("dest_field_name"));
}
}
2 years ago
send(tableName, fieldMap);
}
public void send(String tableName, Map<String, String> fieldMap) {
String sqlExecuting = operationDBMapper.getSQL(tableName);
List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName);
Map<Integer, Object> devIDLastTimeMap = new HashMap<>();
List<Map<String, Object>> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName);
for (Map<String, Object> map : devIDLastTimeMaps) {
for (String devId : map.keySet()) {
devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2"));
}
}
for (int deviceID : dataEqmids) {
String time = "";
if (devIDLastTimeMap.get(deviceID) != null) {
time = devIDLastTimeMap.get(deviceID).toString();
} else {
time = "2000-01-01 01:00:00";
operationDBMapper.addEarliestTime(10, tableName, String.valueOf(deviceID), time);
}
publishData(deviceID, time, sqlExecuting, tableName, fieldMap);
}
2 years ago
}
2 years ago
//推送单个设备数据
public void publishData(int deviceID, String time, String sqlExecuting, String tableName, Map<String, String> fieldMap) {
2 years ago
String devIdSQL = sqlExecuting.replace("%%DEVID%%", String.valueOf(deviceID));
2 years ago
String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'");
2 years ago
List<Map<String, Object>> dataOfoneDeviceID = operationDBMapper.getData(newSQL);
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
try {
if (mqttUtil.publish2MQTT(jsonStringData)) {
2 years ago
//insert
2 years ago
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, time);
log.info("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData);
2 years ago
}
} catch (Exception e) {
2 years ago
log.info("表{}设备{}推送异常:{}", tableName, deviceID, e.getMessage());
2 years ago
}
}
2 years ago
2 years ago
}