代码重构

main
wenhua.zhou 2 years ago
parent f50b651ce2
commit cca355204b

@ -101,14 +101,15 @@
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
<!-- <version>2.3.2</version>-->
<!-- <configuration>-->
<!-- <source>1.8</source>-->
<!-- <target>1.8</target>-->
<!-- </configuration>-->
<!-- </plugin>-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>

@ -1,10 +1,16 @@
package com.xydl;
import com.xydl.service.impl.MqttServiceImpl;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
@EnableScheduling
@MapperScan(basePackages = "com.xydl.mapper")
@SpringBootApplication//标识该类为主程序启动类

@ -1,23 +1,27 @@
package com.xydl.Schedule;
package com.xydl.schedule;
import com.xydl.service.impl.MqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
@Component
@Slf4j
public class MyTimerTask {
public static void main(String[] args) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.out.println("运行定时任务: "+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
new MqttServiceImpl().reportRecord();
// private static final Logger logger = LoggerFactory.getLogger(MyTimerTask.class);
@Autowired
MqttServiceImpl mqttServiceimpl;
@Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
public void init() {
log.info("运行定时任务:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
mqttServiceimpl.reportRecord();
}
}, 1000, 3000);
}
}

@ -1,17 +1,20 @@
package com.xydl.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@Slf4j
public class TestController {
// private static final Logger logger = LoggerFactory.getLogger(TestController.class);
@RequestMapping("/eia")
@RequestMapping("/test")
@ResponseBody
public String test() {
System.out.println("===");
log.info("===");
return "测试成功";
}

@ -4,9 +4,7 @@ package com.xydl.service.impl;
import com.xydl.mapper.OperationDB;
import com.xydl.util.FormatUtil;
import com.xydl.util.MqttUtil;
import com.xydl.util.Subscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
@ -15,16 +13,18 @@ import java.util.stream.Collectors;
@Service
@Slf4j
public class MqttServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
// private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
@Autowired
OperationDB operationDBMapper;
// @Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
@Autowired
MqttUtil mqttUtil;
public void reportRecord() {
logger.info("开始执行");
Subscribe.getInstance();
List<String> allTableNames = operationDBMapper.getAllTable();
for (String tableName : allTableNames) {
@ -65,50 +65,24 @@ public class MqttServiceImpl {
for (int deviceID : devIDLastTimeMap.keySet()) {
String time = devIDLastTimeMap.get(deviceID).toString();
System.out.println(time);
String sql = preSQL+deviceID+midSQL+"'"+time+"'"+lastSQL;
System.out.println("sql :"+sql);
List<Map<String, Object>> dataOfoneDeviceID = operationDBMapper.getData(sql);
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
logger.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData);
if (new MqttUtil().publish2MQTT(jsonStringData)) {
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID));
logger.info("推送成功");
} else {
logger.info("消息推送失败");
log.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData);
try{
if (mqttUtil.publish2MQTT(jsonStringData)) {
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID));
log.info("推送成功");
}
}catch (Exception e){
log.info("推送异常:{}",e.getMessage());
}
}
}
}
// @Scheduled(fixedDelay = Long.MAX_VALUE) // 用一个非常大的延迟值,确保只执行一次
// public void subScribeSamle() {
// logger.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
// MqttUtil.subScribeMQTT();
// }
// @Scheduled(fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
// public void checkDevIdTimer() {
// logger.info("每小时检测一次同步的表是否在‘同步记录表’");
// List<String> allTableNames = getAllTableNameFromSyncTable();
// for (String tableName : allTableNames) {
// if (!tableNameIfExitsSyncRec(tableName)) {
// logger.info("有不存在的表{},把所有的devId及最早的时间更新到'同步记录表'", tableName);
// List<String> devIds = operationDBMapper.getAllDevId(tableName);
// for (String devId : devIds) {
// String earliestTime = null;
// if ("data_eaif_h".equals(tableName)) {
// earliestTime = operationDBMapper.getEarliestTime4Eaif(tableName, devId);
// } else {
// earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId);
// }
// addEarliestTime2SyncRecord(tableName, devId, earliestTime);
// }
// }
// }
// }
}

@ -1,145 +0,0 @@
package com.xydl.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.*;
public class CommonUtils {
private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
private static final boolean languageZh = getPropertyValue("system.language", "zh").equals("zh");
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final long startupTime = System.currentTimeMillis();
private static final int startingTime = Integer.parseInt(getPropertyValue("system.startup.time", "60000"));
public static long getStartupTime() {
return startupTime;
}
public static boolean started() {
return System.currentTimeMillis() - startupTime > startingTime;
}
public static boolean isLanguageZh() {
return languageZh;
}
public static ObjectMapper getObjectMapper() {
return objectMapper;
}
public static String getPropertyValue(String propertyName, String defaultValue) {
Properties properties = loadPropertyFile("sdwan.common.cfg");
if (properties != null) {
String value = properties.getProperty(propertyName);
return value == null ? defaultValue : value;
}
return defaultValue;
}
public static Properties loadPropertyFile(String fileName) {
String filePath = System.getProperty("user.dir").concat("/etc/").concat(fileName);
File file = new File(filePath);
if (!file.exists()) {
return null;
}
try (InputStream is = new FileInputStream(file)) {
Properties properties = new Properties();
properties.load(is);
return properties;
} catch (IOException e) {
logger.error("load property file exception:", e);
}
return null;
}
public static boolean setProperty(String fileName, String key, String value) {
Properties properties = loadPropertyFile(fileName);
if (properties == null) {
properties = new Properties();
}
properties.setProperty(key, value);
return savePropertyFile(fileName, properties);
}
public static boolean savePropertyFile(String fileName, Properties properties) {
String filePath = System.getProperty("user.dir").concat("/etc/").concat(fileName);
File file = new File(filePath);
if (!file.exists()) {
return false;
}
try (OutputStream os = new FileOutputStream(file)) {
properties.store(os, null);
} catch (IOException e) {
logger.error("write file exception:", e);
return false;
}
return true;
}
@SuppressWarnings("unchecked")
public static <T> List<T> getListFromArray(Array array) {
if (array == null) {
return null;
}
try {
return Arrays.asList((T[]) array.getArray());
} catch (SQLException ignored) {
}
return null;
}
public static String formatDate(Date date) {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
}
// public static void main(String args[]) {
// System.out.println(CommonUtils.getLanguage());
// }
}

@ -45,24 +45,6 @@ public class FormatUtil {
}
// private static List<Map<String,Object>> transformFieldForList(Map<String, String> fieldMap, List<Map<String, Object>> oriFieldList) {
// List<Map<String,Object>> newDeviceIDData = new ArrayList<>();
// for(Map<String,Object> fieldValueMap : oriFieldList){
// newDeviceIDData.add(transformOneRecord(fieldMap,fieldValueMap));
// }
// return newDeviceIDData;
// }
//
// private static Map<String,Object> transformOneRecord(Map<String, String> fieldMap, Map<String, Object> fieldValueMap) {
// Map<String,Object> newFieldValueMap = new HashMap<>();
// for(String field : fieldMap.keySet()){
// if(fieldValueMap.containsKey(field)){
// newFieldValueMap.put(fieldMap.get(field),fieldValueMap.get(field) );
// }
// }
// return newFieldValueMap;
// }
// public static void main(String[] args) {
// System.out.println(Timestamp.valueOf("2022-05-01 12:11:00").getTime());
// System.out.println(new Timestamp(new Date(Timestamp.valueOf("2022-05-01 12:11:00").getTime()).getTime()));

@ -1,6 +1,7 @@
package com.xydl.util;
import com.xydl.service.impl.MqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
@ -12,21 +13,28 @@ import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@Slf4j
public class MqttUtil {
private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
// private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.topic}")
private String topic;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.publish.clientid}")
private String publishClientid;
@Value("${mqtt.subscribe.clientid}")
private String subscribeClientid;
@Value("${mqtt.qos}")
private int qos;
@ -39,9 +47,10 @@ public class MqttUtil {
// int qos = 0;
// String username = "test";
// String password = "AliOS%1688";
// String clientid = "publish_client";
String clientid = "publish_client";
MqttClient client;
MqttDeliveryToken token;
// System.out.println("broker: "+broker);
try {
client = new MqttClient(broker, publishClientid, new MemoryPersistence());
} catch (MqttException e) {
@ -66,14 +75,14 @@ public class MqttUtil {
token = client.getTopic(topic).publish(message);
token.waitForCompletion();
//打印发送状态
logger.info("messagei is published completely! {}", token.isComplete());
log.info("messagei is published completely! {}", token.isComplete());
// client.publish(topic, message);
} catch (MqttException e) {
throw new RuntimeException(e);
}
logger.info("Message published");
logger.info("topic: {}", topic);
logger.info("{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " message content推送: " + content);
log.info("Message published");
log.info("topic: {}", topic);
log.info("{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " message content推送: " + content);
// disconnect
try {
client.disconnect();
@ -109,16 +118,16 @@ public class MqttUtil {
// setup callback
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
logger.info("connectionLost:{}", cause.getMessage());
log.info("connectionLost:{}", cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
logger.info("topic:{} ", topic);
logger.info("Qos:{} ", message.getQos());
logger.info("{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+" message content接收: " + new String(message.getPayload()));
log.info("topic:{} ", topic);
log.info("Qos:{} ", message.getQos());
log.info("{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+" message content接收: " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("deliveryComplete---------{}", token.isComplete());
log.info("deliveryComplete---------{}", token.isComplete());
}
});
client.connect(options);

@ -1,20 +1,30 @@
package com.xydl.util;
import com.xydl.service.impl.MqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@Slf4j
public class Subscribe {
private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
// private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
@Autowired
MqttUtil mqttUtil;
private static Subscribe single = new Subscribe();
private Subscribe(){
logger.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
new MqttUtil().subScribeMQTT();
@PostConstruct
private void SubscribeMqtt(){
log.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
// mqttUtil.subScribeMQTT();
}
public static Subscribe getInstance(){

@ -43,3 +43,20 @@ mqtt:
password: AliOS%1688
qos: 0
logging:
level:
root: info
com:
xydl:
controller:
TestControler: info
service:
impl: info
schedule: info
file:
name: E:\log\mqtt.log

Loading…
Cancel
Save