package com.xydl.service.impl; import com.xydl.mapper.OperationDB; import com.xydl.util.FormatUtil; import com.xydl.util.MqttUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; @Service @Slf4j public class MqttServiceImpl { @Autowired OperationDB operationDBMapper; @Autowired MqttUtil mqttUtil; public void reportRecord() { List allTableNames = operationDBMapper.getAllTable(); for (String tableName : allTableNames) { try { processOneTable(tableName); } catch (Exception e) { log.error("processOneTable exception: " + tableName, e); } } } //单个表数据发送 public void processOneTable(String tableName) throws Exception { Map fieldMap = new HashMap<>(); List> fieldMaps = operationDBMapper.getFieldMap(tableName); for (Map map : fieldMaps) { for (String key : map.keySet()) { fieldMap.put(map.get("field_name"), map.get("dest_field_name")); } } String sqlExecuting = operationDBMapper.getSQL(tableName); List dataEqmids = operationDBMapper.getDataEqmids(tableName); Map devIDLastTimeMap = new HashMap<>(); List> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName); for (Map map : devIDLastTimeMaps) { for (String devId : map.keySet()) { devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2")); } } log.info("表{},共{}个设备", tableName, dataEqmids.size()); if (dataEqmids.size() > 0) { mqttUtil.connect(); for (int deviceID : dataEqmids) { Object time = devIDLastTimeMap.get(deviceID); if (time == null) { time = "2010-01-01 01:00:00"; operationDBMapper.addEarliestTime("10", tableName, String.valueOf(deviceID), time.toString()); } publishData(String.valueOf(deviceID), time.toString(), sqlExecuting, tableName, fieldMap); } mqttUtil.disconnect(); } } //推送单个设备数据 public void publishData(String deviceID, String time, String sqlExecuting, String tableName, Map fieldMap) { String devIdSQL = sqlExecuting.replace("%%DEVID%%", String.valueOf(deviceID)); String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'"); List> dataOfoneDeviceID = operationDBMapper.getData(newSQL); if (!dataOfoneDeviceID.isEmpty()) { String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); try { if (mqttUtil.publish2MQTT(jsonStringData)) { String lastRecordTime = ""; if (dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time") == null) { lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("capturetime").toString(); } else { lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time").toString(); } operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime); log.debug("表{}设备{}推送成功{}条数据,最后时间{}", tableName, deviceID, dataOfoneDeviceID.size(), lastRecordTime); } } catch (Exception e) { log.error("表{}设备{}推送异常", tableName, deviceID, e); } } } }