You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

350 lines
13 KiB
Java

2 years ago
package com.xydl.service.impl;
import com.xydl.mapper.OperationDB;
import com.xydl.util.DataSourceUtils;
import com.xydl.util.FormatUtil;
import com.xydl.util.MqttUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
@Service
public class MqttServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
@Autowired
OperationDB operationDBMapper;
private static final String SYNC_TABLE = "sync_tables_info";
public List<String> getAllTableNameFromSyncTable() {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
List<String> tableNames = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select table_name from sync_tables_info";
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while(rs.next()){
tableNames.add(rs.getString("table_name"));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return tableNames;
}
public boolean tableNameIfExitsSyncRec(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = DataSourceUtils.getConnection();
String sql = "select devid_val from sync_records where table_name = ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
if(rs.next()){
return true;
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return false;
}
public Map<String,String> getFieldMap(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
Map<String,String> fieldsMap = new HashMap<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select sync_fields_info.field_name, sync_fields_info.dest_field_name " +
"from sync_fields_info,sync_tables_info " +
"where sync_fields_info.client_id = 10 and sync_fields_info.table_name = sync_tables_info.table_name and sync_tables_info.table_name=?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
while(rs.next()){
fieldsMap.put(rs.getString("field_name"),rs.getString("dest_field_name"));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return fieldsMap;
}
public Map<String,Object> getDeviceIDAndtime(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sqlExecuting = null ;
Map<String,Object> devIDTimeMap = new HashMap<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select devid_val,field_val2 from sync_records where table_name =?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
while(rs.next()){
devIDTimeMap.put(rs.getString("devid_val"),rs.getString("field_val2"));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return devIDTimeMap;
}
public String getSQL(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sqlExecuting = null ;
try {
conn = DataSourceUtils.getConnection();
String sql = "select * from sync_tables_info where table_name =?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
if(rs.next()){
sqlExecuting = rs.getString("sql");
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return sqlExecuting;
}
public List<Map<String,Object>> getData(String sqlExecuting, String deviceId, String timeStamp) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
List<Map<String,Object>> records = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = sqlExecuting;
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, deviceId);
pstmt.setString(2,timeStamp);
rs = pstmt.executeQuery();
int columnCount = rs.getMetaData().getColumnCount(); //获取列的数量
while(rs.next()){
Map<String,Object> record = new HashMap<>();
for (int col = 0; col < columnCount; col++) {
String columnName = rs.getMetaData().getColumnName(col + 1);
String columnValue = rs.getString(columnName);
record.put(columnName,columnValue);
}
records.add(record);
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return records;
}
public void addEarliestTime2SyncRecord(String tableName, String devId, String lastTime) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = DataSourceUtils.getConnection();
String sql = "insert into sync_records (client_id,table_name,devid_val,field_val2) values (?,?,?,?)";
pstmt = conn.prepareStatement(sql);
pstmt.setInt(1, 10);
pstmt.setString(2, tableName);
pstmt.setString(3, devId);
pstmt.setString(4, lastTime);
pstmt.executeUpdate();
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
}
public List<String> getAllDevId(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sqlExecuting = null ;
List<String> devIDs = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select distinct eqmid from ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
while(rs.next()){
devIDs.add(rs.getString("eqmid"));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return devIDs;
}
public String getLastTime(String tableName, String devId) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = DataSourceUtils.getConnection();
String sql = "select d_time from ? where eqmid=? ORDER BY d_time asc limit 1";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
pstmt.setString(2, devId);
rs = pstmt.executeQuery();
if(rs.next()){
return rs.getString("d_time");
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return null;
}
private List<Map<String,Object>> transformList(Map<String,String> fieldMap, List<Map<String,Object>> deviceIDData) {
List<Map<String,Object>> newDeviceIDData = new ArrayList<>();
for(Map<String,Object> fieldValueMap : deviceIDData){
newDeviceIDData.add(transformOneRecord(fieldMap,fieldValueMap));
}
return newDeviceIDData;
}
private Map<String,Object> transformOneRecord(Map<String,String> fieldMap, Map<String,Object> fieldValueMap) {
Map<String,Object> newFieldValueMap = new HashMap<>();
for(String field : fieldMap.keySet()){
if(fieldValueMap.containsKey(field)){
newFieldValueMap.put(fieldMap.get(field),fieldValueMap.get(field) );
}
}
return newFieldValueMap;
}
public boolean updateSyncRecordsTable(String tableName, String deviceID, String time) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = DataSourceUtils.getConnection();
String sql = "update sync_records set field_val2 = ? where table_name = ? and devid_val = ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, time);
pstmt.setString(2, tableName);
pstmt.setString(3, deviceID);
pstmt.executeUpdate();
} catch (SQLException e) {
logger.error("execute sql exception:", e);
return false;
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return true;
}
@Scheduled(initialDelay=1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
public void reportRecord(){
logger.info("开始执行");
List<String> allTableNames = getAllTableNameFromSyncTable();
for(String tableName : allTableNames){
Map<String,String> fieldMap = getFieldMap(tableName);
String sqlExecuting = getSQL(tableName);
Map<String,Object> devIDLastTimeMap = getDeviceIDAndtime(tableName);
for(String deviceID : devIDLastTimeMap.keySet()){
List<Map<String,Object>> dataOfoneDeviceID = getData(sqlExecuting,deviceID, (String) devIDLastTimeMap.get(deviceID));
List<Map<String,Object>> newDataOfoneDeviceID = transformList(fieldMap,dataOfoneDeviceID);
String jsonStringData = FormatUtil.list2Json(newDataOfoneDeviceID);
logger.info("表{}设备{}推送数据:{}",tableName,deviceID,jsonStringData);
if(MqttUtil.publish2MQTT(jsonStringData)){
updateSyncRecordsTable(tableName,deviceID, (String) devIDLastTimeMap.get(deviceID));
logger.info("推送成功");
}else{
logger.info("消息推送失败");
}
}
}
}
@Scheduled(fixedDelay = Long.MAX_VALUE) // 用一个非常大的延迟值,确保只执行一次
public void subScribeSamle() {
logger.info("开始订阅===subScribe执行一次==={}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
MqttUtil.subScribeMQTT();
}
@Scheduled(fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
public void checkDevIdTimer() {
logger.info("每小时检测一次同步的表是否在‘同步记录表’");
List<String> allTableNames = getAllTableNameFromSyncTable();
for(String tableName : allTableNames){
if(!tableNameIfExitsSyncRec(tableName)){
logger.info("有不存在的表,把所有的devId及最早的时间更新到'同步记录表'");
List<String> devIds = operationDBMapper.getAllDevId(tableName);
for(String devId : devIds){
String lastTime = operationDBMapper.getLastTime(tableName,devId);
addEarliestTime2SyncRecord(tableName,devId,lastTime);
}
}
}
}
}