diff --git a/src/main/java/com/xydl/mapper/OperationDB.java b/src/main/java/com/xydl/mapper/OperationDB.java index f46fbda..cdd0f46 100644 --- a/src/main/java/com/xydl/mapper/OperationDB.java +++ b/src/main/java/com/xydl/mapper/OperationDB.java @@ -1,12 +1,33 @@ package com.xydl.mapper; +import org.apache.ibatis.annotations.Param; + import java.util.List; +import java.util.Map; public interface OperationDB { List getAllDevId(String tableName); - String getLastTime(String tableName, String devId); + String getEarliestTime4Other(String value, int devId); + + String getEarliestTime4Eaif(String value, int devId); + + List getDataEqmids(String value); + + List getSyncRecordDevIds(String tableName); + + List> getFieldMap(String tableName); + + List getAllTable(); + + String getSQL(String tableName); + + Map getDeviceIDAndtime(String tableName); + + void addEarliestTime(int clientId, String tableName, String devId, String earliestTime); + + } diff --git a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java index a7cddbd..28b90e8 100644 --- a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java +++ b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java @@ -6,26 +6,30 @@ import com.xydl.mapper.OperationDB; import com.xydl.util.DataSourceUtils; import com.xydl.util.FormatUtil; import com.xydl.util.MqttUtil; +import com.xydl.util.Subscribe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; + import java.sql.*; import java.text.SimpleDateFormat; import java.util.*; import java.util.Date; +import java.util.stream.Collectors; + import com.fasterxml.jackson.databind.ObjectMapper; @Service public class MqttServiceImpl { - private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); + private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); @Autowired OperationDB operationDBMapper; - private static final String SYNC_TABLE = "sync_tables_info"; + private static final String SYNC_TABLE = "sync_tables_info"; public List getAllTableNameFromSyncTable() { @@ -39,7 +43,7 @@ public class MqttServiceImpl { pstmt = conn.prepareStatement(sql); rs = pstmt.executeQuery(); - while(rs.next()){ + while (rs.next()) { tableNames.add(rs.getString("table_name")); } } catch (SQLException e) { @@ -62,7 +66,7 @@ public class MqttServiceImpl { pstmt.setString(1, tableName); rs = pstmt.executeQuery(); - if(rs.next()){ + if (rs.next()) { return true; } } catch (SQLException e) { @@ -74,11 +78,11 @@ public class MqttServiceImpl { return false; } - public Map getFieldMap(String tableName) { + public Map getFieldMap(String tableName) { Connection conn = null; PreparedStatement pstmt = null; ResultSet rs = null; - Map fieldsMap = new HashMap<>(); + Map fieldsMap = new HashMap<>(); try { conn = DataSourceUtils.getConnection(); String sql = "select sync_fields_info.field_name, sync_fields_info.dest_field_name " + @@ -89,8 +93,8 @@ public class MqttServiceImpl { rs = pstmt.executeQuery(); ResultSetMetaData metaData = rs.getMetaData(); - while(rs.next()){ - fieldsMap.put(rs.getString("field_name"),rs.getString("dest_field_name")); + while (rs.next()) { + fieldsMap.put(rs.getString("field_name"), rs.getString("dest_field_name")); } } catch (SQLException e) { logger.error("execute sql exception:", e); @@ -98,24 +102,24 @@ public class MqttServiceImpl { DataSourceUtils.closeResource(rs, pstmt, conn); } - return fieldsMap; + return fieldsMap; } - public Map getDeviceIDAndtime(String tableName) { + public Map getDeviceIDAndtime(String tableName) { Connection conn = null; PreparedStatement pstmt = null; ResultSet rs = null; - String sqlExecuting = null ; - Map devIDTimeMap = new HashMap<>(); + String sqlExecuting = null; + Map devIDTimeMap = new HashMap<>(); try { conn = DataSourceUtils.getConnection(); String sql = "select devid_val,field_val2 from sync_records where table_name =?"; pstmt = conn.prepareStatement(sql); pstmt.setString(1, tableName); rs = pstmt.executeQuery(); - while(rs.next()){ - devIDTimeMap.put(rs.getString("devid_val"),rs.getString("field_val2")); + while (rs.next()) { + devIDTimeMap.put(rs.getString("devid_val"), rs.getString("field_val2")); } } catch (SQLException e) { logger.error("execute sql exception:", e); @@ -123,21 +127,21 @@ public class MqttServiceImpl { DataSourceUtils.closeResource(rs, pstmt, conn); } - return devIDTimeMap; + return devIDTimeMap; } public String getSQL(String tableName) { Connection conn = null; PreparedStatement pstmt = null; ResultSet rs = null; - String sqlExecuting = null ; + String sqlExecuting = null; try { conn = DataSourceUtils.getConnection(); String sql = "select * from sync_tables_info where table_name =?"; pstmt = conn.prepareStatement(sql); pstmt.setString(1, tableName); rs = pstmt.executeQuery(); - if(rs.next()){ + if (rs.next()) { sqlExecuting = rs.getString("sql"); } } catch (SQLException e) { @@ -146,16 +150,15 @@ public class MqttServiceImpl { DataSourceUtils.closeResource(rs, pstmt, conn); } - return sqlExecuting; + return sqlExecuting; } - - public List> getData(String sqlExecuting, String deviceId, String time) { + public List> getData(String sqlExecuting, String deviceId, String time) { Connection conn = null; PreparedStatement pstmt = null; ResultSet rs = null; - List> records = new ArrayList<>(); + List> records = new ArrayList<>(); try { conn = DataSourceUtils.getConnection(); String sql = sqlExecuting; @@ -164,12 +167,12 @@ public class MqttServiceImpl { pstmt.setString(2, time); rs = pstmt.executeQuery(); int columnCount = rs.getMetaData().getColumnCount(); //获取列的数量 - while(rs.next()){ - Map record = new HashMap<>(); + while (rs.next()) { + Map record = new HashMap<>(); for (int col = 0; col < columnCount; col++) { String columnName = rs.getMetaData().getColumnName(col + 1); Object columnValue = rs.getString(columnName); - record.put(columnName,columnValue); + record.put(columnName, columnValue); } records.add(record); } @@ -179,7 +182,7 @@ public class MqttServiceImpl { DataSourceUtils.closeResource(rs, pstmt, conn); } - return records; + return records; } public void addEarliestTime2SyncRecord(String tableName, String devId, String lastTime) { @@ -207,7 +210,7 @@ public class MqttServiceImpl { Connection conn = null; PreparedStatement pstmt = null; ResultSet rs = null; - String sqlExecuting = null ; + String sqlExecuting = null; List devIDs = new ArrayList<>(); try { conn = DataSourceUtils.getConnection(); @@ -215,7 +218,7 @@ public class MqttServiceImpl { pstmt = conn.prepareStatement(sql); pstmt.setString(1, tableName); rs = pstmt.executeQuery(); - while(rs.next()){ + while (rs.next()) { devIDs.add(rs.getString("eqmid")); } } catch (SQLException e) { @@ -224,7 +227,7 @@ public class MqttServiceImpl { DataSourceUtils.closeResource(rs, pstmt, conn); } - return devIDs; + return devIDs; } public String getLastTime(String tableName, String devId) { @@ -238,7 +241,7 @@ public class MqttServiceImpl { pstmt.setString(1, tableName); pstmt.setString(2, devId); rs = pstmt.executeQuery(); - if(rs.next()){ + if (rs.next()) { return rs.getString("d_time"); } } catch (SQLException e) { @@ -247,7 +250,54 @@ public class MqttServiceImpl { DataSourceUtils.closeResource(rs, pstmt, conn); } - return null; + return null; + } + + public List getDataEqmids(String tableName) { + Connection conn = null; + PreparedStatement pstmt = null; + ResultSet rs = null; + List eqmids = new ArrayList<>(); + try { + conn = DataSourceUtils.getConnection(); + String sql = "select distinct eqmid from ?"; + pstmt = conn.prepareStatement(sql); + pstmt.setString(1, tableName); + rs = pstmt.executeQuery(); + while (rs.next()) { + eqmids.add(Integer.valueOf(rs.getString("eqmid"))); + } + } catch (SQLException e) { + logger.error("execute sql exception:", e); + } finally { + DataSourceUtils.closeResource(rs, pstmt, conn); + } + + return eqmids; + } + + public List getSyncRecordDevIds(String tableName) { + Connection conn = null; + PreparedStatement pstmt = null; + ResultSet rs = null; + List syncEqmids = new ArrayList<>(); + try { + conn = DataSourceUtils.getConnection(); + String sql = "select devid_val from sync_records where table_name = ?"; + pstmt = conn.prepareStatement(sql); + pstmt.setString(1, tableName); + rs = pstmt.executeQuery(); + while (rs.next()) { + syncEqmids.add(Integer.valueOf(rs.getString("devid_val"))); + } + + } catch (SQLException e) { + logger.error("execute sql exception:", e); + } finally { + DataSourceUtils.closeResource(rs, pstmt, conn); + } + + return syncEqmids; } @@ -270,56 +320,82 @@ public class MqttServiceImpl { } finally { DataSourceUtils.closeResource(rs, pstmt, conn); } - return true; + return true; } - @Scheduled(initialDelay=1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 - public void reportRecord(){ + @Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 + public void reportRecord() { logger.info("开始执行"); - List allTableNames = getAllTableNameFromSyncTable(); - for(String tableName : allTableNames){ - - Map fieldMap = getFieldMap(tableName); - String sqlExecuting = getSQL(tableName); - - Map devIDLastTimeMap = getDeviceIDAndtime(tableName); - for(String deviceID : devIDLastTimeMap.keySet()){ - List> dataOfoneDeviceID = getData(sqlExecuting,deviceID, (String) devIDLastTimeMap.get(deviceID)); - String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID,fieldMap); - logger.info("表{}设备{}推送数据:{}",tableName,deviceID,jsonStringData); - if(MqttUtil.publish2MQTT(jsonStringData)){ - updateSyncRecordsTable(tableName,deviceID, (String) devIDLastTimeMap.get(deviceID)); - logger.info("推送成功"); - }else{ - logger.info("消息推送失败"); +// Subscribe.getInstance(); + List allTableNames = operationDBMapper.getAllTable(); + for (String tableName : allTableNames) { + + Map fieldMap = new HashMap<>(); + List> fieldMaps = operationDBMapper.getFieldMap(tableName); + for(Map map: fieldMaps){ + for(String key : map.keySet()){ + fieldMap.put(map.get("field_name"),map.get("dest_field_name")); } } - } - } - - @Scheduled(fixedDelay = Long.MAX_VALUE) // 用一个非常大的延迟值,确保只执行一次 - public void subScribeSamle() { - logger.info("开始订阅===subScribe执行一次==={}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); - MqttUtil.subScribeMQTT(); - } + String sqlExecuting = operationDBMapper.getSQL(tableName); + List dataEqmids = operationDBMapper.getDataEqmids(tableName); + List syncDevIds = operationDBMapper.getSyncRecordDevIds(tableName); + if (dataEqmids.size() != syncDevIds.size()) { + List distinctDevids = dataEqmids.stream().filter(e -> !syncDevIds.contains(e)).collect(Collectors.toList()); + for (Integer devId : distinctDevids) { + String earliestTime = null; + if ("data_eaif_h".equals(tableName)) { + earliestTime = operationDBMapper.getEarliestTime4Eaif(tableName, devId); + } else { + earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId); + } + operationDBMapper.addEarliestTime(10,tableName, String.valueOf(devId), earliestTime); + } + } - @Scheduled(fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 - public void checkDevIdTimer() { - logger.info("每小时检测一次同步的表是否在‘同步记录表’"); - List allTableNames = getAllTableNameFromSyncTable(); - for(String tableName : allTableNames){ - if(!tableNameIfExitsSyncRec(tableName)){ - logger.info("有不存在的表,把所有的devId及最早的时间更新到'同步记录表'"); - List devIds = operationDBMapper.getAllDevId(tableName); - for(String devId : devIds){ - String lastTime = operationDBMapper.getLastTime(tableName,devId); - addEarliestTime2SyncRecord(tableName,devId,lastTime); + Map devIDLastTimeMap = operationDBMapper.getDeviceIDAndtime(tableName); + for (String deviceID : devIDLastTimeMap.keySet()) { + List> dataOfoneDeviceID = getData(sqlExecuting, deviceID, (String) devIDLastTimeMap.get(deviceID)); + String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); + logger.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData); + if (MqttUtil.publish2MQTT(jsonStringData)) { + updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID)); + logger.info("推送成功"); + } else { + logger.info("消息推送失败"); } } } } - + + +// @Scheduled(fixedDelay = Long.MAX_VALUE) // 用一个非常大的延迟值,确保只执行一次 +// public void subScribeSamle() { +// logger.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); +// MqttUtil.subScribeMQTT(); +// } + +// @Scheduled(fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 +// public void checkDevIdTimer() { +// logger.info("每小时检测一次同步的表是否在‘同步记录表’"); +// List allTableNames = getAllTableNameFromSyncTable(); +// for (String tableName : allTableNames) { +// if (!tableNameIfExitsSyncRec(tableName)) { +// logger.info("有不存在的表{},把所有的devId及最早的时间更新到'同步记录表'", tableName); +// List devIds = operationDBMapper.getAllDevId(tableName); +// for (String devId : devIds) { +// String earliestTime = null; +// if ("data_eaif_h".equals(tableName)) { +// earliestTime = operationDBMapper.getEarliestTime4Eaif(tableName, devId); +// } else { +// earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId); +// } +// addEarliestTime2SyncRecord(tableName, devId, earliestTime); +// } +// } +// } +// } } diff --git a/src/main/java/com/xydl/util/Subscribe.java b/src/main/java/com/xydl/util/Subscribe.java new file mode 100644 index 0000000..3f1c88c --- /dev/null +++ b/src/main/java/com/xydl/util/Subscribe.java @@ -0,0 +1,24 @@ +package com.xydl.util; + +import com.xydl.service.impl.MqttServiceImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.SimpleDateFormat; +import java.util.Date; + +public class Subscribe { + private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); + private static Subscribe single = new Subscribe(); + + private Subscribe(){ + logger.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); + MqttUtil.subScribeMQTT(); + } + + public static Subscribe getInstance(){ + return single; + } + + +} diff --git a/src/main/resources/com/xydl/mapper/OperationDb.xml b/src/main/resources/com/xydl/mapper/OperationDb.xml index 35e7cfd..ec47c1f 100644 --- a/src/main/resources/com/xydl/mapper/OperationDb.xml +++ b/src/main/resources/com/xydl/mapper/OperationDb.xml @@ -6,9 +6,43 @@ select distinct eqmid from ${tableName} - + select d_time from ${value} where eqmid=#{devId} ORDER BY d_time asc limit 1 + + + + + + + + + + + + + + + + insert into sync_records (client_id,table_name,devid_val,field_val2) values (#{clientId},#{tableName},#{devId},#{earliestTime}) + + + +