From abe43832223d4a5f9bfdd72f5edec07fdfae8cd1 Mon Sep 17 00:00:00 2001 From: huangfeng Date: Tue, 19 Dec 2023 09:39:58 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=AC=AC=E4=B8=80=E6=AC=A1=E5=85=A8?= =?UTF-8?q?=E9=87=8F=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/xydl/schedule/ReportSchedule.java | 2 +- .../java/com/xydl/service/impl/MqttServiceImpl.java | 7 +++++-- src/main/java/com/xydl/util/MqttUtil.java | 12 +++++------- src/main/resources/application-dev.yml | 2 +- src/main/resources/application-prod.yml | 4 ++-- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/xydl/schedule/ReportSchedule.java b/src/main/java/com/xydl/schedule/ReportSchedule.java index 8868e33..a240d13 100644 --- a/src/main/java/com/xydl/schedule/ReportSchedule.java +++ b/src/main/java/com/xydl/schedule/ReportSchedule.java @@ -14,7 +14,7 @@ public class ReportSchedule { @Autowired MqttServiceImpl mqttServiceimpl; - @Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 + @Scheduled(initialDelay = 1000, fixedDelay = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 public void reportRecord() { log.info("运行定时转发任务"); mqttServiceimpl.reportRecord(); diff --git a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java index 47095fd..f4597bc 100644 --- a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java +++ b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java @@ -70,14 +70,14 @@ public class MqttServiceImpl { //推送单个设备数据 public void publishData(String deviceID, String time, String sqlExecuting, String tableName, Map fieldMap) { - String devIdSQL = sqlExecuting.replace("%%DEVID%%", String.valueOf(deviceID)); + String devIdSQL = sqlExecuting.replace("%%DEVID%%", deviceID); String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'"); List> dataOfoneDeviceID = operationDBMapper.getData(newSQL); if (!dataOfoneDeviceID.isEmpty()) { String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); try { + String lastRecordTime = "2010-01-01 01:00:00"; if (mqttUtil.publish2MQTT(jsonStringData)) { - String lastRecordTime = ""; if (dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time") == null) { lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("capturetime").toString(); } else { @@ -86,6 +86,9 @@ public class MqttServiceImpl { operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime); log.debug("表{}设备{}推送成功{}条数据,最后时间{}", tableName, deviceID, dataOfoneDeviceID.size(), lastRecordTime); } + if (dataOfoneDeviceID.size() >= 1000) { + publishData(deviceID, lastRecordTime, sqlExecuting, tableName, fieldMap); + } } catch (Exception e) { log.error("表{}设备{}推送异常", tableName, deviceID, e); } diff --git a/src/main/java/com/xydl/util/MqttUtil.java b/src/main/java/com/xydl/util/MqttUtil.java index e831336..93daae6 100644 --- a/src/main/java/com/xydl/util/MqttUtil.java +++ b/src/main/java/com/xydl/util/MqttUtil.java @@ -66,17 +66,15 @@ public class MqttUtil { } - public boolean publish2MQTT(String content) { + public boolean publish2MQTT(String content) throws Exception { MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); // publish message - try { - token = client.getTopic(topic).publish(message); - token.waitForCompletion(); - } catch (MqttException e) { - throw new RuntimeException(e); - } + + token = client.getTopic(topic).publish(message); + token.waitForCompletion(); + return token.isComplete(); } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 3be8685..60560fd 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -6,7 +6,7 @@ server: spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai + url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&connectTimeout=60000&socketTimeout=60000 username: root password: 123456 diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 7810254..4aad4f2 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -6,7 +6,7 @@ server: spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai + url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&connectTimeout=60000&socketTimeout=60000 username: root password: root @@ -22,7 +22,7 @@ mqtt: subscribe: clientid: subscribe_client broker: tcp://10.0.17.30:10883 - topic: mqtt/test + topic: xydl username: dianqi password: 123456Qa% qos: 0