|
|
|
package com.xydl.service.impl;
|
|
|
|
|
|
|
|
|
|
|
|
import com.xydl.mapper.OperationDB;
|
|
|
|
import com.xydl.util.FormatUtil;
|
|
|
|
import com.xydl.util.MqttUtil;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
import java.util.*;
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Service
|
|
|
|
@Slf4j
|
|
|
|
public class MqttServiceImpl {
|
|
|
|
// private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
OperationDB operationDBMapper;
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
MqttUtil mqttUtil;
|
|
|
|
|
|
|
|
|
|
|
|
public void reportRecord() {
|
|
|
|
List<String> allTableNames = operationDBMapper.getAllTable();
|
|
|
|
for (String tableName : allTableNames) {
|
|
|
|
|
|
|
|
Map<String,String> fieldMap = new HashMap<>();
|
|
|
|
List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName);
|
|
|
|
for(Map<String,String> map: fieldMaps){
|
|
|
|
for(String key : map.keySet()){
|
|
|
|
fieldMap.put(map.get("field_name"),map.get("dest_field_name"));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
String sqlExecuting = operationDBMapper.getSQL(tableName);
|
|
|
|
String preSQL = sqlExecuting.substring(0,sqlExecuting.indexOf("?"));
|
|
|
|
String midSQL = sqlExecuting.substring(sqlExecuting.indexOf("?")+1,sqlExecuting.lastIndexOf("?"));
|
|
|
|
String lastSQL = sqlExecuting.substring(sqlExecuting.lastIndexOf("?")+1);
|
|
|
|
List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName);
|
|
|
|
List<Integer> syncDevIds = operationDBMapper.getSyncRecordDevIds(tableName);
|
|
|
|
if (dataEqmids.size() != syncDevIds.size()) {
|
|
|
|
List<Integer> distinctDevids = dataEqmids.stream().filter(e -> !syncDevIds.contains(e)).collect(Collectors.toList());
|
|
|
|
for (Integer devId : distinctDevids) {
|
|
|
|
String earliestTime = null;
|
|
|
|
if ("data_eaif_h".equals(tableName)) {
|
|
|
|
earliestTime = operationDBMapper.getEarliestTime4Eaif(tableName, devId);
|
|
|
|
} else {
|
|
|
|
earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId);
|
|
|
|
}
|
|
|
|
operationDBMapper.addEarliestTime(10,tableName, String.valueOf(devId), earliestTime);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Map<Integer,Object> devIDLastTimeMap = new HashMap<>();
|
|
|
|
List<Map<String, Object>> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName);
|
|
|
|
for(Map<String,Object> map : devIDLastTimeMaps){
|
|
|
|
for(String devId : map.keySet()){
|
|
|
|
devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2"));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for (int deviceID : devIDLastTimeMap.keySet()) {
|
|
|
|
String time = devIDLastTimeMap.get(deviceID).toString();
|
|
|
|
String sql = preSQL+deviceID+midSQL+"'"+time+"'"+lastSQL;
|
|
|
|
List<Map<String, Object>> dataOfoneDeviceID = operationDBMapper.getData(sql);
|
|
|
|
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
|
|
|
|
log.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData);
|
|
|
|
try{
|
|
|
|
if (mqttUtil.publish2MQTT(jsonStringData)) {
|
|
|
|
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID));
|
|
|
|
log.info("推送成功");
|
|
|
|
}
|
|
|
|
}catch (Exception e){
|
|
|
|
log.info("推送异常:{}",e.getMessage());
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|