modify sql

main
wenhua.zhou 2 years ago
parent 43f55e4322
commit 2eefee7aff

@ -0,0 +1,102 @@
-- ----------------------------
-- Table structure for sync_tables_info
-- ----------------------------
-- One row per (client, source table): holds the extraction SQL template
-- together with the device-id / timestamp column mapping used by the sync job.
DROP TABLE IF EXISTS `sync_tables_info`;
CREATE TABLE `sync_tables_info` (
  `client_id`         int UNSIGNED NOT NULL,
  `table_name`        varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  -- extraction query template executed once per device of this table
  `sql`               varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  -- presumably the device-id column in the source table — confirm against callers
  `devid_field_name`  varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  -- presumably the device-id column name on the destination side
  `outer_devid_fname` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  `field_name`        varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  `field_type`        tinyint NOT NULL DEFAULT 1 COMMENT '1: 整数 2: 字符串 3: 日期 4: 浮点数',
  `update_time`       timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
  PRIMARY KEY (`client_id`, `table_name`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of sync_tables_info
-- ----------------------------
-- The extraction SQL is seeded with %%DEVID%% / %%KEYVALUE%% placeholder
-- tokens: MqttServiceImpl.publishData() fills the template via
-- String.replace(), not via JDBC '?' binding. A '?' seed would be sent to
-- MySQL verbatim and fail at execution time. Explicit column lists keep the
-- rows valid if the table gains columns later.
-- NOTE(review): field_name is 'd_time' for data_eaif_h (whose source time
-- column is capturetime) and 'capturetime' for data_eia_h (source column
-- d_time) — confirm this cross-mapping is intentional and not a swap.
INSERT INTO `sync_tables_info` (`client_id`, `table_name`, `sql`, `devid_field_name`, `outer_devid_fname`, `field_name`, `field_type`, `update_time`) VALUES (10, 'data_eaif_h', 'SELECT t2.equipmentid , t2.sensorid,t1.capturetime,t1.maxtemp,t1.mintemp,t1.avgtemp,t2.`Phase` FROM `data_eaif_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.capturetime>%%KEYVALUE%% ORDER BY t1.capturetime LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-11 11:56:18');
INSERT INTO `sync_tables_info` (`client_id`, `table_name`, `sql`, `devid_field_name`, `outer_devid_fname`, `field_name`, `field_type`, `update_time`) VALUES (10, 'data_eia_h', 'SELECT t2.equipmentid, t2.sensorid,t1.d_time, t1.d_ct_1, t2.phase FROM `data_eia_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-06 14:17:19');
-- ----------------------------
-- Table structure for sync_fields_info
-- ----------------------------
-- Maps a source column of a synced table to its destination field name.
DROP TABLE IF EXISTS `sync_fields_info`;
CREATE TABLE `sync_fields_info` (
  `id`              int UNSIGNED NOT NULL AUTO_INCREMENT,
  `client_id`       int UNSIGNED NOT NULL,
  `table_name`      varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  `field_name`      varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  `field_type`      tinyint UNSIGNED NOT NULL DEFAULT 0,
  -- field name as it should appear in the published payload
  `dest_field_name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  PRIMARY KEY (`id`) USING BTREE,
  INDEX `fields`(`client_id`, `table_name`, `field_name`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 20 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of sync_fields_info
-- ----------------------------
-- Explicit column lists so the seed rows stay valid if the schema gains
-- columns later.
-- NOTE(review): 'avgtemp' breaks the PascalCase pattern of the other
-- dest_field_name values (MaxTemp/MinTemp/Phase) — confirm the consumer
-- really expects lowercase before changing it.
INSERT INTO `sync_fields_info` (`id`, `client_id`, `table_name`, `field_name`, `field_type`, `dest_field_name`) VALUES (10, 10, 'data_eaif_h', 'maxtemp', 4, 'MaxTemp');
INSERT INTO `sync_fields_info` (`id`, `client_id`, `table_name`, `field_name`, `field_type`, `dest_field_name`) VALUES (11, 10, 'data_eaif_h', 'mintemp', 4, 'MinTemp');
INSERT INTO `sync_fields_info` (`id`, `client_id`, `table_name`, `field_name`, `field_type`, `dest_field_name`) VALUES (12, 10, 'data_eaif_h', 'avgtemp', 4, 'avgtemp');
INSERT INTO `sync_fields_info` (`id`, `client_id`, `table_name`, `field_name`, `field_type`, `dest_field_name`) VALUES (13, 10, 'data_eaif_h', 'phase', 2, 'Phase');
INSERT INTO `sync_fields_info` (`id`, `client_id`, `table_name`, `field_name`, `field_type`, `dest_field_name`) VALUES (15, 10, 'data_eia_h', 'phase', 2, 'Phase');
INSERT INTO `sync_fields_info` (`id`, `client_id`, `table_name`, `field_name`, `field_type`, `dest_field_name`) VALUES (19, 10, 'data_eia_h', 'd_ct_1', 4, 'TotalCoreCurrent');
-- ----------------------------
-- Table structure for sync_records
-- ----------------------------
-- Per-device sync watermark: field_val2 is the last timestamp pushed.
DROP TABLE IF EXISTS `sync_records`;
CREATE TABLE `sync_records` (
  `client_id`  int UNSIGNED NOT NULL,
  `table_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  `devid_val`  varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
  `field_val1` bigint UNSIGNED NULL DEFAULT 0,
  -- A '0000-00-00' default is rejected under MySQL 8's default sql_mode
  -- (NO_ZERO_DATE); use the same sentinel epoch as the companion
  -- mqtt-schema.sql in this commit.
  `field_val2` datetime(0) NULL DEFAULT '1999-01-01 01:00:00',
  `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
  PRIMARY KEY (`client_id`, `table_name`, `devid_val`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of sync_records
-- ----------------------------
-- Explicit column lists so the seed rows stay valid if the schema gains
-- columns later.
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '165', 0, '2019-06-29 10:33:47', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '188', 0, '2019-06-29 10:33:53', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '189', 0, '2019-06-29 10:33:53', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '191', 0, '2019-06-29 10:33:53', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '197', 0, '2019-06-29 10:33:53', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '226', 0, '2019-06-29 10:33:47', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '227', 0, '2019-06-29 10:33:42', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '228', 0, '2019-06-29 10:33:37', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '229', 0, '2019-06-29 10:33:31', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '230', 0, '2019-06-29 10:33:26', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '231', 0, '2019-06-29 10:33:21', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '232', 0, '2019-06-29 10:33:16', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '233', 0, '2019-06-29 10:33:10', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '234', 0, '2019-06-29 10:33:05', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '235', 0, '2019-06-29 10:33:00', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '236', 0, '2019-06-29 11:32:55', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '382', 0, '2019-12-17 19:40:07', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '383', 0, '2019-12-17 19:40:05', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '384', 0, '2019-12-17 19:40:02', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '385', 0, '2019-12-17 19:41:07', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '386', 0, '2019-12-17 19:41:15', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '387', 0, '2019-12-17 19:40:14', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '388', 0, '2019-12-17 19:41:05', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '389', 0, '2019-12-17 19:40:58', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '390', 0, '2019-12-17 19:40:43', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '391', 0, '2019-12-17 19:40:51', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '392', 0, '2019-12-17 19:40:29', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '393', 0, '2019-12-17 19:40:36', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eaif_h', '394', 0, '2019-12-17 19:40:21', '2023-12-07 14:35:11');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eia_h', '152', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eia_h', '153', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eia_h', '154', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eia_h', '155', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eia_h', '156', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eia_h', '157', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eia_h', '158', 0, '2019-12-16 11:00:00', '2023-12-07 14:54:55');
INSERT INTO `sync_records` (`client_id`, `table_name`, `devid_val`, `field_val1`, `field_val2`, `update_time`) VALUES (10, 'data_eia_h', '159', 0, '2019-12-17 15:30:00', '2023-12-07 14:54:55');

@ -1,26 +1,5 @@
/*
Navicat Premium Data Transfer
Source Server :
Source Server Type : MySQL
Source Server Version : 80100
Source Host : localhost:3306
Source Schema : cac
Target Server Type : MySQL
Target Server Version : 80100
File Encoding : 65001
Date: 05/12/2023 10:26:24
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for sync_tables_info
-- ----------------------------
CREATE TABLE `sync_tables_info` (
CREATE TABLE `sync_tables_info2` (
`client_id` int UNSIGNED NOT NULL,
`table_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`sql` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@ -30,7 +9,7 @@ CREATE TABLE `sync_tables_info` (
`field_type` tinyint NOT NULL DEFAULT 1 COMMENT '1: 整数 2: 字符串 3: 日期 4: 浮点数',
`update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
PRIMARY KEY (`client_id`, `table_name`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
);
CREATE TABLE `sync_records` (
@ -41,7 +20,7 @@ CREATE TABLE `sync_records` (
`field_val2` datetime(0) NULL DEFAULT '1999-01-01 01:00:00',
`update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0),
PRIMARY KEY (`client_id`, `table_name`, `devid_val`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
);
CREATE TABLE `sync_fields_info` (
@ -53,6 +32,5 @@ CREATE TABLE `sync_fields_info` (
`dest_field_name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
PRIMARY KEY (`id`) USING BTREE,
INDEX `fields`(`client_id`, `table_name`, `field_name`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 41 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
);
SET FOREIGN_KEY_CHECKS = 1;

@ -7,11 +7,11 @@ import com.xydl.util.MqttUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
@Service
@Slf4j
public class MqttServiceImpl {
@ -27,19 +27,15 @@ public class MqttServiceImpl {
public void reportRecord() {
List<String> allTableNames = operationDBMapper.getAllTable();
for (String tableName : allTableNames) {
Map<String,String> fieldMap = new HashMap<>();
Map<String, String> fieldMap = new HashMap<>();
List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName);
for(Map<String,String> map: fieldMaps){
for(String key : map.keySet()){
fieldMap.put(map.get("field_name"),map.get("dest_field_name"));
for (Map<String, String> map : fieldMaps) {
for (String key : map.keySet()) {
fieldMap.put(map.get("field_name"), map.get("dest_field_name"));
}
}
String sqlExecuting = operationDBMapper.getSQL(tableName);
String preSQL = sqlExecuting.substring(0,sqlExecuting.indexOf("?"));
String midSQL = sqlExecuting.substring(sqlExecuting.indexOf("?")+1,sqlExecuting.lastIndexOf("?"));
String lastSQL = sqlExecuting.substring(sqlExecuting.lastIndexOf("?")+1);
List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName);
List<Integer> syncDevIds = operationDBMapper.getSyncRecordDevIds(tableName);
if (dataEqmids.size() != syncDevIds.size()) {
@ -51,37 +47,40 @@ public class MqttServiceImpl {
} else {
earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId);
}
operationDBMapper.addEarliestTime(10,tableName, String.valueOf(devId), earliestTime);
operationDBMapper.addEarliestTime(10, tableName, String.valueOf(devId), earliestTime);
}
}
Map<Integer,Object> devIDLastTimeMap = new HashMap<>();
Map<Integer, Object> devIDLastTimeMap = new HashMap<>();
List<Map<String, Object>> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName);
for(Map<String,Object> map : devIDLastTimeMaps){
for(String devId : map.keySet()){
for (Map<String, Object> map : devIDLastTimeMaps) {
for (String devId : map.keySet()) {
devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2"));
}
}
for (int deviceID : devIDLastTimeMap.keySet()) {
String time = devIDLastTimeMap.get(deviceID).toString();
String sql = preSQL+deviceID+midSQL+"'"+time+"'"+lastSQL;
List<Map<String, Object>> dataOfoneDeviceID = operationDBMapper.getData(sql);
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
log.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData);
try{
if (mqttUtil.publish2MQTT(jsonStringData)) {
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID));
log.info("推送成功");
}
}catch (Exception e){
log.info("推送异常:{}",e.getMessage());
}
publishData(deviceID,sqlExecuting,devIDLastTimeMap,tableName,fieldMap);
}
}
}
/**
 * Builds the per-device extraction SQL from the stored template, fetches the
 * rows and publishes them to MQTT; on success the sync watermark for the
 * device is advanced via updateSyncRecordsTable.
 *
 * @param deviceID         device id substituted for the %%DEVID%% token
 * @param sqlExecuting     SQL template containing %%DEVID%% / %%KEYVALUE%% tokens
 * @param devIDLastTimeMap device id -> last synced time; the value for deviceID
 *                         is substituted for %%KEYVALUE%% (quoted as a SQL literal)
 * @param tableName        logical source table, used for logging and the watermark update
 * @param fieldMap         source field name -> destination field name mapping
 */
public void publishData(int deviceID, String sqlExecuting, Map<Integer, Object> devIDLastTimeMap, String tableName, Map<String, String> fieldMap) {
    String time = devIDLastTimeMap.get(deviceID).toString();
    // Fill both placeholder tokens; the watermark time becomes a quoted SQL literal.
    // NOTE(review): values are interpolated, not bound — safe only because they
    // come from our own tables, never from user input.
    String newSQL = sqlExecuting
            .replace("%%DEVID%%", String.valueOf(deviceID))
            .replace("%%KEYVALUE%%", "'" + time + "'");
    // Use the class logger instead of System.out so the statement reaches the log file.
    log.debug("SQL:{}", newSQL);
    List<Map<String, Object>> dataOfOneDevice = operationDBMapper.getData(newSQL);
    String jsonStringData = FormatUtil.mqttFormatTransform(dataOfOneDevice, fieldMap);
    try {
        if (mqttUtil.publish2MQTT(jsonStringData)) {
            // Advance the watermark only after a confirmed publish.
            operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID));
            log.info("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData);
        }
    } catch (Exception e) {
        // Log at ERROR with the stack trace; an info-level message alone hides failures.
        log.error("表{}设备{}推送异常:{}", tableName, deviceID, e.getMessage(), e);
    }
}
}

@ -6,25 +6,13 @@ server:
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
# url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=UTC
# username: root
# password: root
url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=UTC
url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=UTC
username: root
password: 123456
#sql:
#init:
#指定脚本文件位置
#schema-locations: classpath:user.sql
#初始化方式
#mode: always
#设置数据源类型C
type: com.alibaba.druid.pool.DruidDataSource
#初始化执行sql
initialization-mode: always
schema:
- classpath:/sql/mqtt-schema.sql
continue-on-error: true
password: root
# url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=UTC
# username: root
# password: 123456
mybatis:
configuration:
@ -57,7 +45,8 @@ logging:
impl: info
schedule: info
file:
name: E:\log\mqtt.log
# name: E:\log\mqtt.log
name: /root/log/mqtt.log

Loading…
Cancel
Save