diff --git a/src/main/java/com/xydl/mapper/OperationDB.java b/src/main/java/com/xydl/mapper/OperationDB.java deleted file mode 100644 index 426578d..0000000 --- a/src/main/java/com/xydl/mapper/OperationDB.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.xydl.mapper; - - -import java.util.List; -import java.util.Map; - -public interface OperationDB { - - List getAllDevId(String tableName); - - String getEarliestTime4Other(String value, int devId); - - String getEarliestTime4Eaif(String value, int devId); - - List getDataEqmids(String value); - - List getSyncRecordDevIds(String tableName); - - List> getFieldMap(String tableName); - - List getAllTable(); - - String getSQL(String tableName); - - List> getDeviceIDAndtime(String tableName); - - void addEarliestTime(String clientId, String tableName, String devId, String earliestTime); - - List> getData(String sql); - - boolean updateSyncRecordsTable(String tableName, String deviceID, String time); - - -} diff --git a/src/main/java/com/xydl/schedule/ReportSchedule.java b/src/main/java/com/xydl/schedule/ReportSchedule.java index 05381c6..ccb4e0a 100644 --- a/src/main/java/com/xydl/schedule/ReportSchedule.java +++ b/src/main/java/com/xydl/schedule/ReportSchedule.java @@ -2,9 +2,7 @@ package com.xydl.schedule; import com.xydl.service.MqttService; -import com.xydl.service.impl.MqttServiceImpl; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -14,15 +12,11 @@ import javax.annotation.Resource; @Component @Slf4j public class ReportSchedule { - @Autowired - MqttServiceImpl mqttServiceimpl; @Resource MqttService mqttService; - @Scheduled(initialDelay = 1000, fixedDelay = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 + @Scheduled(initialDelay = 1000, fixedDelay = 1000 * 3600) public void reportRecord() { - log.info("运行定时转发任务"); -// mqttServiceimpl.reportRecord(); mqttService.uploadTask(); } diff --git a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java deleted file mode 100644 index f4597bc..0000000 --- a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.xydl.service.impl; - - -import com.xydl.mapper.OperationDB; -import com.xydl.util.FormatUtil; -import com.xydl.util.MqttUtil; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.*; - - -@Service -@Slf4j -public class MqttServiceImpl { - - @Autowired - OperationDB operationDBMapper; - - @Autowired - MqttUtil mqttUtil; - - - public void reportRecord() { - List allTableNames = operationDBMapper.getAllTable(); - for (String tableName : allTableNames) { - try { - processOneTable(tableName); - } catch (Exception e) { - log.error("processOneTable exception: " + tableName, e); - } - } - } - - //单个表数据发送 - public void processOneTable(String tableName) throws Exception { - Map fieldMap = new HashMap<>(); - List> fieldMaps = operationDBMapper.getFieldMap(tableName); - for (Map map : fieldMaps) { - for (String key : map.keySet()) { - fieldMap.put(map.get("field_name"), map.get("dest_field_name")); - } - } - String sqlExecuting = operationDBMapper.getSQL(tableName); - List dataEqmids = operationDBMapper.getDataEqmids(tableName); - Map devIDLastTimeMap = new HashMap<>(); - List> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName); - for (Map map : devIDLastTimeMaps) { - for (String devId : map.keySet()) { - devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2")); - } - } - log.info("表{},共{}个设备", tableName, dataEqmids.size()); - - if (dataEqmids.size() > 0) { - mqttUtil.connect(); - for (int deviceID : dataEqmids) { - Object time = devIDLastTimeMap.get(deviceID); - if (time == null) { - time = "2010-01-01 01:00:00"; - operationDBMapper.addEarliestTime("10", tableName, String.valueOf(deviceID), time.toString()); - } - publishData(String.valueOf(deviceID), time.toString(), sqlExecuting, tableName, fieldMap); - } - mqttUtil.disconnect(); - } - } - - //推送单个设备数据 - public void publishData(String deviceID, String time, String sqlExecuting, String tableName, Map fieldMap) { - - String devIdSQL = sqlExecuting.replace("%%DEVID%%", deviceID); - String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'"); - List> dataOfoneDeviceID = operationDBMapper.getData(newSQL); - if (!dataOfoneDeviceID.isEmpty()) { - String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); - try { - String lastRecordTime = "2010-01-01 01:00:00"; - if (mqttUtil.publish2MQTT(jsonStringData)) { - if (dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time") == null) { - lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("capturetime").toString(); - } else { - lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time").toString(); - } - operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime); - log.debug("表{}设备{}推送成功{}条数据,最后时间{}", tableName, deviceID, dataOfoneDeviceID.size(), lastRecordTime); - } - if (dataOfoneDeviceID.size() >= 1000) { - publishData(deviceID, lastRecordTime, sqlExecuting, tableName, fieldMap); - } - } catch (Exception e) { - log.error("表{}设备{}推送异常", tableName, deviceID, e); - } - } - - } - - -} - diff --git a/src/main/java/com/xydl/util/FormatUtil.java b/src/main/java/com/xydl/util/FormatUtil.java deleted file mode 100644 index 81837c9..0000000 --- a/src/main/java/com/xydl/util/FormatUtil.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.xydl.util; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.sql.Timestamp; -import java.util.*; -import java.util.stream.Collectors; - -public class FormatUtil { - - public static String mqttFormatTransform(List> list, Map fieldMap) { - ObjectMapper objectMapper = new ObjectMapper(); - - Map resultMap = new HashMap() {{ - put("AssetList", list.stream().map(e -> recordTransform(e, fieldMap)).collect(Collectors.toList())); - }}; - - String jsonString = null; - try { - jsonString = objectMapper.writeValueAsString(resultMap); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - return jsonString; - } - - public static Map recordTransform(Map record, Map fieldMap) { - Map mqttRecord = new TreeMap<>(); - List> attribuiteList = new ArrayList<>(); - for (String key : record.keySet()) { - if (fieldMap.containsKey(key)) { - attribuiteList.add(new HashMap() {{ - put("AttributeCode", fieldMap.get(key)); - put("DataValue", record.get(key)); - }}); - } - } - mqttRecord.put("AssetCode", record.get("sensorid")); - mqttRecord.put("AttributeList", attribuiteList); - String captureTime = record.get("d_time") != null ? record.get("d_time").toString() : (String) record.get("capturetime").toString(); - mqttRecord.put("Timestamp", Timestamp.valueOf(captureTime).getTime()); - - return mqttRecord; - } - - -} diff --git a/src/main/resources/com/xydl/mapper/OperationDB.xml b/src/main/resources/com/xydl/mapper/OperationDB.xml deleted file mode 100644 index 59237ef..0000000 --- a/src/main/resources/com/xydl/mapper/OperationDB.xml +++ /dev/null @@ -1,56 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - insert into sync_records (client_id,table_name,devid_val,field_val2) values (#{clientId},#{tableName},#{devId},#{earliestTime}) - - - - - - update sync_records set field_val2 = #{time} where table_name = #{tableName} and devid_val = #{deviceID} - - - - - -