fix: 删除旧代码
parent
671670e906
commit
936fbd0335
@ -1,34 +0,0 @@
|
||||
package com.xydl.mapper;
|
||||
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public interface OperationDB {
|
||||
|
||||
List<String> getAllDevId(String tableName);
|
||||
|
||||
String getEarliestTime4Other(String value, int devId);
|
||||
|
||||
String getEarliestTime4Eaif(String value, int devId);
|
||||
|
||||
List<Integer> getDataEqmids(String value);
|
||||
|
||||
List<Integer> getSyncRecordDevIds(String tableName);
|
||||
|
||||
List<Map<String, String>> getFieldMap(String tableName);
|
||||
|
||||
List<String> getAllTable();
|
||||
|
||||
String getSQL(String tableName);
|
||||
|
||||
List<Map<String, Object>> getDeviceIDAndtime(String tableName);
|
||||
|
||||
void addEarliestTime(String clientId, String tableName, String devId, String earliestTime);
|
||||
|
||||
List<Map<String, Object>> getData(String sql);
|
||||
|
||||
boolean updateSyncRecordsTable(String tableName, String deviceID, String time);
|
||||
|
||||
|
||||
}
|
@ -1,101 +0,0 @@
|
||||
package com.xydl.service.impl;
|
||||
|
||||
|
||||
import com.xydl.mapper.OperationDB;
|
||||
import com.xydl.util.FormatUtil;
|
||||
import com.xydl.util.MqttUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class MqttServiceImpl {
|
||||
|
||||
@Autowired
|
||||
OperationDB operationDBMapper;
|
||||
|
||||
@Autowired
|
||||
MqttUtil mqttUtil;
|
||||
|
||||
|
||||
public void reportRecord() {
|
||||
List<String> allTableNames = operationDBMapper.getAllTable();
|
||||
for (String tableName : allTableNames) {
|
||||
try {
|
||||
processOneTable(tableName);
|
||||
} catch (Exception e) {
|
||||
log.error("processOneTable exception: " + tableName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//单个表数据发送
|
||||
public void processOneTable(String tableName) throws Exception {
|
||||
Map<String, String> fieldMap = new HashMap<>();
|
||||
List<Map<String, String>> fieldMaps = operationDBMapper.getFieldMap(tableName);
|
||||
for (Map<String, String> map : fieldMaps) {
|
||||
for (String key : map.keySet()) {
|
||||
fieldMap.put(map.get("field_name"), map.get("dest_field_name"));
|
||||
}
|
||||
}
|
||||
String sqlExecuting = operationDBMapper.getSQL(tableName);
|
||||
List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName);
|
||||
Map<Integer, Object> devIDLastTimeMap = new HashMap<>();
|
||||
List<Map<String, Object>> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName);
|
||||
for (Map<String, Object> map : devIDLastTimeMaps) {
|
||||
for (String devId : map.keySet()) {
|
||||
devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2"));
|
||||
}
|
||||
}
|
||||
log.info("表{},共{}个设备", tableName, dataEqmids.size());
|
||||
|
||||
if (dataEqmids.size() > 0) {
|
||||
mqttUtil.connect();
|
||||
for (int deviceID : dataEqmids) {
|
||||
Object time = devIDLastTimeMap.get(deviceID);
|
||||
if (time == null) {
|
||||
time = "2010-01-01 01:00:00";
|
||||
operationDBMapper.addEarliestTime("10", tableName, String.valueOf(deviceID), time.toString());
|
||||
}
|
||||
publishData(String.valueOf(deviceID), time.toString(), sqlExecuting, tableName, fieldMap);
|
||||
}
|
||||
mqttUtil.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
//推送单个设备数据
|
||||
public void publishData(String deviceID, String time, String sqlExecuting, String tableName, Map<String, String> fieldMap) {
|
||||
|
||||
String devIdSQL = sqlExecuting.replace("%%DEVID%%", deviceID);
|
||||
String newSQL = devIdSQL.replace("%%KEYVALUE%%", "'" + time + "'");
|
||||
List<Map<String, Object>> dataOfoneDeviceID = operationDBMapper.getData(newSQL);
|
||||
if (!dataOfoneDeviceID.isEmpty()) {
|
||||
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
|
||||
try {
|
||||
String lastRecordTime = "2010-01-01 01:00:00";
|
||||
if (mqttUtil.publish2MQTT(jsonStringData)) {
|
||||
if (dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time") == null) {
|
||||
lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("capturetime").toString();
|
||||
} else {
|
||||
lastRecordTime = dataOfoneDeviceID.get(dataOfoneDeviceID.size() - 1).get("d_time").toString();
|
||||
}
|
||||
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, lastRecordTime);
|
||||
log.debug("表{}设备{}推送成功{}条数据,最后时间{}", tableName, deviceID, dataOfoneDeviceID.size(), lastRecordTime);
|
||||
}
|
||||
if (dataOfoneDeviceID.size() >= 1000) {
|
||||
publishData(deviceID, lastRecordTime, sqlExecuting, tableName, fieldMap);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("表{}设备{}推送异常", tableName, deviceID, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -1,48 +0,0 @@
|
||||
package com.xydl.util;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class FormatUtil {
|
||||
|
||||
public static String mqttFormatTransform(List<Map<String, Object>> list, Map<String, String> fieldMap) {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
Map<String, Object> resultMap = new HashMap<String, Object>() {{
|
||||
put("AssetList", list.stream().map(e -> recordTransform(e, fieldMap)).collect(Collectors.toList()));
|
||||
}};
|
||||
|
||||
String jsonString = null;
|
||||
try {
|
||||
jsonString = objectMapper.writeValueAsString(resultMap);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return jsonString;
|
||||
}
|
||||
|
||||
public static Map<String, Object> recordTransform(Map<String, Object> record, Map<String, String> fieldMap) {
|
||||
Map<String, Object> mqttRecord = new TreeMap<>();
|
||||
List<Map<String, Object>> attribuiteList = new ArrayList<>();
|
||||
for (String key : record.keySet()) {
|
||||
if (fieldMap.containsKey(key)) {
|
||||
attribuiteList.add(new HashMap<String, Object>() {{
|
||||
put("AttributeCode", fieldMap.get(key));
|
||||
put("DataValue", record.get(key));
|
||||
}});
|
||||
}
|
||||
}
|
||||
mqttRecord.put("AssetCode", record.get("sensorid"));
|
||||
mqttRecord.put("AttributeList", attribuiteList);
|
||||
String captureTime = record.get("d_time") != null ? record.get("d_time").toString() : (String) record.get("capturetime").toString();
|
||||
mqttRecord.put("Timestamp", Timestamp.valueOf(captureTime).getTime());
|
||||
|
||||
return mqttRecord;
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -1,56 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
|
||||
<mapper namespace="com.xydl.mapper.OperationDB">
|
||||
|
||||
<select id="getAllDevId" parameterType="String" resultType="String">
|
||||
select distinct eqmid from ${tableName}
|
||||
</select>
|
||||
|
||||
<select id="getEarliestTime4Other" parameterType="String" resultType="String">
|
||||
select d_time from ${value} where eqmid=#{devId} ORDER BY d_time asc limit 1
|
||||
</select>
|
||||
|
||||
<select id="getEarliestTime4Eaif" parameterType="String" resultType="String">
|
||||
select capturetime from ${value} where eqmid=#{param2} ORDER BY capturetime asc limit 1
|
||||
</select>
|
||||
|
||||
<select id="getDataEqmids" parameterType="String" resultType="Integer">
|
||||
select distinct eqmid from ${value}
|
||||
</select>
|
||||
|
||||
<select id="getSyncRecordDevIds" parameterType="String" resultType="Integer">
|
||||
select devid_val from sync_records where table_name = #{tableName}
|
||||
</select>
|
||||
|
||||
<select id="getFieldMap" parameterType="String" resultType="Map">
|
||||
select field_name, dest_field_name from sync_fields_info where table_name=#{tableName}
|
||||
</select>
|
||||
|
||||
<select id="getAllTable" resultType="String">
|
||||
select table_name from sync_tables_info
|
||||
</select>
|
||||
|
||||
<select id="getSQL" parameterType="String" resultType="String">
|
||||
select `sql` from sync_tables_info where table_name = #{tableName}
|
||||
</select>
|
||||
|
||||
<select id="getDeviceIDAndtime" parameterType="String" resultType="Map">
|
||||
select devid_val,field_val2 from sync_records where table_name = #{tableName}
|
||||
</select>
|
||||
|
||||
<insert id="addEarliestTime" parameterType="String" >
|
||||
insert into sync_records (client_id,table_name,devid_val,field_val2) values (#{clientId},#{tableName},#{devId},#{earliestTime})
|
||||
</insert>
|
||||
|
||||
<select id="getData" parameterType="String" resultType="Map">
|
||||
${sql}
|
||||
</select>
|
||||
|
||||
<update id="updateSyncRecordsTable" parameterType="String" >
|
||||
update sync_records set field_val2 = #{time} where table_name = #{tableName} and devid_val = #{deviceID}
|
||||
</update>
|
||||
|
||||
|
||||
|
||||
|
||||
</mapper>
|
Loading…
Reference in New Issue