package com.xydl.service.impl; import com.xydl.mapper.OperationDB; import com.xydl.util.FormatUtil; import com.xydl.util.MqttUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; import java.util.stream.Collectors; @Service @Slf4j public class MqttServiceImpl { @Autowired OperationDB operationDBMapper; @Autowired MqttUtil mqttUtil; public void reportRecord() { List allTableNames = operationDBMapper.getAllTable(); for (String tableName : allTableNames) { Map fieldMap = getTableFieldMap(tableName); String sqlExecuting = operationDBMapper.getSQL(tableName); List dataEqmids = operationDBMapper.getDataEqmids(tableName); Map devIDLastTimeMap = new HashMap<>(); List> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName); for (Map map : devIDLastTimeMaps) { for (String devId : map.keySet()) { devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2")); } } for (int deviceID : dataEqmids) { String time = ""; if(devIDLastTimeMap.get(deviceID) !=null){ time = devIDLastTimeMap.get(deviceID).toString(); }else{ time = "2000-01-01 01:00:00"; operationDBMapper.addEarliestTime(10, tableName, String.valueOf(deviceID), time); } publishData(deviceID, time, sqlExecuting, tableName, fieldMap); } } } //单个表的字段映射 public Map getTableFieldMap(String tableName){ Map fieldMap = new HashMap<>(); List> fieldMaps = operationDBMapper.getFieldMap(tableName); for (Map map : fieldMaps) { for (String key : map.keySet()) { fieldMap.put(map.get("field_name"), map.get("dest_field_name")); } } return fieldMap; } //推送单个设备数据 public void publishData(int deviceID, String time, String sqlExecuting, String tableName, Map fieldMap) { String devIdSQL = sqlExecuting.replace("%%DEVID%%", String.valueOf(deviceID)); String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'"); List> dataOfoneDeviceID = operationDBMapper.getData(newSQL); String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); try { if (mqttUtil.publish2MQTT(jsonStringData)) { //insert operationDBMapper.updateSyncRecordsTable(tableName, deviceID, time); log.info("表{}设备{}推送成功:{}", tableName, deviceID, jsonStringData); } } catch (Exception e) { log.info("表{}设备{}推送异常:{}", tableName, deviceID, e.getMessage()); } } }