From 2eefee7aff00171347fde4865cc280d508dfea6e Mon Sep 17 00:00:00 2001 From: "wenhua.zhou" Date: Mon, 11 Dec 2023 17:18:27 +0800 Subject: [PATCH] modify sql --- mqtt-schema.sql | 102 ++++++++++++++++++ .../resources/sql => sql}/mqtt-schema.sql | 30 +----- .../xydl/service/impl/MqttServiceImpl.java | 53 +++++---- src/main/resources/application.yml | 27 ++--- 4 files changed, 140 insertions(+), 72 deletions(-) create mode 100644 mqtt-schema.sql rename {src/main/resources/sql => sql}/mqtt-schema.sql (69%) diff --git a/mqtt-schema.sql b/mqtt-schema.sql new file mode 100644 index 0000000..7136844 --- /dev/null +++ b/mqtt-schema.sql @@ -0,0 +1,102 @@ + + +-- ---------------------------- +-- Table structure for sync_tables_info +-- ---------------------------- +DROP TABLE IF EXISTS `sync_tables_info`; +CREATE TABLE `sync_tables_info` ( + `client_id` int UNSIGNED NOT NULL, + `table_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `sql` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `devid_field_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `outer_devid_fname` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `field_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `field_type` tinyint NOT NULL DEFAULT 1 COMMENT '1: 整数 2: 字符串 3: 日期 4: 浮点数', + `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0), + PRIMARY KEY (`client_id`, `table_name`) USING BTREE +) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC; + +-- ---------------------------- +-- Records of sync_tables_info +-- ---------------------------- +INSERT INTO `sync_tables_info` VALUES (10, 'data_eaif_h', 'SELECT t2.equipmentid , t2.sensorid,t1.capturetime,t1.maxtemp,t1.mintemp,t1.avgtemp,t2.`Phase` FROM `data_eaif_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=? AND t1.capturetime>? ORDER BY t1.capturetime LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-11 11:56:18'); +INSERT INTO `sync_tables_info` VALUES (10, 'data_eia_h', 'SELECT t2.equipmentid, t2.sensorid,t1.d_time, t1.d_ct_1, t2.phase FROM `data_eia_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=? AND t1.d_time>? ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-06 14:17:19'); + +DROP TABLE IF EXISTS `sync_fields_info`; +CREATE TABLE `sync_fields_info` ( + `id` int UNSIGNED NOT NULL AUTO_INCREMENT, + `client_id` int UNSIGNED NOT NULL, + `table_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `field_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `field_type` tinyint UNSIGNED NOT NULL DEFAULT 0, + `dest_field_name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + PRIMARY KEY (`id`) USING BTREE, + INDEX `fields`(`client_id`, `table_name`, `field_name`) USING BTREE +) ENGINE = InnoDB AUTO_INCREMENT = 20 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC; + +-- ---------------------------- +-- Records of sync_fields_info +-- ---------------------------- +INSERT INTO `sync_fields_info` VALUES (10, 10, 'data_eaif_h', 'maxtemp', 4, 'MaxTemp'); +INSERT INTO `sync_fields_info` VALUES (11, 10, 'data_eaif_h', 'mintemp', 4, 'MinTemp'); +INSERT INTO `sync_fields_info` VALUES (12, 10, 'data_eaif_h', 'avgtemp', 4, 'avgtemp'); +INSERT INTO `sync_fields_info` VALUES (13, 10, 'data_eaif_h', 'phase', 2, 'Phase'); +INSERT INTO `sync_fields_info` VALUES (15, 10, 'data_eia_h', 'phase', 2, 'Phase'); +INSERT INTO `sync_fields_info` VALUES (19, 10, 'data_eia_h', 'd_ct_1', 4, 'TotalCoreCurrent'); + +-- ---------------------------- +-- Table structure for sync_records +-- ---------------------------- +DROP TABLE IF EXISTS `sync_records`; +CREATE TABLE `sync_records` ( + `client_id` int UNSIGNED NOT NULL, + `table_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `devid_val` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `field_val1` bigint UNSIGNED NULL DEFAULT 0, + `field_val2` datetime(0) NULL DEFAULT '0000-00-00 00:00:00', + `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0), + PRIMARY KEY (`client_id`, `table_name`, `devid_val`) USING BTREE +) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; + +-- ---------------------------- +-- Records of sync_records +-- ---------------------------- +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '165', 0, '2019-06-29 10:33:47', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '188', 0, '2019-06-29 10:33:53', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '189', 0, '2019-06-29 10:33:53', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '191', 0, '2019-06-29 10:33:53', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '197', 0, '2019-06-29 10:33:53', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '226', 0, '2019-06-29 10:33:47', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '227', 0, '2019-06-29 10:33:42', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '228', 0, '2019-06-29 10:33:37', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '229', 0, '2019-06-29 10:33:31', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '230', 0, '2019-06-29 10:33:26', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '231', 0, '2019-06-29 10:33:21', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '232', 0, '2019-06-29 10:33:16', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '233', 0, '2019-06-29 10:33:10', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '234', 0, '2019-06-29 10:33:05', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '235', 0, '2019-06-29 10:33:00', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '236', 0, '2019-06-29 11:32:55', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '382', 0, '2019-12-17 19:40:07', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '383', 0, '2019-12-17 19:40:05', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '384', 0, '2019-12-17 19:40:02', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '385', 0, '2019-12-17 19:41:07', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '386', 0, '2019-12-17 19:41:15', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '387', 0, '2019-12-17 19:40:14', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '388', 0, '2019-12-17 19:41:05', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '389', 0, '2019-12-17 19:40:58', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '390', 0, '2019-12-17 19:40:43', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '391', 0, '2019-12-17 19:40:51', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '392', 0, '2019-12-17 19:40:29', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '393', 0, '2019-12-17 19:40:36', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eaif_h', '394', 0, '2019-12-17 19:40:21', '2023-12-07 14:35:11'); +INSERT INTO `sync_records` VALUES (10, 'data_eia_h', '152', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55'); +INSERT INTO `sync_records` VALUES (10, 'data_eia_h', '153', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55'); +INSERT INTO `sync_records` VALUES (10, 'data_eia_h', '154', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55'); +INSERT INTO `sync_records` VALUES (10, 'data_eia_h', '155', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55'); +INSERT INTO `sync_records` VALUES (10, 'data_eia_h', '156', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55'); +INSERT INTO `sync_records` VALUES (10, 'data_eia_h', '157', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55'); +INSERT INTO `sync_records` VALUES (10, 'data_eia_h', '158', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55'); +INSERT INTO `sync_records` VALUES (10, 'data_eia_h', '159', 0, '2019-12-17 15:30:00', '2023-12-07 14:54:55'); + + diff --git a/src/main/resources/sql/mqtt-schema.sql b/sql/mqtt-schema.sql similarity index 69% rename from src/main/resources/sql/mqtt-schema.sql rename to sql/mqtt-schema.sql index f8e44d2..51f51f3 100644 --- a/src/main/resources/sql/mqtt-schema.sql +++ b/sql/mqtt-schema.sql @@ -1,26 +1,5 @@ -/* - Navicat Premium Data Transfer - Source Server : 本机 - Source Server Type : MySQL - Source Server Version : 80100 - Source Host : localhost:3306 - Source Schema : cac - - Target Server Type : MySQL - Target Server Version : 80100 - File Encoding : 65001 - - Date: 05/12/2023 10:26:24 -*/ - -SET NAMES utf8mb4; -SET FOREIGN_KEY_CHECKS = 0; - --- ---------------------------- --- Table structure for sync_tables_info --- ---------------------------- -CREATE TABLE `sync_tables_info` ( +CREATE TABLE `sync_tables_info2` ( `client_id` int UNSIGNED NOT NULL, `table_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `sql` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', @@ -30,7 +9,7 @@ CREATE TABLE `sync_tables_info` ( `field_type` tinyint NOT NULL DEFAULT 1 COMMENT '1: 整数 2: 字符串 3: 日期 4: 浮点数', `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0), PRIMARY KEY (`client_id`, `table_name`) USING BTREE -) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; +); CREATE TABLE `sync_records` ( @@ -41,7 +20,7 @@ CREATE TABLE `sync_records` ( `field_val2` datetime(0) NULL DEFAULT '1999-01-01 01:00:00', `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0), PRIMARY KEY (`client_id`, `table_name`, `devid_val`) USING BTREE -) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; +); CREATE TABLE `sync_fields_info` ( @@ -53,6 +32,5 @@ CREATE TABLE `sync_fields_info` ( `dest_field_name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', PRIMARY KEY (`id`) USING BTREE, INDEX `fields`(`client_id`, `table_name`, `field_name`) USING BTREE -) ENGINE = InnoDB AUTO_INCREMENT = 41 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; +); -SET FOREIGN_KEY_CHECKS = 1; diff --git a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java index 9165e1b..486000b 100644 --- a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java +++ b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java @@ -7,11 +7,11 @@ import com.xydl.util.MqttUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; + import java.util.*; import java.util.stream.Collectors; - @Service @Slf4j public class MqttServiceImpl { @@ -27,19 +27,15 @@ public class MqttServiceImpl { public void reportRecord() { List allTableNames = operationDBMapper.getAllTable(); for (String tableName : allTableNames) { - - Map fieldMap = new HashMap<>(); + Map fieldMap = new HashMap<>(); List> fieldMaps = operationDBMapper.getFieldMap(tableName); - for(Map map: fieldMaps){ - for(String key : map.keySet()){ - fieldMap.put(map.get("field_name"),map.get("dest_field_name")); + for (Map map : fieldMaps) { + for (String key : map.keySet()) { + fieldMap.put(map.get("field_name"), map.get("dest_field_name")); } } String sqlExecuting = operationDBMapper.getSQL(tableName); - String preSQL = sqlExecuting.substring(0,sqlExecuting.indexOf("?")); - String midSQL = sqlExecuting.substring(sqlExecuting.indexOf("?")+1,sqlExecuting.lastIndexOf("?")); - String lastSQL = sqlExecuting.substring(sqlExecuting.lastIndexOf("?")+1); List dataEqmids = operationDBMapper.getDataEqmids(tableName); List syncDevIds = operationDBMapper.getSyncRecordDevIds(tableName); if (dataEqmids.size() != syncDevIds.size()) { @@ -51,37 +47,40 @@ public class MqttServiceImpl { } else { earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId); } - operationDBMapper.addEarliestTime(10,tableName, String.valueOf(devId), earliestTime); + operationDBMapper.addEarliestTime(10, tableName, String.valueOf(devId), earliestTime); } } - Map devIDLastTimeMap = new HashMap<>(); + Map devIDLastTimeMap = new HashMap<>(); List> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName); - for(Map map : devIDLastTimeMaps){ - for(String devId : map.keySet()){ + for (Map map : devIDLastTimeMaps) { + for (String devId : map.keySet()) { devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2")); } } for (int deviceID : devIDLastTimeMap.keySet()) { - String time = devIDLastTimeMap.get(deviceID).toString(); - String sql = preSQL+deviceID+midSQL+"'"+time+"'"+lastSQL; - List> dataOfoneDeviceID = operationDBMapper.getData(sql); - String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); - log.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData); - try{ - if (mqttUtil.publish2MQTT(jsonStringData)) { - operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID)); - log.info("推送成功"); - } - }catch (Exception e){ - log.info("推送异常:{}",e.getMessage()); - } - + publishData(deviceID,sqlExecuting,devIDLastTimeMap,tableName,fieldMap); } } } + public void publishData(int deviceID, String sqlExecuting, Map devIDLastTimeMap, String tableName,Map fieldMap){ + String time = devIDLastTimeMap.get(deviceID).toString(); + String devIdSQL = sqlExecuting.replace("%%DEVID%%", String.valueOf(deviceID)); + String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'"+time+"'"); + System.out.println("SQL:"+newSQL); + List> dataOfoneDeviceID = operationDBMapper.getData(newSQL); + String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); + try { + if (mqttUtil.publish2MQTT(jsonStringData)) { + operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID)); + log.info("表{}设备{}推送成功:{}",tableName,deviceID,jsonStringData); + } + } catch (Exception e) { + log.info("表{}设备{}推送异常:{}", tableName,deviceID,e.getMessage()); + } + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4d8ede4..9d4df14 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,25 +6,13 @@ server: spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver -# url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=UTC -# username: root -# password: root - url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=UTC + url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=UTC username: root - password: 123456 - #sql: - #init: - #指定脚本文件位置 - #schema-locations: classpath:user.sql - #初始化方式 - #mode: always - #设置数据源类型C - type: com.alibaba.druid.pool.DruidDataSource - #初始化执行sql - initialization-mode: always - schema: - - classpath:/sql/mqtt-schema.sql - continue-on-error: true + password: root +# url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=UTC +# username: root +# password: 123456 + mybatis: configuration: @@ -57,7 +45,8 @@ logging: impl: info schedule: info file: - name: E:\log\mqtt.log +# name: E:\log\mqtt.log + name: /root/log/mqtt.log