diff --git a/src/main/java/com/xydl/schedule/ReportSchedule.java b/src/main/java/com/xydl/schedule/ReportSchedule.java index 8868e33..a240d13 100644 --- a/src/main/java/com/xydl/schedule/ReportSchedule.java +++ b/src/main/java/com/xydl/schedule/ReportSchedule.java @@ -14,7 +14,7 @@ public class ReportSchedule { @Autowired MqttServiceImpl mqttServiceimpl; - @Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 + @Scheduled(initialDelay = 1000, fixedDelay = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 public void reportRecord() { log.info("运行定时转发任务"); mqttServiceimpl.reportRecord(); diff --git a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java index 47095fd..f4597bc 100644 --- a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java +++ b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java @@ -70,14 +70,14 @@ public class MqttServiceImpl { //推送单个设备数据 public void publishData(String deviceID, String time, String sqlExecuting, String tableName, Map fieldMap) { - String devIdSQL = sqlExecuting.replace("%%DEVID%%", String.valueOf(deviceID)); + String devIdSQL = sqlExecuting.replace("%%DEVID%%", deviceID); String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'"); List> dataOfoneDeviceID = operationDBMapper.getData(newSQL); if (!dataOfoneDeviceID.isEmpty()) { String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); try { + String lastRecordTime = "2010-01-01 01:00:00"; if (mqttUtil.publish2MQTT(jsonStringData)) { - String lastRecordTime = ""; if (dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time") == null) { lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("capturetime").toString(); } else { @@ -86,6 +86,9 @@ public class MqttServiceImpl { operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime); log.debug("表{}设备{}推送成功{}条数据,最后时间{}", tableName, deviceID, dataOfoneDeviceID.size(), lastRecordTime); } + if (dataOfoneDeviceID.size() >= 1000) { + publishData(deviceID, lastRecordTime, sqlExecuting, tableName, fieldMap); + } } catch (Exception e) { log.error("表{}设备{}推送异常", tableName, deviceID, e); } diff --git a/src/main/java/com/xydl/util/MqttUtil.java b/src/main/java/com/xydl/util/MqttUtil.java index e831336..93daae6 100644 --- a/src/main/java/com/xydl/util/MqttUtil.java +++ b/src/main/java/com/xydl/util/MqttUtil.java @@ -66,17 +66,15 @@ public class MqttUtil { } - public boolean publish2MQTT(String content) { + public boolean publish2MQTT(String content) throws Exception { MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); // publish message - try { - token = client.getTopic(topic).publish(message); - token.waitForCompletion(); - } catch (MqttException e) { - throw new RuntimeException(e); - } + + token = client.getTopic(topic).publish(message); + token.waitForCompletion(); + return token.isComplete(); } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 3be8685..60560fd 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -6,7 +6,7 @@ server: spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai + url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&connectTimeout=60000&socketTimeout=60000 username: root password: 123456 diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 7810254..4aad4f2 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -6,7 +6,7 @@ server: spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai + url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&connectTimeout=60000&socketTimeout=60000 username: root password: root @@ -22,7 +22,7 @@ mqtt: subscribe: clientid: subscribe_client broker: tcp://10.0.17.30:10883 - topic: mqtt/test + topic: xydl username: dianqi password: 123456Qa% qos: 0