From 99f7f51be24837520e496d93d2e54d656c1b6e84 Mon Sep 17 00:00:00 2001 From: "wenhua.zhou" Date: Thu, 7 Dec 2023 09:46:02 +0800 Subject: [PATCH] . --- .../xydl/service/impl/MqttServiceImpl.java | 39 +++---------- src/main/java/com/xydl/util/FormatUtil.java | 57 +++++++++++++++---- src/main/resources/sql/mqtt-schema.sql | 5 +- 3 files changed, 57 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java index eb3fb5a..a7cddbd 100644 --- a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java +++ b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java @@ -1,6 +1,7 @@ package com.xydl.service.impl; +import com.fasterxml.jackson.core.JsonProcessingException; import com.xydl.mapper.OperationDB; import com.xydl.util.DataSourceUtils; import com.xydl.util.FormatUtil; @@ -14,6 +15,7 @@ import java.sql.*; import java.text.SimpleDateFormat; import java.util.*; import java.util.Date; +import com.fasterxml.jackson.databind.ObjectMapper; @Service @@ -149,7 +151,7 @@ public class MqttServiceImpl { - public List> getData(String sqlExecuting, String deviceId, String timeStamp) { + public List> getData(String sqlExecuting, String deviceId, String time) { Connection conn = null; PreparedStatement pstmt = null; ResultSet rs = null; @@ -159,14 +161,14 @@ public class MqttServiceImpl { String sql = sqlExecuting; pstmt = conn.prepareStatement(sql); pstmt.setString(1, deviceId); - pstmt.setString(2,timeStamp); + pstmt.setString(2, time); rs = pstmt.executeQuery(); int columnCount = rs.getMetaData().getColumnCount(); //获取列的数量 while(rs.next()){ Map record = new HashMap<>(); for (int col = 0; col < columnCount; col++) { String columnName = rs.getMetaData().getColumnName(col + 1); - String columnValue = rs.getString(columnName); + Object columnValue = rs.getString(columnName); record.put(columnName,columnValue); } records.add(record); @@ -249,27 +251,6 @@ public class MqttServiceImpl { } - - private List> transformList(Map fieldMap, List> deviceIDData) { - List> newDeviceIDData = new ArrayList<>(); - for(Map fieldValueMap : deviceIDData){ - newDeviceIDData.add(transformOneRecord(fieldMap,fieldValueMap)); - } - return newDeviceIDData; - } - - private Map transformOneRecord(Map fieldMap, Map fieldValueMap) { - Map newFieldValueMap = new HashMap<>(); - for(String field : fieldMap.keySet()){ - if(fieldValueMap.containsKey(field)){ - newFieldValueMap.put(fieldMap.get(field),fieldValueMap.get(field) ); - } - } - return newFieldValueMap; - } - - - public boolean updateSyncRecordsTable(String tableName, String deviceID, String time) { Connection conn = null; PreparedStatement pstmt = null; @@ -297,16 +278,14 @@ public class MqttServiceImpl { logger.info("开始执行"); List allTableNames = getAllTableNameFromSyncTable(); for(String tableName : allTableNames){ + Map fieldMap = getFieldMap(tableName); String sqlExecuting = getSQL(tableName); Map devIDLastTimeMap = getDeviceIDAndtime(tableName); for(String deviceID : devIDLastTimeMap.keySet()){ List> dataOfoneDeviceID = getData(sqlExecuting,deviceID, (String) devIDLastTimeMap.get(deviceID)); - - List> newDataOfoneDeviceID = transformList(fieldMap,dataOfoneDeviceID); - - String jsonStringData = FormatUtil.list2Json(newDataOfoneDeviceID); + String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID,fieldMap); logger.info("表{}设备{}推送数据:{}",tableName,deviceID,jsonStringData); if(MqttUtil.publish2MQTT(jsonStringData)){ updateSyncRecordsTable(tableName,deviceID, (String) devIDLastTimeMap.get(deviceID)); @@ -340,9 +319,7 @@ public class MqttServiceImpl { } } } - - - + } diff --git a/src/main/java/com/xydl/util/FormatUtil.java b/src/main/java/com/xydl/util/FormatUtil.java index efb79b7..3f9a82e 100644 --- a/src/main/java/com/xydl/util/FormatUtil.java +++ b/src/main/java/com/xydl/util/FormatUtil.java @@ -3,21 +3,19 @@ package com.xydl.util; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.Timestamp; import java.util.*; +import java.util.stream.Collectors; public class FormatUtil { - public static String list2Json(List list){ + public static String mqttFormatTransform(List> list, Map fieldMap){ ObjectMapper objectMapper = new ObjectMapper(); - List> assetList = new ArrayList<>(); - assetList.add(new HashMap(){{ - put("AssetCode","ironCore"); - put("AttributeList",list); - put("Timestamp", new Date().getTime()); - }}); + Map resultMap = new HashMap(){{ - put("AssetList",assetList); + put("AssetList",list.stream().map(e -> recordTransform(e,fieldMap)).collect(Collectors.toList())); }}; + String jsonString = null; try { jsonString = objectMapper.writeValueAsString(resultMap); @@ -27,9 +25,48 @@ public class FormatUtil { return jsonString; } - public static void main(String[] args) { - System.out.println(new Date().getTime()); + public static Map recordTransform(Map record, Map fieldMap){ + Map mqttRecord = new TreeMap<>(); + List> attribuiteList = new ArrayList<>(); + for(String key : record.keySet()){ + if(fieldMap.containsKey(key)){ + attribuiteList.add(new HashMap(){{ + put("AttributeCode", fieldMap.get(key)); + put("DataValue",record.get(key)); + }}); + } + } + mqttRecord.put("AssetCode",record.get("sensorid")); + mqttRecord.put("AttributeList",attribuiteList); + String captureTime = record.get("d_time") !=null ? (String) record.get("d_time") : (String) record.get("capturetime"); + mqttRecord.put("Timestamp",Timestamp.valueOf(captureTime).getTime()); + + return mqttRecord; } +// private static List> transformFieldForList(Map fieldMap, List> oriFieldList) { +// List> newDeviceIDData = new ArrayList<>(); +// for(Map fieldValueMap : oriFieldList){ +// newDeviceIDData.add(transformOneRecord(fieldMap,fieldValueMap)); +// } +// return newDeviceIDData; +// } +// +// private static Map transformOneRecord(Map fieldMap, Map fieldValueMap) { +// Map newFieldValueMap = new HashMap<>(); +// for(String field : fieldMap.keySet()){ +// if(fieldValueMap.containsKey(field)){ +// newFieldValueMap.put(fieldMap.get(field),fieldValueMap.get(field) ); +// } +// } +// return newFieldValueMap; +// } + +// public static void main(String[] args) { +// System.out.println(Timestamp.valueOf("2022-05-01 12:11:00").getTime()); +// System.out.println(new Timestamp(new Date(Timestamp.valueOf("2022-05-01 12:11:00").getTime()).getTime())); +// } + + } diff --git a/src/main/resources/sql/mqtt-schema.sql b/src/main/resources/sql/mqtt-schema.sql index 34286b0..f8e44d2 100644 --- a/src/main/resources/sql/mqtt-schema.sql +++ b/src/main/resources/sql/mqtt-schema.sql @@ -20,7 +20,6 @@ SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for sync_tables_info -- ---------------------------- -DROP TABLE IF EXISTS `sync_tables_info`; CREATE TABLE `sync_tables_info` ( `client_id` int UNSIGNED NOT NULL, `table_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', @@ -33,7 +32,7 @@ CREATE TABLE `sync_tables_info` ( PRIMARY KEY (`client_id`, `table_name`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; -DROP TABLE IF EXISTS `sync_records`; + CREATE TABLE `sync_records` ( `client_id` int UNSIGNED NOT NULL, `table_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', @@ -44,7 +43,7 @@ CREATE TABLE `sync_records` ( PRIMARY KEY (`client_id`, `table_name`, `devid_val`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; -DROP TABLE IF EXISTS `sync_fields_info`; + CREATE TABLE `sync_fields_info` ( `id` int UNSIGNED NOT NULL AUTO_INCREMENT, `client_id` int UNSIGNED NOT NULL,