From 84a7445f251d666f34ebdde7407e133ede52ce37 Mon Sep 17 00:00:00 2001 From: "wenhua.zhou" Date: Tue, 12 Dec 2023 15:35:50 +0800 Subject: [PATCH] . --- .../xydl/service/impl/MqttServiceImpl.java | 16 +++- src/main/java/com/xydl/util/MqttUtil.java | 79 +++++++++++++++---- 2 files changed, 75 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java index cb17ac9..f6a0f18 100644 --- a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java +++ b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java @@ -26,12 +26,16 @@ public class MqttServiceImpl { public void reportRecord() { List allTableNames = operationDBMapper.getAllTable(); for (String tableName : allTableNames) { - processOneTable(tableName); + try { + processOneTable(tableName); + } catch (Exception e) { + throw new RuntimeException(e); + } } } //单个表数据发送 - public void processOneTable(String tableName) { + public void processOneTable(String tableName) throws Exception{ log.info("tableName:{}", tableName); Map fieldMap = new HashMap<>(); List> fieldMaps = operationDBMapper.getFieldMap(tableName); @@ -49,6 +53,7 @@ public class MqttServiceImpl { devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2")); } } + mqttUtil.connect(); for (int deviceID : dataEqmids) { Object time = devIDLastTimeMap.get(deviceID); if (time == null) { @@ -57,6 +62,7 @@ public class MqttServiceImpl { } publishData(String.valueOf(deviceID), time.toString(), sqlExecuting, tableName, fieldMap); } + mqttUtil.disconnect(); } //推送单个设备数据 @@ -68,9 +74,11 @@ public class MqttServiceImpl { String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); try { if (mqttUtil.publish2MQTT(jsonStringData)) { - String lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("d_time").toString(); - if(tableName.equals("data_eaif_h")){ + String lastRecordTime = ""; + if(dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("d_time") == null){ lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("capturetime").toString(); + }else{ + lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("d_time").toString(); } operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime); log.debug("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData); diff --git a/src/main/java/com/xydl/util/MqttUtil.java b/src/main/java/com/xydl/util/MqttUtil.java index cf764cb..25c6c6f 100644 --- a/src/main/java/com/xydl/util/MqttUtil.java +++ b/src/main/java/com/xydl/util/MqttUtil.java @@ -34,10 +34,11 @@ public class MqttUtil { @Value("${mqtt.qos}") private int qos; + private MqttClient client; - public boolean publish2MQTT(String content) { - MqttClient client; - MqttDeliveryToken token; + private MqttDeliveryToken token; + + public void connect(){ try { client = new MqttClient(broker, publishClientid, new MemoryPersistence()); } catch (MqttException e) { @@ -48,37 +49,83 @@ public class MqttUtil { options.setPassword(password.toCharArray()); options.setConnectionTimeout(60); options.setKeepAliveInterval(30); - // connect try { client.connect(options); } catch (MqttException e) { throw new RuntimeException(e); } - // create message and setup QoS + } + + public void disconnect(){ + try { + client.disconnect(); + } catch (MqttException e) { + throw new RuntimeException(e); + } + // close client + try { + client.close(); + } catch (MqttException e) { + throw new RuntimeException(e); + } + } + + public boolean publish(String content){ MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); // publish message try { token = client.getTopic(topic).publish(message); token.waitForCompletion(); - //打印发送状态 -// log.info("messagei is published completely! {}", token.isComplete()); -// client.publish(topic, message); } catch (MqttException e) { throw new RuntimeException(e); } - // disconnect - try { - client.disconnect(); - } catch (MqttException e) { - throw new RuntimeException(e); - } - // close client + return token.isComplete(); + } + + + + public boolean publish2MQTT(String content) { +// MqttClient client; +// MqttDeliveryToken token; +// try { +// client = new MqttClient(broker, publishClientid, new MemoryPersistence()); +// } catch (MqttException e) { +// throw new RuntimeException(e); +// } +// MqttConnectOptions options = new MqttConnectOptions(); +// options.setUserName(username); +// options.setPassword(password.toCharArray()); +// options.setConnectionTimeout(60); +// options.setKeepAliveInterval(30); +// // connect +// try { +// client.connect(options); +// } catch (MqttException e) { +// throw new RuntimeException(e); +// } + // create message and setup QoS + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(qos); + // publish message try { - client.close(); + token = client.getTopic(topic).publish(message); + token.waitForCompletion(); } catch (MqttException e) { throw new RuntimeException(e); } + // disconnect +// try { +// client.disconnect(); +// } catch (MqttException e) { +// throw new RuntimeException(e); +// } +// // close client +// try { +// client.close(); +// } catch (MqttException e) { +// throw new RuntimeException(e); +// } return token.isComplete(); }