diff --git a/src/main/java/com/xydl/mapper/OperationDB.java b/src/main/java/com/xydl/mapper/OperationDB.java index 08a1f08..2e798cb 100644 --- a/src/main/java/com/xydl/mapper/OperationDB.java +++ b/src/main/java/com/xydl/mapper/OperationDB.java @@ -31,6 +31,4 @@ public interface OperationDB { boolean updateSyncRecordsTable(String tableName, int deviceID, String time); - - } diff --git a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java index 13922be..ab67eba 100644 --- a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java +++ b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java @@ -26,7 +26,7 @@ public class MqttServiceImpl { public void reportRecord() { List allTableNames = operationDBMapper.getAllTable(); for (String tableName : allTableNames) { - Map fieldMap = getTableFieldMap(tableName); + Map fieldMap = getTableFieldMap(tableName); String sqlExecuting = operationDBMapper.getSQL(tableName); List dataEqmids = operationDBMapper.getDataEqmids(tableName); Map devIDLastTimeMap = new HashMap<>(); @@ -38,19 +38,19 @@ public class MqttServiceImpl { } for (int deviceID : dataEqmids) { String time = ""; - if(devIDLastTimeMap.get(deviceID) !=null){ + if (devIDLastTimeMap.get(deviceID) != null) { time = devIDLastTimeMap.get(deviceID).toString(); - }else{ + } else { time = "2000-01-01 01:00:00"; operationDBMapper.addEarliestTime(10, tableName, String.valueOf(deviceID), time); } - publishData(deviceID, time, sqlExecuting, tableName, fieldMap); + publishData(deviceID, time, sqlExecuting, tableName, fieldMap); } } } //单个表的字段映射 - public Map getTableFieldMap(String tableName){ + public Map getTableFieldMap(String tableName) { Map fieldMap = new HashMap<>(); List> fieldMaps = operationDBMapper.getFieldMap(tableName); for (Map map : fieldMaps) { diff --git a/src/main/java/com/xydl/util/FormatUtil.java b/src/main/java/com/xydl/util/FormatUtil.java index 6711e1e..81837c9 100644 --- a/src/main/java/com/xydl/util/FormatUtil.java +++ b/src/main/java/com/xydl/util/FormatUtil.java @@ -9,11 +9,11 @@ import java.util.stream.Collectors; public class FormatUtil { - public static String mqttFormatTransform(List> list, Map fieldMap){ + public static String mqttFormatTransform(List> list, Map fieldMap) { ObjectMapper objectMapper = new ObjectMapper(); - Map resultMap = new HashMap(){{ - put("AssetList",list.stream().map(e -> recordTransform(e,fieldMap)).collect(Collectors.toList())); + Map resultMap = new HashMap() {{ + put("AssetList", list.stream().map(e -> recordTransform(e, fieldMap)).collect(Collectors.toList())); }}; String jsonString = null; @@ -25,25 +25,24 @@ public class FormatUtil { return jsonString; } - public static Map recordTransform(Map record, Map fieldMap){ - Map mqttRecord = new TreeMap<>(); - List> attribuiteList = new ArrayList<>(); - for(String key : record.keySet()){ - if(fieldMap.containsKey(key)){ - attribuiteList.add(new HashMap(){{ + public static Map recordTransform(Map record, Map fieldMap) { + Map mqttRecord = new TreeMap<>(); + List> attribuiteList = new ArrayList<>(); + for (String key : record.keySet()) { + if (fieldMap.containsKey(key)) { + attribuiteList.add(new HashMap() {{ put("AttributeCode", fieldMap.get(key)); - put("DataValue",record.get(key)); + put("DataValue", record.get(key)); }}); } } - mqttRecord.put("AssetCode",record.get("sensorid")); - mqttRecord.put("AttributeList",attribuiteList); - String captureTime = record.get("d_time") !=null ? record.get("d_time").toString() : (String) record.get("capturetime").toString(); - mqttRecord.put("Timestamp",Timestamp.valueOf(captureTime).getTime()); + mqttRecord.put("AssetCode", record.get("sensorid")); + mqttRecord.put("AttributeList", attribuiteList); + String captureTime = record.get("d_time") != null ? record.get("d_time").toString() : (String) record.get("capturetime").toString(); + mqttRecord.put("Timestamp", Timestamp.valueOf(captureTime).getTime()); - return mqttRecord; + return mqttRecord; } - } diff --git a/src/main/java/com/xydl/util/MqttUtil.java b/src/main/java/com/xydl/util/MqttUtil.java index de217ef..b6e9bf6 100644 --- a/src/main/java/com/xydl/util/MqttUtil.java +++ b/src/main/java/com/xydl/util/MqttUtil.java @@ -35,8 +35,7 @@ public class MqttUtil { private int qos; - - public boolean publish2MQTT(String content){ + public boolean publish2MQTT(String content) { MqttClient client; MqttDeliveryToken token; try { @@ -70,7 +69,7 @@ public class MqttUtil { } log.info("Message published"); log.info("topic: {}", topic); - log.info("{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " message content推送: " + content); + log.info("{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " message content推送: " + content); // disconnect try { client.disconnect(); @@ -87,7 +86,7 @@ public class MqttUtil { return token.isComplete(); } - public void subScribeMQTT(){ + public void subScribeMQTT() { // String broker = "tcp://139.196.211.222:10883"; // String topic = "mqtt/test"; @@ -108,12 +107,14 @@ public class MqttUtil { public void connectionLost(Throwable cause) { log.info("connectionLost:{}", cause.getMessage()); } + public void messageArrived(String topic, MqttMessage message) { log.info("topic:{} ", topic); log.info("Qos:{} ", message.getQos()); - log.info("{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+" message content接收: " + new String(message.getPayload())); + log.info("{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " message content接收: " + new String(message.getPayload())); } + public void deliveryComplete(IMqttDeliveryToken token) { log.info("deliveryComplete---------{}", token.isComplete()); } @@ -121,7 +122,8 @@ public class MqttUtil { client.connect(options); client.subscribe(topic, qos); } catch (Exception e) { - e.printStackTrace(); } + e.printStackTrace(); + } } diff --git a/src/main/java/com/xydl/util/Subscribe.java b/src/main/java/com/xydl/util/Subscribe.java index 696e259..58c899b 100644 --- a/src/main/java/com/xydl/util/Subscribe.java +++ b/src/main/java/com/xydl/util/Subscribe.java @@ -21,12 +21,12 @@ public class Subscribe { private static Subscribe single = new Subscribe(); @PostConstruct - private void SubscribeMqtt(){ + private void SubscribeMqtt() { // log.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); // mqttUtil.subScribeMQTT(); } - public static Subscribe getInstance(){ + public static Subscribe getInstance() { return single; }