You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
backend/src/main/java/com/xydl/cac/task/I2syncTask.java

123 lines
4.5 KiB
Java

package com.xydl.cac.task;
import com.xydl.cac.adapter.BusiAdapter;
import com.xydl.cac.entity.*;
import com.xydl.cac.repository.I2syncRecordRepository;
import com.xydl.cac.repository.NSensorRepository;
import com.xydl.cac.service.DataService;
import com.xydl.cac.service.I2syncService;
import com.xydl.cac.socket.WebSocketServer;
import com.xydl.cac.util.DateUtil;
import com.xydl.cac.util.DingTalkPushUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Scheduled task that uploads newly collected sensor data through {@link BusiAdapter}.
 *
 * <p>For every configured type it resolves the fields to synchronise, walks the sensors
 * of that type and, per sensor, repeatedly fetches the data newer than the last recorded
 * timestamp, uploads it and persists the updated {@link I2syncRecord}.
 */
@Service
@Slf4j
public class I2syncTask {

    /**
     * Number of rows in one fetch at or above which the same sensor is queried again
     * within the current run.
     * NOTE(review): presumably equals the page size of DataService.getLatestData — confirm.
     */
    private static final int BATCH_THRESHOLD = 1000;

    @Resource
    I2syncService i2syncService;
    @Resource
    I2syncRecordRepository recordRepository;
    @Resource
    NSensorRepository sensorRepository;
    @Resource
    DataService dataService;
    @Resource
    BusiAdapter busiAdapter;
    @Resource
    DingTalkPushUtil dingTalkPushUtil;
    @Resource
    WebSocketServer webSocketServer;

    /**
     * Entry point of the scheduled synchronisation: first run 60 seconds after start-up,
     * then 30 minutes after the previous run has finished.
     *
     * <p>Does nothing when the adapter is disabled or no sync configuration exists.
     * Any exception raised while synchronising is logged and its message is handed to
     * {@code webSocketServer.sendMessage} and {@code dingTalkPushUtil.pushText}; the
     * remaining configurations of that run are skipped.
     */
    @Scheduled(initialDelay = 60 * 1000, fixedDelay = 30 * 60 * 1000)
    public void syncAll() {
        // Check the cheap flag first so a disabled adapter does not cost a config query per tick.
        if (!busiAdapter.enable) {
            return;
        }
        List<I2syncConfig> configList = i2syncService.listConfig();
        if (CollectionUtils.isEmpty(configList)) {
            return;
        }

        log.info("I2syncTask.syncAll 开始.");
        try {
            if (StringUtils.isBlank(busiAdapter.url)) {
                log.info("i2同步地址busi-url未配置");
            } else {
                for (I2syncConfig config : configList) {
                    this.syncOneConfig(config);
                }
            }
            log.info("I2syncTask.syncAll 结束.");
        } catch (Exception e) {
            log.error("I2syncTask.syncAll error.", e);
            String str = "i2同步导出数据异常: " + e.getMessage();
            webSocketServer.sendMessage(str , null);
            dingTalkPushUtil.pushText(str);
        }
    }

    /**
     * Synchronises one configured type.
     *
     * <p>Returns early when the type has no field configuration. Sensors lacking an
     * equipment id, sensor code or phase are skipped.
     *
     * @param config the type configuration to synchronise
     * @throws Exception propagated from the per-sensor synchronisation
     */
    private void syncOneConfig(I2syncConfig config) throws Exception {
        log.info("I2syncTask.syncOneConfig " + config.getTypeName());
        List<I2syncField> fieldList = i2syncService.listFieldConfig(config.getTableName());
        if (CollectionUtils.isEmpty(fieldList)) {
            log.info("该类型未配置同步字段.");
            return;
        }

        // Build the point list passed to the data query from the configured field names.
        List<ModevTypePoint> points = new ArrayList<>(fieldList.size());
        for (I2syncField field : fieldList) {
            ModevTypePoint point = new ModevTypePoint();
            point.setField(field.getFieldName());
            points.add(point);
        }

        List<NSensor> list = sensorRepository.findByTypeId(config.getModevtypeId());
        for (NSensor sensor : list) {
            if (StringUtils.isNotBlank(sensor.getEquipmentId()) &&
                    StringUtils.isNotBlank(sensor.getSensorCode()) &&
                    StringUtils.isNotBlank(sensor.getPhase())) {
                sensor.setTableName(config.getTableName());
                this.syncOneSensor(sensor, config, fieldList, points);
            }
        }
    }

    /**
     * Synchronises one sensor, batch by batch.
     *
     * <p>Each pass loads (or creates) the sensor's sync record, fetches the data after the
     * record's last data time, uploads it and saves the record. Passes repeat while a
     * fetch returns at least {@link #BATCH_THRESHOLD} rows. This is a loop rather than
     * self-recursion so that a long backlog cannot exhaust the call stack.
     *
     * @param sensor    the sensor whose data is uploaded
     * @param config    the type configuration the sensor belongs to
     * @param fieldList the fields configured for synchronisation
     * @param points    the points derived from {@code fieldList}, passed to the data query
     * @throws Exception propagated from the data query, the upload or the repository
     */
    private void syncOneSensor(NSensor sensor, I2syncConfig config,
                               List<I2syncField> fieldList, List<ModevTypePoint> points) throws Exception {
        int count;
        do {
            // Re-read the record on every pass, exactly as each recursive call used to do.
            I2syncRecord record = this.loadOrCreateRecord(sensor);

            // Query the latest data
            List<Map<String, Object>> dataList = dataService.getLatestData(sensor, points, record.getLastDTime());
            if (CollectionUtils.isEmpty(dataList)) {
                return;
            }

            // Sync and export
            busiAdapter.uploadData(sensor, record, config, fieldList, dataList);
            recordRepository.save(record);
            count = dataList.size();
            log.info("I2syncTask.syncOneSensor 同步导出" + sensor.getName() + "(" + sensor.getSensorCode() + ") "
                    + count + "条数据,最后数据时间" + DateUtil.format(record.getLastDTime()));
            // dataList goes out of scope here, so no explicit clear() is needed before the next pass.
        } while (count >= BATCH_THRESHOLD);
    }

    /**
     * Returns the first stored sync record for the sensor's device id, or a new unsaved
     * record initialised with the sensor's device id and type id when none exists.
     */
    private I2syncRecord loadOrCreateRecord(NSensor sensor) {
        List<I2syncRecord> recordList = recordRepository.findByEqmid(sensor.getDevId());
        if (!CollectionUtils.isEmpty(recordList)) {
            return recordList.get(0);
        }
        I2syncRecord record = new I2syncRecord();
        record.setEqmid(sensor.getDevId());
        record.setModevtypeId(sensor.getTypeId());
        return record;
    }
}