wenhua.zhou 2 years ago
parent 99f7f51be2
commit 65428cd265

@ -1,12 +1,33 @@
package com.xydl.mapper; package com.xydl.mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.List;
import java.util.Map;
public interface OperationDB { public interface OperationDB {
List<String> getAllDevId(String tableName); List<String> getAllDevId(String tableName);
String getLastTime(String tableName, String devId); String getEarliestTime4Other(String value, int devId);
String getEarliestTime4Eaif(String value, int devId);
List<Integer> getDataEqmids(String value);
List<Integer> getSyncRecordDevIds(String tableName);
List<Map<String, String>> getFieldMap(String tableName);
List<String> getAllTable();
String getSQL(String tableName);
Map<String, Object> getDeviceIDAndtime(String tableName);
void addEarliestTime(int clientId, String tableName, String devId, String earliestTime);
} }

@ -6,26 +6,30 @@ import com.xydl.mapper.OperationDB;
import com.xydl.util.DataSourceUtils; import com.xydl.util.DataSourceUtils;
import com.xydl.util.FormatUtil; import com.xydl.util.FormatUtil;
import com.xydl.util.MqttUtil; import com.xydl.util.MqttUtil;
import com.xydl.util.Subscribe;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.sql.*; import java.sql.*;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.Date; import java.util.Date;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@Service @Service
public class MqttServiceImpl { public class MqttServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
@Autowired @Autowired
OperationDB operationDBMapper; OperationDB operationDBMapper;
private static final String SYNC_TABLE = "sync_tables_info"; private static final String SYNC_TABLE = "sync_tables_info";
public List<String> getAllTableNameFromSyncTable() { public List<String> getAllTableNameFromSyncTable() {
@ -39,7 +43,7 @@ public class MqttServiceImpl {
pstmt = conn.prepareStatement(sql); pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
while(rs.next()){ while (rs.next()) {
tableNames.add(rs.getString("table_name")); tableNames.add(rs.getString("table_name"));
} }
} catch (SQLException e) { } catch (SQLException e) {
@ -62,7 +66,7 @@ public class MqttServiceImpl {
pstmt.setString(1, tableName); pstmt.setString(1, tableName);
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
if(rs.next()){ if (rs.next()) {
return true; return true;
} }
} catch (SQLException e) { } catch (SQLException e) {
@ -74,11 +78,11 @@ public class MqttServiceImpl {
return false; return false;
} }
public Map<String,String> getFieldMap(String tableName) { public Map<String, String> getFieldMap(String tableName) {
Connection conn = null; Connection conn = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null; ResultSet rs = null;
Map<String,String> fieldsMap = new HashMap<>(); Map<String, String> fieldsMap = new HashMap<>();
try { try {
conn = DataSourceUtils.getConnection(); conn = DataSourceUtils.getConnection();
String sql = "select sync_fields_info.field_name, sync_fields_info.dest_field_name " + String sql = "select sync_fields_info.field_name, sync_fields_info.dest_field_name " +
@ -89,8 +93,8 @@ public class MqttServiceImpl {
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
ResultSetMetaData metaData = rs.getMetaData(); ResultSetMetaData metaData = rs.getMetaData();
while(rs.next()){ while (rs.next()) {
fieldsMap.put(rs.getString("field_name"),rs.getString("dest_field_name")); fieldsMap.put(rs.getString("field_name"), rs.getString("dest_field_name"));
} }
} catch (SQLException e) { } catch (SQLException e) {
logger.error("execute sql exception:", e); logger.error("execute sql exception:", e);
@ -98,24 +102,24 @@ public class MqttServiceImpl {
DataSourceUtils.closeResource(rs, pstmt, conn); DataSourceUtils.closeResource(rs, pstmt, conn);
} }
return fieldsMap; return fieldsMap;
} }
public Map<String,Object> getDeviceIDAndtime(String tableName) { public Map<String, Object> getDeviceIDAndtime(String tableName) {
Connection conn = null; Connection conn = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null; ResultSet rs = null;
String sqlExecuting = null ; String sqlExecuting = null;
Map<String,Object> devIDTimeMap = new HashMap<>(); Map<String, Object> devIDTimeMap = new HashMap<>();
try { try {
conn = DataSourceUtils.getConnection(); conn = DataSourceUtils.getConnection();
String sql = "select devid_val,field_val2 from sync_records where table_name =?"; String sql = "select devid_val,field_val2 from sync_records where table_name =?";
pstmt = conn.prepareStatement(sql); pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName); pstmt.setString(1, tableName);
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
while(rs.next()){ while (rs.next()) {
devIDTimeMap.put(rs.getString("devid_val"),rs.getString("field_val2")); devIDTimeMap.put(rs.getString("devid_val"), rs.getString("field_val2"));
} }
} catch (SQLException e) { } catch (SQLException e) {
logger.error("execute sql exception:", e); logger.error("execute sql exception:", e);
@ -123,21 +127,21 @@ public class MqttServiceImpl {
DataSourceUtils.closeResource(rs, pstmt, conn); DataSourceUtils.closeResource(rs, pstmt, conn);
} }
return devIDTimeMap; return devIDTimeMap;
} }
public String getSQL(String tableName) { public String getSQL(String tableName) {
Connection conn = null; Connection conn = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null; ResultSet rs = null;
String sqlExecuting = null ; String sqlExecuting = null;
try { try {
conn = DataSourceUtils.getConnection(); conn = DataSourceUtils.getConnection();
String sql = "select * from sync_tables_info where table_name =?"; String sql = "select * from sync_tables_info where table_name =?";
pstmt = conn.prepareStatement(sql); pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName); pstmt.setString(1, tableName);
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
if(rs.next()){ if (rs.next()) {
sqlExecuting = rs.getString("sql"); sqlExecuting = rs.getString("sql");
} }
} catch (SQLException e) { } catch (SQLException e) {
@ -146,16 +150,15 @@ public class MqttServiceImpl {
DataSourceUtils.closeResource(rs, pstmt, conn); DataSourceUtils.closeResource(rs, pstmt, conn);
} }
return sqlExecuting; return sqlExecuting;
} }
public List<Map<String, Object>> getData(String sqlExecuting, String deviceId, String time) {
public List<Map<String,Object>> getData(String sqlExecuting, String deviceId, String time) {
Connection conn = null; Connection conn = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null; ResultSet rs = null;
List<Map<String,Object>> records = new ArrayList<>(); List<Map<String, Object>> records = new ArrayList<>();
try { try {
conn = DataSourceUtils.getConnection(); conn = DataSourceUtils.getConnection();
String sql = sqlExecuting; String sql = sqlExecuting;
@ -164,12 +167,12 @@ public class MqttServiceImpl {
pstmt.setString(2, time); pstmt.setString(2, time);
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
int columnCount = rs.getMetaData().getColumnCount(); //获取列的数量 int columnCount = rs.getMetaData().getColumnCount(); //获取列的数量
while(rs.next()){ while (rs.next()) {
Map<String,Object> record = new HashMap<>(); Map<String, Object> record = new HashMap<>();
for (int col = 0; col < columnCount; col++) { for (int col = 0; col < columnCount; col++) {
String columnName = rs.getMetaData().getColumnName(col + 1); String columnName = rs.getMetaData().getColumnName(col + 1);
Object columnValue = rs.getString(columnName); Object columnValue = rs.getString(columnName);
record.put(columnName,columnValue); record.put(columnName, columnValue);
} }
records.add(record); records.add(record);
} }
@ -179,7 +182,7 @@ public class MqttServiceImpl {
DataSourceUtils.closeResource(rs, pstmt, conn); DataSourceUtils.closeResource(rs, pstmt, conn);
} }
return records; return records;
} }
public void addEarliestTime2SyncRecord(String tableName, String devId, String lastTime) { public void addEarliestTime2SyncRecord(String tableName, String devId, String lastTime) {
@ -207,7 +210,7 @@ public class MqttServiceImpl {
Connection conn = null; Connection conn = null;
PreparedStatement pstmt = null; PreparedStatement pstmt = null;
ResultSet rs = null; ResultSet rs = null;
String sqlExecuting = null ; String sqlExecuting = null;
List<String> devIDs = new ArrayList<>(); List<String> devIDs = new ArrayList<>();
try { try {
conn = DataSourceUtils.getConnection(); conn = DataSourceUtils.getConnection();
@ -215,7 +218,7 @@ public class MqttServiceImpl {
pstmt = conn.prepareStatement(sql); pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName); pstmt.setString(1, tableName);
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
while(rs.next()){ while (rs.next()) {
devIDs.add(rs.getString("eqmid")); devIDs.add(rs.getString("eqmid"));
} }
} catch (SQLException e) { } catch (SQLException e) {
@ -224,7 +227,7 @@ public class MqttServiceImpl {
DataSourceUtils.closeResource(rs, pstmt, conn); DataSourceUtils.closeResource(rs, pstmt, conn);
} }
return devIDs; return devIDs;
} }
public String getLastTime(String tableName, String devId) { public String getLastTime(String tableName, String devId) {
@ -238,7 +241,7 @@ public class MqttServiceImpl {
pstmt.setString(1, tableName); pstmt.setString(1, tableName);
pstmt.setString(2, devId); pstmt.setString(2, devId);
rs = pstmt.executeQuery(); rs = pstmt.executeQuery();
if(rs.next()){ if (rs.next()) {
return rs.getString("d_time"); return rs.getString("d_time");
} }
} catch (SQLException e) { } catch (SQLException e) {
@ -247,7 +250,54 @@ public class MqttServiceImpl {
DataSourceUtils.closeResource(rs, pstmt, conn); DataSourceUtils.closeResource(rs, pstmt, conn);
} }
return null; return null;
}
public List<Integer> getDataEqmids(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
List<Integer> eqmids = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select distinct eqmid from ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
while (rs.next()) {
eqmids.add(Integer.valueOf(rs.getString("eqmid")));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return eqmids;
}
public List<Integer> getSyncRecordDevIds(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
List<Integer> syncEqmids = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select devid_val from sync_records where table_name = ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
while (rs.next()) {
syncEqmids.add(Integer.valueOf(rs.getString("devid_val")));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return syncEqmids;
} }
@ -270,56 +320,82 @@ public class MqttServiceImpl {
} finally { } finally {
DataSourceUtils.closeResource(rs, pstmt, conn); DataSourceUtils.closeResource(rs, pstmt, conn);
} }
return true; return true;
} }
@Scheduled(initialDelay=1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行 @Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
public void reportRecord(){ public void reportRecord() {
logger.info("开始执行"); logger.info("开始执行");
List<String> allTableNames = getAllTableNameFromSyncTable(); // Subscribe.getInstance();
for(String tableName : allTableNames){ List<String> allTableNames = operationDBMapper.getAllTable();
for (String tableName : allTableNames) {
Map<String,String> fieldMap = getFieldMap(tableName);
String sqlExecuting = getSQL(tableName); Map<String,String> fieldMap = new HashMap<>();
List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName);
Map<String,Object> devIDLastTimeMap = getDeviceIDAndtime(tableName); for(Map<String,String> map: fieldMaps){
for(String deviceID : devIDLastTimeMap.keySet()){ for(String key : map.keySet()){
List<Map<String,Object>> dataOfoneDeviceID = getData(sqlExecuting,deviceID, (String) devIDLastTimeMap.get(deviceID)); fieldMap.put(map.get("field_name"),map.get("dest_field_name"));
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID,fieldMap);
logger.info("表{}设备{}推送数据:{}",tableName,deviceID,jsonStringData);
if(MqttUtil.publish2MQTT(jsonStringData)){
updateSyncRecordsTable(tableName,deviceID, (String) devIDLastTimeMap.get(deviceID));
logger.info("推送成功");
}else{
logger.info("消息推送失败");
} }
} }
}
}
@Scheduled(fixedDelay = Long.MAX_VALUE) // 用一个非常大的延迟值,确保只执行一次 String sqlExecuting = operationDBMapper.getSQL(tableName);
public void subScribeSamle() { List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName);
logger.info("开始订阅===subScribe执行一次==={}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); List<Integer> syncDevIds = operationDBMapper.getSyncRecordDevIds(tableName);
MqttUtil.subScribeMQTT(); if (dataEqmids.size() != syncDevIds.size()) {
} List<Integer> distinctDevids = dataEqmids.stream().filter(e -> !syncDevIds.contains(e)).collect(Collectors.toList());
for (Integer devId : distinctDevids) {
String earliestTime = null;
if ("data_eaif_h".equals(tableName)) {
earliestTime = operationDBMapper.getEarliestTime4Eaif(tableName, devId);
} else {
earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId);
}
operationDBMapper.addEarliestTime(10,tableName, String.valueOf(devId), earliestTime);
}
}
@Scheduled(fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行 Map<String, Object> devIDLastTimeMap = operationDBMapper.getDeviceIDAndtime(tableName);
public void checkDevIdTimer() { for (String deviceID : devIDLastTimeMap.keySet()) {
logger.info("每小时检测一次同步的表是否在‘同步记录表’"); List<Map<String, Object>> dataOfoneDeviceID = getData(sqlExecuting, deviceID, (String) devIDLastTimeMap.get(deviceID));
List<String> allTableNames = getAllTableNameFromSyncTable(); String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
for(String tableName : allTableNames){ logger.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData);
if(!tableNameIfExitsSyncRec(tableName)){ if (MqttUtil.publish2MQTT(jsonStringData)) {
logger.info("有不存在的表,把所有的devId及最早的时间更新到'同步记录表'"); updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID));
List<String> devIds = operationDBMapper.getAllDevId(tableName); logger.info("推送成功");
for(String devId : devIds){ } else {
String lastTime = operationDBMapper.getLastTime(tableName,devId); logger.info("消息推送失败");
addEarliestTime2SyncRecord(tableName,devId,lastTime);
} }
} }
} }
} }
// @Scheduled(fixedDelay = Long.MAX_VALUE) // 用一个非常大的延迟值,确保只执行一次
// public void subScribeSamle() {
// logger.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
// MqttUtil.subScribeMQTT();
// }
// @Scheduled(fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
// public void checkDevIdTimer() {
// logger.info("每小时检测一次同步的表是否在‘同步记录表’");
// List<String> allTableNames = getAllTableNameFromSyncTable();
// for (String tableName : allTableNames) {
// if (!tableNameIfExitsSyncRec(tableName)) {
// logger.info("有不存在的表{},把所有的devId及最早的时间更新到'同步记录表'", tableName);
// List<String> devIds = operationDBMapper.getAllDevId(tableName);
// for (String devId : devIds) {
// String earliestTime = null;
// if ("data_eaif_h".equals(tableName)) {
// earliestTime = operationDBMapper.getEarliestTime4Eaif(tableName, devId);
// } else {
// earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId);
// }
// addEarliestTime2SyncRecord(tableName, devId, earliestTime);
// }
// }
// }
// }
} }

@ -0,0 +1,24 @@
package com.xydl.util;
import com.xydl.service.impl.MqttServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
public class Subscribe {
private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
private static Subscribe single = new Subscribe();
private Subscribe(){
logger.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
MqttUtil.subScribeMQTT();
}
public static Subscribe getInstance(){
return single;
}
}

@ -6,9 +6,43 @@
select distinct eqmid from ${tableName} select distinct eqmid from ${tableName}
</select> </select>
<select id="getLastTime" parameterType="String" resultType="String"> <select id="getEarliestTime4Other" parameterType="String" resultType="String">
select d_time from ${tableName} where eqmid=#{devId} ORDER BY d_time asc limit 1 select d_time from ${value} where eqmid=#{devId} ORDER BY d_time asc limit 1
</select> </select>
<select id="getEarliestTime4Eaif" parameterType="String" resultType="String">
select capturetime from ${value} where eqmid=#{param2} ORDER BY capturetime asc limit 1
</select>
<select id="getDataEqmids" parameterType="String" resultType="Integer">
select distinct eqmid from ${value}
</select>
<select id="getSyncRecordDevIds" parameterType="String" resultType="Integer">
select devid_val from sync_records where table_name = #{tableName}
</select>
<select id="getFieldMap" parameterType="String" resultType="Map">
select field_name, dest_field_name from sync_fields_info where table_name=#{tableName}
</select>
<select id="getAllTable" resultType="String">
select table_name from sync_tables_info
</select>
<select id="getSQL" parameterType="String" resultType="String">
select sql from sync_tables_info where table_name = #{tableName}
</select>
<select id="getDeviceIDAndtime" parameterType="String" resultType="Map">
select devid_val,field_val2 from sync_records where table_name = #{tableName}
</select>
<insert id="addEarliestTime" parameterType="String" >
insert into sync_records (client_id,table_name,devid_val,field_val2) values (#{clientId},#{tableName},#{devId},#{earliestTime})
</insert>
</mapper> </mapper>

Loading…
Cancel
Save