feat: 第一次全量推送

main
huangfeng 2 years ago
parent 5799654e1f
commit abe4383222

@ -14,7 +14,7 @@ public class ReportSchedule {
@Autowired
MqttServiceImpl mqttServiceimpl;
@Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
@Scheduled(initialDelay = 1000, fixedDelay = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
public void reportRecord() {
log.info("运行定时转发任务");
mqttServiceimpl.reportRecord();

@ -70,14 +70,14 @@ public class MqttServiceImpl {
//推送单个设备数据
public void publishData(String deviceID, String time, String sqlExecuting, String tableName, Map<String, String> fieldMap) {
String devIdSQL = sqlExecuting.replace("%%DEVID%%", String.valueOf(deviceID));
String devIdSQL = sqlExecuting.replace("%%DEVID%%", deviceID);
String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'");
List<Map<String, Object>> dataOfoneDeviceID = operationDBMapper.getData(newSQL);
if (!dataOfoneDeviceID.isEmpty()) {
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
try {
String lastRecordTime = "2010-01-01 01:00:00";
if (mqttUtil.publish2MQTT(jsonStringData)) {
String lastRecordTime = "";
if (dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time") == null) {
lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("capturetime").toString();
} else {
@ -86,6 +86,9 @@ public class MqttServiceImpl {
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime);
log.debug("表{}设备{}推送成功{}条数据,最后时间{}", tableName, deviceID, dataOfoneDeviceID.size(), lastRecordTime);
}
if (dataOfoneDeviceID.size() >= 1000) {
publishData(deviceID, lastRecordTime, sqlExecuting, tableName, fieldMap);
}
} catch (Exception e) {
log.error("表{}设备{}推送异常", tableName, deviceID, e);
}

@ -66,17 +66,15 @@ public class MqttUtil {
}
public boolean publish2MQTT(String content) {
public boolean publish2MQTT(String content) throws Exception {
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// publish message
try {
token = client.getTopic(topic).publish(message);
token.waitForCompletion();
} catch (MqttException e) {
throw new RuntimeException(e);
}
token = client.getTopic(topic).publish(message);
token.waitForCompletion();
return token.isComplete();
}

@ -6,7 +6,7 @@ server:
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai
url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&connectTimeout=60000&socketTimeout=60000
username: root
password: 123456

@ -6,7 +6,7 @@ server:
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai
url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&connectTimeout=60000&socketTimeout=60000
username: root
password: root
@ -22,7 +22,7 @@ mqtt:
subscribe:
clientid: subscribe_client
broker: tcp://10.0.17.30:10883
topic: mqtt/test
topic: xydl
username: dianqi
password: 123456Qa%
qos: 0

Loading…
Cancel
Save