package com.xydl.service.impl; import com.xydl.mapper.OperationDB; import com.xydl.util.FormatUtil; import com.xydl.util.MqttUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; import java.util.stream.Collectors; @Service @Slf4j public class MqttServiceImpl { // private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); @Autowired OperationDB operationDBMapper; @Autowired MqttUtil mqttUtil; public void reportRecord() { List allTableNames = operationDBMapper.getAllTable(); for (String tableName : allTableNames) { Map fieldMap = new HashMap<>(); List> fieldMaps = operationDBMapper.getFieldMap(tableName); for (Map map : fieldMaps) { for (String key : map.keySet()) { fieldMap.put(map.get("field_name"), map.get("dest_field_name")); } } String sqlExecuting = operationDBMapper.getSQL(tableName); List dataEqmids = operationDBMapper.getDataEqmids(tableName); // List syncDevIds = operationDBMapper.getSyncRecordDevIds(tableName); // if (dataEqmids.size() != syncDevIds.size()) { // List distinctDevids = dataEqmids.stream().filter(e -> !syncDevIds.contains(e)).collect(Collectors.toList()); // for (Integer devId : distinctDevids) { // String earliestTime = null; // if ("data_eaif_h".equals(tableName)) { // earliestTime = operationDBMapper.getEarliestTime4Eaif(tableName, devId); // } else { // earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId); // } // operationDBMapper.addEarliestTime(10, tableName, String.valueOf(devId), earliestTime); // } // } Map devIDLastTimeMap = new HashMap<>(); List> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName); for (Map map : devIDLastTimeMaps) { for (String devId : map.keySet()) { devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2")); } } for (int deviceID : dataEqmids) { String time = ""; if(devIDLastTimeMap.get(deviceID) !=null){ time = devIDLastTimeMap.get(deviceID).toString(); }else{ time = "2000-01-01 01:00:00"; } publishData(deviceID, time, sqlExecuting, tableName, fieldMap); } } } //推送单个设备数据 public void publishData(int deviceID, String time, String sqlExecuting, String tableName, Map fieldMap) { String devIdSQL = sqlExecuting.replace("%%DEVID%%", String.valueOf(deviceID)); String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'"); List> dataOfoneDeviceID = operationDBMapper.getData(newSQL); String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); try { if (mqttUtil.publish2MQTT(jsonStringData)) { operationDBMapper.updateSyncRecordsTable(tableName, deviceID, time); log.info("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData); } } catch (Exception e) { log.info("表{}设备{}推送异常:{}", tableName, deviceID, e.getMessage()); } } }