package com.xydl.service.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xydl.mapper.OperationDB;
import com.xydl.util.DataSourceUtils;
import com.xydl.util.FormatUtil;
import com.xydl.util.MqttUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.regex.Pattern;

/**
 * Scheduled service that reads rows from the tables listed in {@code sync_tables_info},
 * publishes them through {@link MqttUtil} and keeps per-device bookkeeping in
 * {@code sync_records}.
 *
 * <p>All JDBC helpers in this class follow the same convention: a {@link SQLException} is
 * logged and swallowed, and the method returns an "empty" result (empty collection,
 * {@code null} or {@code false}) instead of propagating the failure.
 *
 * <p>NOTE(review): the generic type arguments in this file were reconstructed from how the
 * values are used (the {@code (String)} casts and {@code Object} locals in the original
 * text) - confirm them against the callers and against
 * {@code FormatUtil.mqttFormatTransform}.
 */
@Service
public class MqttServiceImpl {

    private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);

    /** Name of the table that lists every table to be synchronised. */
    private static final String SYNC_TABLE = "sync_tables_info";

    /** Client id used for both the field-mapping lookup and newly inserted sync records. */
    private static final int CLIENT_ID = 10;

    /**
     * Accepts a plain or schema-qualified SQL identifier. Used to vet table names before
     * they are spliced into SQL text, because JDBC cannot bind identifiers as parameters.
     */
    private static final Pattern SAFE_TABLE_NAME =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_$]*(\\.[A-Za-z_][A-Za-z0-9_$]*)?");

    @Autowired
    OperationDB operationDBMapper;

    /**
     * Reads the {@code table_name} column of every row in {@code sync_tables_info}.
     *
     * @return the table names; empty when there are no rows or the query fails
     */
    public List<String> getAllTableNameFromSyncTable() {
        List<String> tableNames = new ArrayList<>();
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = DataSourceUtils.getConnection();
            pstmt = conn.prepareStatement("select table_name from " + SYNC_TABLE);
            rs = pstmt.executeQuery();
            while (rs.next()) {
                tableNames.add(rs.getString("table_name"));
            }
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
        } finally {
            DataSourceUtils.closeResource(rs, pstmt, conn);
        }
        return tableNames;
    }

    /**
     * Tells whether {@code sync_records} holds at least one row for the given table.
     *
     * @param tableName value matched against {@code sync_records.table_name}
     * @return {@code true} if a row exists; {@code false} if none exists or the query fails
     */
    public boolean tableNameIfExitsSyncRec(String tableName) {
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = DataSourceUtils.getConnection();
            pstmt = conn.prepareStatement("select devid_val from sync_records where table_name = ?");
            pstmt.setString(1, tableName);
            rs = pstmt.executeQuery();
            return rs.next();
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
            return false;
        } finally {
            DataSourceUtils.closeResource(rs, pstmt, conn);
        }
    }

    /**
     * Loads the {@code field_name -> dest_field_name} mapping configured in
     * {@code sync_fields_info} for the given table and for client id {@value #CLIENT_ID}.
     *
     * @param tableName table whose field mapping is requested
     * @return the mapping; empty when nothing is configured or the query fails
     */
    public Map<String, String> getFieldMap(String tableName) {
        Map<String, String> fieldsMap = new HashMap<>();
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = DataSourceUtils.getConnection();
            String sql = "select sync_fields_info.field_name, sync_fields_info.dest_field_name "
                    + "from sync_fields_info,sync_tables_info "
                    + "where sync_fields_info.client_id = ? "
                    + "and sync_fields_info.table_name = sync_tables_info.table_name "
                    + "and sync_tables_info.table_name=?";
            pstmt = conn.prepareStatement(sql);
            pstmt.setInt(1, CLIENT_ID);
            pstmt.setString(2, tableName);
            rs = pstmt.executeQuery();
            while (rs.next()) {
                fieldsMap.put(rs.getString("field_name"), rs.getString("dest_field_name"));
            }
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
        } finally {
            DataSourceUtils.closeResource(rs, pstmt, conn);
        }
        return fieldsMap;
    }

    /**
     * Loads, for one table, the {@code devid_val -> field_val2} pairs stored in
     * {@code sync_records}. The scheduled job passes {@code field_val2} to
     * {@link #getData} as the time parameter.
     *
     * @param tableName value matched against {@code sync_records.table_name}
     * @return device id to stored time value; empty when there are no rows or the query fails
     */
    public Map<String, Object> getDeviceIDAndtime(String tableName) {
        Map<String, Object> devIDTimeMap = new HashMap<>();
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = DataSourceUtils.getConnection();
            pstmt = conn.prepareStatement("select devid_val,field_val2 from sync_records where table_name =?");
            pstmt.setString(1, tableName);
            rs = pstmt.executeQuery();
            while (rs.next()) {
                devIDTimeMap.put(rs.getString("devid_val"), rs.getString("field_val2"));
            }
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
        } finally {
            DataSourceUtils.closeResource(rs, pstmt, conn);
        }
        return devIDTimeMap;
    }

    /**
     * Fetches the {@code sql} column of the {@code sync_tables_info} row for the given table.
     *
     * @param tableName value matched against {@code sync_tables_info.table_name}
     * @return the stored SQL text, or {@code null} when no row matches or the query fails
     */
    public String getSQL(String tableName) {
        String sqlExecuting = null;
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = DataSourceUtils.getConnection();
            pstmt = conn.prepareStatement("select * from " + SYNC_TABLE + " where table_name =?");
            pstmt.setString(1, tableName);
            rs = pstmt.executeQuery();
            if (rs.next()) {
                sqlExecuting = rs.getString("sql");
            }
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
        } finally {
            DataSourceUtils.closeResource(rs, pstmt, conn);
        }
        return sqlExecuting;
    }

    /**
     * Runs a configured query and returns every row as a column-name to string-value map.
     *
     * <p>The statement must contain two parameter markers: the first is bound to
     * {@code deviceId}, the second to {@code time}.
     *
     * @param sqlExecuting SQL text to execute, typically obtained from {@link #getSQL}
     * @param deviceId     value bound to the first parameter
     * @param time         value bound to the second parameter
     * @return one map per row; empty when {@code sqlExecuting} is {@code null}, when the
     *         query yields no rows, or when it fails
     */
    public List<Map<String, Object>> getData(String sqlExecuting, String deviceId, String time) {
        List<Map<String, Object>> records = new ArrayList<>();
        if (sqlExecuting == null) {
            // Nothing to execute; avoid handing null to the driver.
            logger.warn("getData called without sql, deviceId={}", deviceId);
            return records;
        }
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = DataSourceUtils.getConnection();
            pstmt = conn.prepareStatement(sqlExecuting);
            pstmt.setString(1, deviceId);
            pstmt.setString(2, time);
            rs = pstmt.executeQuery();

            // Resolve the column names once instead of once per cell.
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();
            String[] columnNames = new String[columnCount];
            for (int col = 0; col < columnCount; col++) {
                columnNames[col] = metaData.getColumnName(col + 1);
            }

            while (rs.next()) {
                Map<String, Object> record = new HashMap<>();
                for (int col = 0; col < columnCount; col++) {
                    // Read by position so that duplicate column names cannot alias each other.
                    record.put(columnNames[col], rs.getString(col + 1));
                }
                records.add(record);
            }
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
        } finally {
            DataSourceUtils.closeResource(rs, pstmt, conn);
        }
        return records;
    }

    /**
     * Inserts a new {@code sync_records} row for a table/device pair using client id
     * {@value #CLIENT_ID}. Failures are logged and otherwise ignored.
     *
     * @param tableName value stored in {@code table_name}
     * @param devId     value stored in {@code devid_val}
     * @param lastTime  value stored in {@code field_val2}
     */
    public void addEarliestTime2SyncRecord(String tableName, String devId, String lastTime) {
        Connection conn = null;
        PreparedStatement pstmt = null;
        try {
            conn = DataSourceUtils.getConnection();
            pstmt = conn.prepareStatement(
                    "insert into sync_records (client_id,table_name,devid_val,field_val2) values (?,?,?,?)");
            pstmt.setInt(1, CLIENT_ID);
            pstmt.setString(2, tableName);
            pstmt.setString(3, devId);
            pstmt.setString(4, lastTime);
            pstmt.executeUpdate();
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
        } finally {
            DataSourceUtils.closeResource((ResultSet) null, pstmt, conn);
        }
    }

    /**
     * Lists the distinct {@code eqmid} values found in the given table.
     *
     * <p>A table name cannot be supplied through a JDBC parameter marker, so it is
     * validated against a strict identifier pattern and then concatenated into the query.
     *
     * @param tableName table to scan; must be a plain or schema-qualified identifier
     * @return the distinct device ids; empty when the name is rejected, the table has no
     *         rows, or the query fails
     */
    public List<String> getAllDevId(String tableName) {
        List<String> devIDs = new ArrayList<>();
        if (!isSafeTableName(tableName)) {
            logger.error("refusing to query invalid table name: {}", tableName);
            return devIDs;
        }
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = DataSourceUtils.getConnection();
            pstmt = conn.prepareStatement("select distinct eqmid from " + tableName);
            rs = pstmt.executeQuery();
            while (rs.next()) {
                devIDs.add(rs.getString("eqmid"));
            }
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
        } finally {
            DataSourceUtils.closeResource(rs, pstmt, conn);
        }
        return devIDs;
    }

    /**
     * Returns the smallest {@code d_time} stored for a device in the given table. Despite
     * the method name, the query orders ascending and takes the first row.
     *
     * <p>The table name is validated and concatenated for the same reason as in
     * {@link #getAllDevId}.
     *
     * @param tableName table to query; must be a plain or schema-qualified identifier
     * @param devId     value matched against {@code eqmid}
     * @return the {@code d_time} value as a string, or {@code null} when the name is
     *         rejected, no row matches, or the query fails
     */
    public String getLastTime(String tableName, String devId) {
        if (!isSafeTableName(tableName)) {
            logger.error("refusing to query invalid table name: {}", tableName);
            return null;
        }
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            conn = DataSourceUtils.getConnection();
            pstmt = conn.prepareStatement(
                    "select d_time from " + tableName + " where eqmid=? ORDER BY d_time asc limit 1");
            pstmt.setString(1, devId);
            rs = pstmt.executeQuery();
            if (rs.next()) {
                return rs.getString("d_time");
            }
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
        } finally {
            DataSourceUtils.closeResource(rs, pstmt, conn);
        }
        return null;
    }

    /**
     * Overwrites {@code field_val2} of the {@code sync_records} row(s) identified by table
     * name and device id.
     *
     * @param tableName value matched against {@code table_name}
     * @param deviceID  value matched against {@code devid_val}
     * @param time      new value for {@code field_val2}
     * @return {@code false} if the statement raised a {@link SQLException}; {@code true}
     *         otherwise, including when no row matched
     */
    public boolean updateSyncRecordsTable(String tableName, String deviceID, String time) {
        Connection conn = null;
        PreparedStatement pstmt = null;
        try {
            conn = DataSourceUtils.getConnection();
            pstmt = conn.prepareStatement(
                    "update sync_records set field_val2 = ? where table_name = ? and devid_val = ?");
            pstmt.setString(1, time);
            pstmt.setString(2, tableName);
            pstmt.setString(3, deviceID);
            pstmt.executeUpdate();
            return true;
        } catch (SQLException e) {
            logger.error("execute sql exception:", e);
            return false;
        } finally {
            DataSourceUtils.closeResource((ResultSet) null, pstmt, conn);
        }
    }

    /**
     * Scheduled task (first run after 1 s, then at a fixed rate of one hour): for every
     * synchronised table and every device recorded for it, queries the data, converts it
     * with {@link FormatUtil#mqttFormatTransform} and publishes it over MQTT.
     *
     * <p>Tables without a configured SQL statement are skipped so that one bad
     * configuration row does not abort the run for the remaining tables.
     */
    @Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600)
    public void reportRecord() {
        logger.info("开始执行");
        for (String tableName : getAllTableNameFromSyncTable()) {
            String sqlExecuting = getSQL(tableName);
            if (sqlExecuting == null || sqlExecuting.isEmpty()) {
                logger.warn("no query sql configured for table {}, skipping", tableName);
                continue;
            }
            Map<String, String> fieldMap = getFieldMap(tableName);
            Map<String, Object> devIDLastTimeMap = getDeviceIDAndtime(tableName);

            for (Map.Entry<String, Object> entry : devIDLastTimeMap.entrySet()) {
                String deviceID = entry.getKey();
                String lastTime = (String) entry.getValue();

                List<Map<String, Object>> dataOfoneDeviceID = getData(sqlExecuting, deviceID, lastTime);
                String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
                logger.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData);

                if (MqttUtil.publish2MQTT(jsonStringData)) {
                    // NOTE(review): this writes back the same time value that was just read,
                    // so the stored checkpoint does not move forward here. Presumably it
                    // should be the newest timestamp of the published rows - confirm which
                    // result column carries it before changing this.
                    if (!updateSyncRecordsTable(tableName, deviceID, lastTime)) {
                        logger.warn("failed to update sync record, table={} device={}", tableName, deviceID);
                    }
                    logger.info("推送成功");
                } else {
                    logger.info("消息推送失败");
                }
            }
        }
    }

    /**
     * Starts the MQTT subscription. The very large fixed delay is used so that the task
     * effectively runs only once.
     */
    @Scheduled(fixedDelay = Long.MAX_VALUE)
    public void subScribeSamle() {
        logger.info("开始订阅===subScribe执行一次==={}",
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        MqttUtil.subScribeMQTT();
    }

    /**
     * Scheduled task (fixed rate of one hour): for every synchronised table that has no row
     * in {@code sync_records} yet, inserts one row per device id with the time returned by
     * the mapper's {@code getLastTime} for that device.
     */
    @Scheduled(fixedRate = 1000 * 3600)
    public void checkDevIdTimer() {
        logger.info("每小时检测一次同步的表是否在‘同步记录表’");
        for (String tableName : getAllTableNameFromSyncTable()) {
            if (tableNameIfExitsSyncRec(tableName)) {
                continue;
            }
            logger.info("有不存在的表,把所有的devId及最早的时间更新到'同步记录表'");
            List<String> devIds = operationDBMapper.getAllDevId(tableName);
            for (String devId : devIds) {
                String lastTime = operationDBMapper.getLastTime(tableName, devId);
                addEarliestTime2SyncRecord(tableName, devId, lastTime);
            }
        }
    }

    /** Returns whether {@code tableName} is non-null and matches {@link #SAFE_TABLE_NAME}. */
    private static boolean isSafeTableName(String tableName) {
        return tableName != null && SAFE_TABLE_NAME.matcher(tableName).matches();
    }
}