main
wenhua.zhou 2 years ago
parent 0431b108f8
commit d6d1b45988

@ -26,31 +26,13 @@ public class MqttServiceImpl {
public void reportRecord() { public void reportRecord() {
List<String> allTableNames = operationDBMapper.getAllTable(); List<String> allTableNames = operationDBMapper.getAllTable();
for (String tableName : allTableNames) { for (String tableName : allTableNames) {
send(tableName); processOneTable(tableName);
// String sqlExecuting = operationDBMapper.getSQL(tableName);
// List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName);
// Map<Integer, Object> devIDLastTimeMap = new HashMap<>();
// List<Map<String, Object>> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName);
// for (Map<String, Object> map : devIDLastTimeMaps) {
// for (String devId : map.keySet()) {
// devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2"));
// }
// }
// for (int deviceID : dataEqmids) {
// String time = "";
// if (devIDLastTimeMap.get(deviceID) != null) {
// time = devIDLastTimeMap.get(deviceID).toString();
// } else {
// time = "2000-01-01 01:00:00";
// operationDBMapper.addEarliestTime(10, tableName, String.valueOf(deviceID), time);
// }
// publishData(deviceID, time, sqlExecuting, tableName, fieldMap);
// }
} }
} }
//单个表数据发送 //单个表数据发送
public void send(String tableName) { public void processOneTable(String tableName) {
log.info("tableName:{}", tableName);
Map<String, String> fieldMap = new HashMap<>(); Map<String, String> fieldMap = new HashMap<>();
List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName); List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName);
for (Map<String, String> map : fieldMaps) { for (Map<String, String> map : fieldMaps) {
@ -58,10 +40,6 @@ public class MqttServiceImpl {
fieldMap.put(map.get("field_name"), map.get("dest_field_name")); fieldMap.put(map.get("field_name"), map.get("dest_field_name"));
} }
} }
send(tableName, fieldMap);
}
public void send(String tableName, Map<String, String> fieldMap) {
String sqlExecuting = operationDBMapper.getSQL(tableName); String sqlExecuting = operationDBMapper.getSQL(tableName);
List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName); List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName);
Map<Integer, Object> devIDLastTimeMap = new HashMap<>(); Map<Integer, Object> devIDLastTimeMap = new HashMap<>();
@ -72,14 +50,12 @@ public class MqttServiceImpl {
} }
} }
for (int deviceID : dataEqmids) { for (int deviceID : dataEqmids) {
String time = ""; Object time = devIDLastTimeMap.get(deviceID);
if (devIDLastTimeMap.get(deviceID) != null) { if (time == null) {
time = devIDLastTimeMap.get(deviceID).toString();
} else {
time = "2000-01-01 01:00:00"; time = "2000-01-01 01:00:00";
operationDBMapper.addEarliestTime(10, tableName, String.valueOf(deviceID), time); operationDBMapper.addEarliestTime(10, tableName, String.valueOf(deviceID), time.toString());
} }
publishData(deviceID, time, sqlExecuting, tableName, fieldMap); publishData(deviceID, time.toString(), sqlExecuting, tableName, fieldMap);
} }
} }
@ -92,12 +68,11 @@ public class MqttServiceImpl {
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
try { try {
if (mqttUtil.publish2MQTT(jsonStringData)) { if (mqttUtil.publish2MQTT(jsonStringData)) {
//insert
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, time); operationDBMapper.updateSyncRecordsTable(tableName, deviceID, time);
log.info("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData); log.debug("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData);
} }
} catch (Exception e) { } catch (Exception e) {
log.info("表{}设备{}推送异常:{}", tableName, deviceID, e.getMessage()); log.error("表{}设备{}推送异常:{}", tableName, deviceID, e.getMessage());
} }
} }

@ -62,14 +62,11 @@ public class MqttUtil {
token = client.getTopic(topic).publish(message); token = client.getTopic(topic).publish(message);
token.waitForCompletion(); token.waitForCompletion();
//打印发送状态 //打印发送状态
log.info("messagei is published completely! {}", token.isComplete()); // log.info("messagei is published completely! {}", token.isComplete());
// client.publish(topic, message); // client.publish(topic, message);
} catch (MqttException e) { } catch (MqttException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
log.info("Message published");
log.info("topic: {}", topic);
log.info("{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " message content推送: " + content);
// disconnect // disconnect
try { try {
client.disconnect(); client.disconnect();

Loading…
Cancel
Save