@ -146,7 +146,7 @@ public class EiaServiceImpl implements EiaService {
List < String > tableNames = new ArrayList < > ( ) ;
List < String > tableNames = new ArrayList < > ( ) ;
try {
try {
conn = DataSourceUtils . getConnection ( ) ;
conn = DataSourceUtils . getConnection ( ) ;
String sql = "select DISTINCT table_name from sync_tables_info";
String sql = "select table_name from sync_tables_info";
pstmt = conn . prepareStatement ( sql ) ;
pstmt = conn . prepareStatement ( sql ) ;
rs = pstmt . executeQuery ( ) ;
rs = pstmt . executeQuery ( ) ;
@ -189,45 +189,69 @@ public class EiaServiceImpl implements EiaService {
return fieldsMap ;
return fieldsMap ;
}
}
public String getLastRecordTimeSended ( String tableName , String deviceId ) {
// public String getLastRecordTimeSended(String tableName,String deviceId) {
Connection conn = null ;
// Connection conn = null;
PreparedStatement pstmt = null ;
// PreparedStatement pstmt = null;
ResultSet rs = null ;
// ResultSet rs = null;
Timestamp timeStamp = null ;
// Timestamp timeStamp = null;
try {
// try {
conn = DataSourceUtils . getConnection ( ) ;
// conn = DataSourceUtils.getConnection();
String sql = "select field_val2 from sync_records where table_name =? and devid_val=?" ;
// String sql = "select field_val2 from sync_records where table_name =? and devid_val=?";
pstmt = conn . prepareStatement ( sql ) ;
// pstmt = conn.prepareStatement(sql);
pstmt . setString ( 1 , tableName ) ;
// pstmt.setString(1, tableName);
pstmt . setString ( 2 , deviceId ) ;
// pstmt.setString(2, deviceId);
rs = pstmt . executeQuery ( ) ;
// rs = pstmt.executeQuery();
if ( rs . next ( ) ) {
// if(rs.next()){
timeStamp = rs . getTimestamp ( "field_val2" ) ;
// timeStamp = rs.getTimestamp("field_val2");
//
}
// }
} catch ( SQLException e ) {
// } catch (SQLException e) {
logger . error ( "execute sql exception:" , e ) ;
// logger.error("execute sql exception:", e);
} finally {
// } finally {
DataSourceUtils . closeResource ( rs , pstmt , conn ) ;
// DataSourceUtils.closeResource(rs, pstmt, conn);
}
// }
//
return new SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" ) . format ( timeStamp ) ;
// return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timeStamp);
}
// }
// public List<String> getDeviceID(String tableName) {
// Connection conn = null;
// PreparedStatement pstmt = null;
// ResultSet rs = null;
// String sqlExecuting = null ;
// List<String> deviceIDs = new ArrayList<>();
// try {
// conn = DataSourceUtils.getConnection();
// String sql = "select distinct devid_val from sync_records where table_name =?";
// pstmt = conn.prepareStatement(sql);
// pstmt.setString(1, tableName);
// rs = pstmt.executeQuery();
// while(rs.next()){
// deviceIDs.add(rs.getString("devid_val"));
// }
// } catch (SQLException e) {
// logger.error("execute sql exception:", e);
// } finally {
// DataSourceUtils.closeResource(rs, pstmt, conn);
// }
//
// return deviceIDs;
// }
public List < String > getDeviceID ( String tableName ) {
public Map< String , Object > getDeviceID1 ( String tableName ) {
Connection conn = null ;
Connection conn = null ;
PreparedStatement pstmt = null ;
PreparedStatement pstmt = null ;
ResultSet rs = null ;
ResultSet rs = null ;
String sqlExecuting = null ;
String sqlExecuting = null ;
List < String > deviceIDs = new ArrayList < > ( ) ;
Map< String , Object > devIDTimeMap = new HashMap < > ( ) ;
try {
try {
conn = DataSourceUtils . getConnection ( ) ;
conn = DataSourceUtils . getConnection ( ) ;
String sql = "select distinct devid_val from sync_records where table_name =?" ;
String sql = "select d evid_val,field_val2 from sync_records where table_name =?";
pstmt = conn . prepareStatement ( sql ) ;
pstmt = conn . prepareStatement ( sql ) ;
pstmt . setString ( 1 , tableName ) ;
pstmt . setString ( 1 , tableName ) ;
rs = pstmt . executeQuery ( ) ;
rs = pstmt . executeQuery ( ) ;
while ( rs . next ( ) ) {
while ( rs . next ( ) ) {
deviceIDs . add ( rs . getString ( "devid_val" ) ) ;
dev IDTimeMap. put ( rs . getString ( "devid_val ") , rs . getString ( "field_val2 ") ) ;
}
}
} catch ( SQLException e ) {
} catch ( SQLException e ) {
logger . error ( "execute sql exception:" , e ) ;
logger . error ( "execute sql exception:" , e ) ;
@ -235,7 +259,7 @@ public class EiaServiceImpl implements EiaService {
DataSourceUtils . closeResource ( rs , pstmt , conn ) ;
DataSourceUtils . closeResource ( rs , pstmt , conn ) ;
}
}
return dev iceIDs ;
return dev IDTimeMap ;
}
}
public String getSQL ( String tableName ) {
public String getSQL ( String tableName ) {
@ -293,111 +317,177 @@ public class EiaServiceImpl implements EiaService {
}
}
// @Scheduled(initialDelay=1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务, 使用fixedRate属性每隔固定时间执行
// public void reportRecord(){
// List<String> allTableNames = getAllTableName();
// Map<String,Map<String,String>> tableFieldMap = new HashMap<>();
// Map<String, List<Map<String,List<Map<String,Object>>>>> allTableData = new HashMap<>();
// for(String tablenName : allTableNames){
// //if用来测试
// if(!"data_eaif_h".equals(tablenName)){
// Map<String,String> fieldMap = getFieldMap(tablenName);
// tableFieldMap.put(tablenName,fieldMap);
//
// String sqlExecuting = getSQL(tablenName);
// List<String> deviceIDs = getDeviceID(tablenName);
// List<Map<String,List<Map<String,Object>>>> dataOfoneTable = new ArrayList<>();
// for(String deviceID : deviceIDs){
// Map<String,List<Map<String,Object>>> deviceIDMap = new HashMap<>();
// String timeStamp = getLastRecordTimeSended(tablenName,deviceID);
// List<Map<String,Object>> dataOfoneDeviceID = getData(sqlExecuting,deviceID,timeStamp);
//
// deviceIDMap.put(deviceID,dataOfoneDeviceID);
// dataOfoneTable.add(deviceIDMap);
// }
// allTableData.put(tablenName,dataOfoneTable);
// }
//
// }
// System.out.println("旧数据: "+allTableData);
// System.out.println("===============================");
//
//
// Map<String,List<Map<String, List<Map<String, Object>>>>> newAllData = new HashMap<>();
// for(Map.Entry<String, List<Map<String,List<Map<String,Object>>>>> dataEntry : allTableData.entrySet()){
// List<Map<String, List<Map<String, Object>>>> newRecords = transformFields(dataEntry.getKey(),tableFieldMap,dataEntry.getValue());
// newAllData.put(dataEntry.getKey(),newRecords);
// }
//
// for(String tableName : newAllData.keySet()){
// List<Map<String, List<Map<String, Object>>>> records = newAllData.get(tableName);
// String jsonStringData = FormatUtil.list2Json(records);
// if(MqttUtil.publish2MQTT(jsonStringData)){
// updateLastRecordTimeSended(tableName,records);
// }else{
// System.out.println("消息推送失败");
// }
// }
//
// }
@Scheduled ( initialDelay = 1000 , fixedRate = 1000 * 3600 ) //通过@Scheduled声明该方法是计划任务, 使用fixedRate属性每隔固定时间执行
@Scheduled ( initialDelay = 1000 , fixedRate = 1000 * 3600 ) //通过@Scheduled声明该方法是计划任务, 使用fixedRate属性每隔固定时间执行
public void reportRecord ( ) {
public void reportRecord 1 ( ) {
List < String > allTableNames = getAllTableName ( ) ;
List < String > allTableNames = getAllTableName ( ) ;
Map < String , Map < String , String > > tableFieldMap = new HashMap < > ( ) ;
Map < String , Map < String , String > > tableFieldMap = new HashMap < > ( ) ;
Map < String , List < Map < String , List < Map < String , Object > > > > > allTableData = new HashMap < > ( ) ;
Map < String , List < Map < String , List < Map < String , Object > > > > > allTableData = new HashMap < > ( ) ;
for ( String tablenName : allTableNames ) {
for ( String table Name : allTableNames ) {
//if用来测试
//if用来测试
if ( ! "data_eaif_h" . equals ( tablenName ) ) {
if ( ! "data_eaif_h" . equals ( tableName ) ) {
Map < String , String > fieldMap = getFieldMap ( tablenName ) ;
Map < String , String > fieldMap = getFieldMap ( tableName ) ;
tableFieldMap . put ( tablenName , fieldMap ) ;
String sqlExecuting = getSQL ( tableName ) ;
Map < String , Object > devIDLastTimeMap = getDeviceID1 ( tableName ) ;
String sqlExecuting = getSQL ( tablenName ) ;
for ( String deviceID : devIDLastTimeMap . keySet ( ) ) {
List < String > deviceIDs = getDeviceID ( tablenName ) ;
List < Map < String , Object > > dataOfoneDeviceID = getData ( sqlExecuting , deviceID , ( String ) devIDLastTimeMap . get ( deviceID ) ) ;
List < Map < String , List < Map < String , Object > > > > dataOfoneTable = new ArrayList < > ( ) ;
for ( String deviceID : deviceIDs ) {
List < Map < String , Object > > newDataOfoneDeviceID = transformList ( fieldMap , dataOfoneDeviceID ) ;
Map < String , List < Map < String , Object > > > deviceIDMap = new HashMap < > ( ) ;
String timeStamp = getLastRecordTimeSended ( tablenName , deviceID ) ;
String jsonStringData = FormatUtil . list2Json ( newDataOfoneDeviceID ) ;
List < Map < String , Object > > dataOfoneDeviceID = getData ( sqlExecuting , deviceID , timeStamp ) ;
if ( MqttUtil . publish2MQTT ( jsonStringData ) ) {
deviceIDMap . put ( deviceID , dataOfoneDeviceID ) ;
updateSyncRecordsTable ( tableName , deviceID , ( String ) devIDLastTimeMap . get ( deviceID ) ) ;
dataOfoneTable . add ( deviceIDMap ) ;
} else {
System . out . println ( "消息推送失败" ) ;
}
}
}
allTableData . put ( tablenName , dataOfoneTable ) ;
}
}
}
}
System . out . println ( "旧数据: " + allTableData ) ;
// System.out.println("旧数据: "+allTableData);
System . out . println ( "===============================" ) ;
// System.out.println("===============================");
Map < String , List < Map < String , List < Map < String , Object > > > > > newAllData = new HashMap < > ( ) ;
// Map<String,List<Map<String, List<Map<String, Object>>>>> newAllData = new HashMap<>();
for ( Map . Entry < String , List < Map < String , List < Map < String , Object > > > > > dataEntry : allTableData . entrySet ( ) ) {
// for(Map.Entry<String, List<Map<String,List<Map<String,Object>>>>> dataEntry : allTableData.entrySet()){
List < Map < String , List < Map < String , Object > > > > newRecords = transformFields ( dataEntry . getKey ( ) , tableFieldMap , dataEntry . getValue ( ) ) ;
// List<Map<String, List<Map<String, Object>>>> newRecords = transformFields(dataEntry.getKey(),tableFieldMap,dataEntry.getValue());
newAllData . put ( dataEntry . getKey ( ) , newRecords ) ;
// newAllData.put(dataEntry.getKey(),newRecords);
}
// }
for ( String tableName : newAllData . keySet ( ) ) {
// for(String tableName : newAllData.keySet()){
List < Map < String , List < Map < String , Object > > > > records = newAllData . get ( tableName ) ;
// List<Map<String, List<Map<String, Object>>>> records = newAllData.get(tableName);
String jsonStringData = FormatUtil . list2Json ( records ) ;
// String jsonStringData = FormatUtil.list2Json(records);
if ( MqttUtil . publish2MQTT ( jsonStringData ) ) {
// if(MqttUtil.publish2MQTT(jsonStringData)){
updateLastRecordTimeSended ( tableName , records ) ;
// updateLastRecordTimeSended(tableName,records);
} else {
// }else{
System . out . println ( "消息推送失败" ) ;
// System.out.println("消息推送失败");
}
// }
}
// }
}
}
//返回替换字段名的records
// //返回替换字段名的records
private List < Map < String , List < Map < String , Object > > > > transformFields ( String recordTableName , Map < String , Map < String , String > > tableFieldMap , List < Map < String , List < Map < String , Object > > > > records ) {
// private List<Map<String,List<Map<String,Object>>>> transformFields(String recordTableName, Map<String,Map<String,String>> tableFieldMap, List<Map<String,List<Map<String,Object>>>> records) {
List < Map < String , List < Map < String , Object > > > > newRecords = new ArrayList < > ( ) ;
// List<Map<String,List<Map<String,Object>>>> newRecords = new ArrayList<>();
if ( tableFieldMap . containsKey ( recordTableName ) ) {
// if(tableFieldMap.containsKey(recordTableName)){
for ( Map < String , List < Map < String , Object > > > record : records ) {
// for(Map<String,List<Map<String,Object>>> record : records ){
newRecords . add ( transformMore ( tableFieldMap . get ( recordTableName ) , record ) ) ;
// newRecords.add(transformMore(tableFieldMap.get(recordTableName),record));
}
// }
}
// }
return newRecords ;
// return newRecords;
}
// }
//
private Map < String , List < Map < String , Object > > > transformMore ( Map < String , String > fieldMap , Map < String , List < Map < String , Object > > > deviceIDDataMap ) {
// private Map<String,List<Map<String,Object>>> transformMore(Map<String,String> fieldMap, Map<String,List<Map<String,Object>>> deviceIDDataMap) {
Map < String , List < Map < String , Object > > > newDeviceIDData = new HashMap < > ( ) ;
// Map<String,List<Map<String,Object>>> newDeviceIDData = new HashMap<>();
for ( Map . Entry < String , List < Map < String , Object > > > entry : deviceIDDataMap . entrySet ( ) ) {
// for(Map.Entry<String,List<Map<String,Object>>> entry : deviceIDDataMap.entrySet()){
newDeviceIDData . put ( entry . getKey ( ) , transformMoreAgain ( fieldMap , entry . getValue ( ) ) ) ;
// newDeviceIDData.put(entry.getKey(),transformMoreAgain(fieldMap, entry.getValue()));
}
// }
return newDeviceIDData ;
// return newDeviceIDData;
}
// }
//
// private List<Map<String,Object>> transformMoreAgain(Map<String,String> fieldMap, List<Map<String,Object>> deviceIDData) {
// List<Map<String,Object>> newDeviceIDData = new ArrayList<>();
// for(Map<String,Object> fieldValueMap : deviceIDData){
// newDeviceIDData.add(transformMoreAgain2(fieldMap,fieldValueMap));
// }
// return newDeviceIDData;
// }
//
// private Map<String,Object> transformMoreAgain2(Map<String,String> fieldMap, Map<String,Object> fieldValueMap) {
// Map<String,Object> newFieldValueMap = new HashMap<>();
// for(String field : fieldMap.keySet()){
// for(String columnName : fieldValueMap.keySet() ){
// if(Objects.equals(field,columnName)){
// newFieldValueMap.put(fieldMap.get(field),fieldValueMap.get(columnName) );
// }
// }
// }
// return newFieldValueMap;
// }
private List < Map < String , Object > > transformMoreAgain ( Map < String , String > fieldMap , List < Map < String , Object > > deviceIDData ) {
private List < Map < String , Object > > transform List ( Map < String , String > fieldMap , List < Map < String , Object > > deviceIDData ) {
List < Map < String , Object > > newDeviceIDData = new ArrayList < > ( ) ;
List < Map < String , Object > > newDeviceIDData = new ArrayList < > ( ) ;
for ( Map < String , Object > fieldValueMap : deviceIDData ) {
for ( Map < String , Object > fieldValueMap : deviceIDData ) {
newDeviceIDData . add ( transformMoreAgain2 ( fieldMap , fieldValueMap ) ) ;
newDeviceIDData . add ( transform OneRecord ( fieldMap , fieldValueMap ) ) ;
}
}
return newDeviceIDData ;
return newDeviceIDData ;
}
}
private Map < String , Object > transformMoreAgain2 ( Map < String , String > fieldMap , Map < String , Object > fieldValueMap ) {
private Map < String , Object > transform OneRecord ( Map < String , String > fieldMap , Map < String , Object > fieldValueMap ) {
Map < String , Object > newFieldValueMap = new HashMap < > ( ) ;
Map < String , Object > newFieldValueMap = new HashMap < > ( ) ;
for ( String field : fieldMap . keySet ( ) ) {
for ( String field : fieldMap . keySet ( ) ) {
for ( String columnName : fieldValueMap . keySet ( ) ) {
if ( fieldValueMap . containsKey ( field ) ) {
if ( Objects . equals ( field , columnName ) ) {
newFieldValueMap . put ( fieldMap . get ( field ) , fieldValueMap . get ( field ) ) ;
newFieldValueMap . put ( fieldMap . get ( field ) , fieldValueMap . get ( columnName ) ) ;
}
}
}
}
}
return newFieldValueMap ;
return newFieldValueMap ;
}
}
//
public void updateLastRecordTimeSended ( String tableName , List < Map < String , List < Map < String , Object > > > > records ) {
// public void updateLastRecordTimeSended(String tableName, List<Map<String,List<Map<String, Object>>>> records) {
List < String > deviceIDs = getDeviceID ( tableName ) ;
// List<String> deviceIDs = getDeviceID(tableName);
for ( String deviceID : deviceIDs ) {
// for(String deviceID : deviceIDs){
String lastRecordTimesJustSended = null ;
// String lastRecordTimesJustSended = null;
for ( Map < String , List < Map < String , Object > > > recordMap : records ) {
// for(Map<String,List<Map<String, Object>>> recordMap : records){
if ( recordMap . get ( deviceID ) ! = null ) {
// if(recordMap.get(deviceID) != null){
List < Map < String , Object > > deviceIDData = recordMap . get ( deviceID ) ;
// List<Map<String, Object>> deviceIDData = recordMap.get(deviceID);
lastRecordTimesJustSended = ( String ) deviceIDData . get ( deviceIDData . size ( ) - 1 ) . get ( "AcquisitionTime" ) ;
// lastRecordTimesJustSended = (String) deviceIDData.get(deviceIDData.size()-1).get("AcquisitionTime");
System . out . println ( tableName + "表" + deviceID + "最后一条记录时间: " + lastRecordTimesJustSended ) ;
// System.out.println(tableName+"表"+deviceID+"最后一条记录时间: "+lastRecordTimesJustSended);
}
// }
//
}
// }
// updateSyncRecordsTable(tableName, deviceID, lastRecordTimesJustSended);
// // updateSyncRecordsTable(tableName, deviceID, lastRecordTimesJustSended);
//
}
// }
//
}
// }
public boolean updateSyncRecordsTable ( String tableName , String deviceID , String time ) {
public boolean updateSyncRecordsTable ( String tableName , String deviceID , String time ) {