wenhua.zhou 2 years ago
parent af5941e048
commit 84a7445f25

@ -26,12 +26,16 @@ public class MqttServiceImpl {
public void reportRecord() { public void reportRecord() {
List<String> allTableNames = operationDBMapper.getAllTable(); List<String> allTableNames = operationDBMapper.getAllTable();
for (String tableName : allTableNames) { for (String tableName : allTableNames) {
try {
processOneTable(tableName); processOneTable(tableName);
} catch (Exception e) {
throw new RuntimeException(e);
}
} }
} }
//单个表数据发送 //单个表数据发送
public void processOneTable(String tableName) { public void processOneTable(String tableName) throws Exception{
log.info("tableName:{}", tableName); log.info("tableName:{}", tableName);
Map<String, String> fieldMap = new HashMap<>(); Map<String, String> fieldMap = new HashMap<>();
List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName); List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName);
@ -49,6 +53,7 @@ public class MqttServiceImpl {
devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2")); devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2"));
} }
} }
mqttUtil.connect();
for (int deviceID : dataEqmids) { for (int deviceID : dataEqmids) {
Object time = devIDLastTimeMap.get(deviceID); Object time = devIDLastTimeMap.get(deviceID);
if (time == null) { if (time == null) {
@ -57,6 +62,7 @@ public class MqttServiceImpl {
} }
publishData(String.valueOf(deviceID), time.toString(), sqlExecuting, tableName, fieldMap); publishData(String.valueOf(deviceID), time.toString(), sqlExecuting, tableName, fieldMap);
} }
mqttUtil.disconnect();
} }
//推送单个设备数据 //推送单个设备数据
@ -68,9 +74,11 @@ public class MqttServiceImpl {
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
try { try {
if (mqttUtil.publish2MQTT(jsonStringData)) { if (mqttUtil.publish2MQTT(jsonStringData)) {
String lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("d_time").toString(); String lastRecordTime = "";
if(tableName.equals("data_eaif_h")){ if(dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("d_time") == null){
lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("capturetime").toString(); lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("capturetime").toString();
}else{
lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size()-1).get("d_time").toString();
} }
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime); operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime);
log.debug("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData); log.debug("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData);

@ -34,10 +34,11 @@ public class MqttUtil {
@Value("${mqtt.qos}") @Value("${mqtt.qos}")
private int qos; private int qos;
private MqttClient client;
public boolean publish2MQTT(String content) { private MqttDeliveryToken token;
MqttClient client;
MqttDeliveryToken token; public void connect(){
try { try {
client = new MqttClient(broker, publishClientid, new MemoryPersistence()); client = new MqttClient(broker, publishClientid, new MemoryPersistence());
} catch (MqttException e) { } catch (MqttException e) {
@ -48,37 +49,83 @@ public class MqttUtil {
options.setPassword(password.toCharArray()); options.setPassword(password.toCharArray());
options.setConnectionTimeout(60); options.setConnectionTimeout(60);
options.setKeepAliveInterval(30); options.setKeepAliveInterval(30);
// connect
try { try {
client.connect(options); client.connect(options);
} catch (MqttException e) { } catch (MqttException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// create message and setup QoS }
public void disconnect(){
try {
client.disconnect();
} catch (MqttException e) {
throw new RuntimeException(e);
}
// close client
try {
client.close();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
public boolean publish(String content){
MqttMessage message = new MqttMessage(content.getBytes()); MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos); message.setQos(qos);
// publish message // publish message
try { try {
token = client.getTopic(topic).publish(message); token = client.getTopic(topic).publish(message);
token.waitForCompletion(); token.waitForCompletion();
//打印发送状态
// log.info("messagei is published completely! {}", token.isComplete());
// client.publish(topic, message);
} catch (MqttException e) { } catch (MqttException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// disconnect return token.isComplete();
try {
client.disconnect();
} catch (MqttException e) {
throw new RuntimeException(e);
} }
// close client
public boolean publish2MQTT(String content) {
// MqttClient client;
// MqttDeliveryToken token;
// try {
// client = new MqttClient(broker, publishClientid, new MemoryPersistence());
// } catch (MqttException e) {
// throw new RuntimeException(e);
// }
// MqttConnectOptions options = new MqttConnectOptions();
// options.setUserName(username);
// options.setPassword(password.toCharArray());
// options.setConnectionTimeout(60);
// options.setKeepAliveInterval(30);
// // connect
// try {
// client.connect(options);
// } catch (MqttException e) {
// throw new RuntimeException(e);
// }
// create message and setup QoS
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// publish message
try { try {
client.close(); token = client.getTopic(topic).publish(message);
token.waitForCompletion();
} catch (MqttException e) { } catch (MqttException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// disconnect
// try {
// client.disconnect();
// } catch (MqttException e) {
// throw new RuntimeException(e);
// }
// // close client
// try {
// client.close();
// } catch (MqttException e) {
// throw new RuntimeException(e);
// }
return token.isComplete(); return token.isComplete();
} }

Loading…
Cancel
Save