diff --git a/pom.xml b/pom.xml index fe9191f..e841483 100644 --- a/pom.xml +++ b/pom.xml @@ -101,14 +101,15 @@ - - maven-compiler-plugin - 2.3.2 - - 1.8 - 1.8 - - + + + + + + + + + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/com/xydl/MqttApplication.java b/src/main/java/com/xydl/MqttApplication.java index 5831936..ac70c6e 100644 --- a/src/main/java/com/xydl/MqttApplication.java +++ b/src/main/java/com/xydl/MqttApplication.java @@ -1,10 +1,16 @@ package com.xydl; +import com.xydl.service.impl.MqttServiceImpl; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; + @EnableScheduling @MapperScan(basePackages = "com.xydl.mapper") @SpringBootApplication//标识该类为主程序启动类 diff --git a/src/main/java/com/xydl/Schedule/MyTimerTask.java b/src/main/java/com/xydl/Schedule/MyTimerTask.java index bf3883c..46b21e1 100644 --- a/src/main/java/com/xydl/Schedule/MyTimerTask.java +++ b/src/main/java/com/xydl/Schedule/MyTimerTask.java @@ -1,23 +1,27 @@ -package com.xydl.Schedule; +package com.xydl.schedule; import com.xydl.service.impl.MqttServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; -import java.util.Timer; -import java.util.TimerTask; +@Component +@Slf4j public class MyTimerTask { - public static void main(String[] args) { - new Timer().schedule(new TimerTask() { - @Override - public void run() { - System.out.println("运行定时任务: "+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); - new MqttServiceImpl().reportRecord(); +// private static final Logger logger = LoggerFactory.getLogger(MyTimerTask.class); + @Autowired + MqttServiceImpl mqttServiceimpl; + + @Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 + public void init() { + log.info("运行定时任务:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); + mqttServiceimpl.reportRecord(); - } - }, 1000, 3000); } } diff --git a/src/main/java/com/xydl/controller/TestController.java b/src/main/java/com/xydl/controller/TestController.java index d2b56f7..165d9dd 100644 --- a/src/main/java/com/xydl/controller/TestController.java +++ b/src/main/java/com/xydl/controller/TestController.java @@ -1,17 +1,20 @@ package com.xydl.controller; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller +@Slf4j public class TestController { +// private static final Logger logger = LoggerFactory.getLogger(TestController.class); - @RequestMapping("/eia") + @RequestMapping("/test") @ResponseBody public String test() { - System.out.println("==="); + log.info("==="); return "测试成功"; } diff --git a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java index 0877692..9165e1b 100644 --- a/src/main/java/com/xydl/service/impl/MqttServiceImpl.java +++ b/src/main/java/com/xydl/service/impl/MqttServiceImpl.java @@ -4,9 +4,7 @@ package com.xydl.service.impl; import com.xydl.mapper.OperationDB; import com.xydl.util.FormatUtil; import com.xydl.util.MqttUtil; -import com.xydl.util.Subscribe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; @@ -15,16 +13,18 @@ import java.util.stream.Collectors; @Service +@Slf4j public class MqttServiceImpl { - private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); +// private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); @Autowired OperationDB operationDBMapper; -// @Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 + @Autowired + MqttUtil mqttUtil; + + public void reportRecord() { - logger.info("开始执行"); - Subscribe.getInstance(); List allTableNames = operationDBMapper.getAllTable(); for (String tableName : allTableNames) { @@ -65,50 +65,24 @@ public class MqttServiceImpl { for (int deviceID : devIDLastTimeMap.keySet()) { String time = devIDLastTimeMap.get(deviceID).toString(); - System.out.println(time); String sql = preSQL+deviceID+midSQL+"'"+time+"'"+lastSQL; - System.out.println("sql :"+sql); List> dataOfoneDeviceID = operationDBMapper.getData(sql); String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap); - logger.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData); - if (new MqttUtil().publish2MQTT(jsonStringData)) { - operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID)); - logger.info("推送成功"); - } else { - logger.info("消息推送失败"); + log.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData); + try{ + if (mqttUtil.publish2MQTT(jsonStringData)) { + operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID)); + log.info("推送成功"); + } + }catch (Exception e){ + log.info("推送异常:{}",e.getMessage()); } + } } } -// @Scheduled(fixedDelay = Long.MAX_VALUE) // 用一个非常大的延迟值,确保只执行一次 -// public void subScribeSamle() { -// logger.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); -// MqttUtil.subScribeMQTT(); -// } - -// @Scheduled(fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务,使用fixedRate属性每隔固定时间执行 -// public void checkDevIdTimer() { -// logger.info("每小时检测一次同步的表是否在‘同步记录表’"); -// List allTableNames = getAllTableNameFromSyncTable(); -// for (String tableName : allTableNames) { -// if (!tableNameIfExitsSyncRec(tableName)) { -// logger.info("有不存在的表{},把所有的devId及最早的时间更新到'同步记录表'", tableName); -// List devIds = operationDBMapper.getAllDevId(tableName); -// for (String devId : devIds) { -// String earliestTime = null; -// if ("data_eaif_h".equals(tableName)) { -// earliestTime = operationDBMapper.getEarliestTime4Eaif(tableName, devId); -// } else { -// earliestTime = operationDBMapper.getEarliestTime4Other(tableName, devId); -// } -// addEarliestTime2SyncRecord(tableName, devId, earliestTime); -// } -// } -// } -// } - } diff --git a/src/main/java/com/xydl/util/CommonUtils.java b/src/main/java/com/xydl/util/CommonUtils.java deleted file mode 100644 index 58fcaf5..0000000 --- a/src/main/java/com/xydl/util/CommonUtils.java +++ /dev/null @@ -1,145 +0,0 @@ -package com.xydl.util; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.sql.Array; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.text.SimpleDateFormat; -import java.util.*; - -public class CommonUtils { - private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class); - - private static final boolean languageZh = getPropertyValue("system.language", "zh").equals("zh"); - private static final ObjectMapper objectMapper = new ObjectMapper(); - - - - private static final long startupTime = System.currentTimeMillis(); - private static final int startingTime = Integer.parseInt(getPropertyValue("system.startup.time", "60000")); - - - - public static long getStartupTime() { - return startupTime; - } - - public static boolean started() { - return System.currentTimeMillis() - startupTime > startingTime; - } - - public static boolean isLanguageZh() { - return languageZh; - } - - - - - - public static ObjectMapper getObjectMapper() { - return objectMapper; - } - - - - - - - - - - - public static String getPropertyValue(String propertyName, String defaultValue) { - Properties properties = loadPropertyFile("sdwan.common.cfg"); - if (properties != null) { - String value = properties.getProperty(propertyName); - return value == null ? defaultValue : value; - } - return defaultValue; - } - - public static Properties loadPropertyFile(String fileName) { - String filePath = System.getProperty("user.dir").concat("/etc/").concat(fileName); - File file = new File(filePath); - if (!file.exists()) { - return null; - } - - try (InputStream is = new FileInputStream(file)) { - Properties properties = new Properties(); - properties.load(is); - return properties; - } catch (IOException e) { - logger.error("load property file exception:", e); - } - - return null; - } - - public static boolean setProperty(String fileName, String key, String value) { - Properties properties = loadPropertyFile(fileName); - if (properties == null) { - properties = new Properties(); - } - properties.setProperty(key, value); - return savePropertyFile(fileName, properties); - } - - public static boolean savePropertyFile(String fileName, Properties properties) { - String filePath = System.getProperty("user.dir").concat("/etc/").concat(fileName); - File file = new File(filePath); - if (!file.exists()) { - return false; - } - - try (OutputStream os = new FileOutputStream(file)) { - properties.store(os, null); - } catch (IOException e) { - logger.error("write file exception:", e); - return false; - } - - return true; - } - - - - - - @SuppressWarnings("unchecked") - public static List getListFromArray(Array array) { - if (array == null) { - return null; - } - try { - return Arrays.asList((T[]) array.getArray()); - } catch (SQLException ignored) { - } - - return null; - } - - - - - - - - public static String formatDate(Date date) { - return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date); - } - - - -// public static void main(String args[]) { -// System.out.println(CommonUtils.getLanguage()); -// } -} diff --git a/src/main/java/com/xydl/util/FormatUtil.java b/src/main/java/com/xydl/util/FormatUtil.java index c84f7cb..0063c76 100644 --- a/src/main/java/com/xydl/util/FormatUtil.java +++ b/src/main/java/com/xydl/util/FormatUtil.java @@ -45,24 +45,6 @@ public class FormatUtil { } -// private static List> transformFieldForList(Map fieldMap, List> oriFieldList) { -// List> newDeviceIDData = new ArrayList<>(); -// for(Map fieldValueMap : oriFieldList){ -// newDeviceIDData.add(transformOneRecord(fieldMap,fieldValueMap)); -// } -// return newDeviceIDData; -// } -// -// private static Map transformOneRecord(Map fieldMap, Map fieldValueMap) { -// Map newFieldValueMap = new HashMap<>(); -// for(String field : fieldMap.keySet()){ -// if(fieldValueMap.containsKey(field)){ -// newFieldValueMap.put(fieldMap.get(field),fieldValueMap.get(field) ); -// } -// } -// return newFieldValueMap; -// } - // public static void main(String[] args) { // System.out.println(Timestamp.valueOf("2022-05-01 12:11:00").getTime()); // System.out.println(new Timestamp(new Date(Timestamp.valueOf("2022-05-01 12:11:00").getTime()).getTime())); diff --git a/src/main/java/com/xydl/util/MqttUtil.java b/src/main/java/com/xydl/util/MqttUtil.java index b31bee3..38e3b7f 100644 --- a/src/main/java/com/xydl/util/MqttUtil.java +++ b/src/main/java/com/xydl/util/MqttUtil.java @@ -1,6 +1,7 @@ package com.xydl.util; import com.xydl.service.impl.MqttServiceImpl; +import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; @@ -12,21 +13,28 @@ import java.text.SimpleDateFormat; import java.util.Date; @Component +@Slf4j public class MqttUtil { - private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); +// private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); @Value("${mqtt.broker}") private String broker; + @Value("${mqtt.topic}") private String topic; + @Value("${mqtt.username}") private String username; + @Value("${mqtt.password}") private String password; + @Value("${mqtt.publish.clientid}") private String publishClientid; + @Value("${mqtt.subscribe.clientid}") private String subscribeClientid; + @Value("${mqtt.qos}") private int qos; @@ -39,9 +47,10 @@ public class MqttUtil { // int qos = 0; // String username = "test"; // String password = "AliOS%1688"; -// String clientid = "publish_client"; + String clientid = "publish_client"; MqttClient client; MqttDeliveryToken token; +// System.out.println("broker: "+broker); try { client = new MqttClient(broker, publishClientid, new MemoryPersistence()); } catch (MqttException e) { @@ -66,14 +75,14 @@ public class MqttUtil { token = client.getTopic(topic).publish(message); token.waitForCompletion(); //打印发送状态 - logger.info("messagei is published completely! {}", token.isComplete()); + log.info("messagei is published completely! {}", token.isComplete()); // client.publish(topic, message); } catch (MqttException e) { throw new RuntimeException(e); } - logger.info("Message published"); - logger.info("topic: {}", topic); - logger.info("{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " message content推送: " + content); + log.info("Message published"); + log.info("topic: {}", topic); + log.info("{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " message content推送: " + content); // disconnect try { client.disconnect(); @@ -109,16 +118,16 @@ public class MqttUtil { // setup callback client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { - logger.info("connectionLost:{}", cause.getMessage()); + log.info("connectionLost:{}", cause.getMessage()); } public void messageArrived(String topic, MqttMessage message) { - logger.info("topic:{} ", topic); - logger.info("Qos:{} ", message.getQos()); - logger.info("{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+" message content接收: " + new String(message.getPayload())); + log.info("topic:{} ", topic); + log.info("Qos:{} ", message.getQos()); + log.info("{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+" message content接收: " + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { - logger.info("deliveryComplete---------{}", token.isComplete()); + log.info("deliveryComplete---------{}", token.isComplete()); } }); client.connect(options); diff --git a/src/main/java/com/xydl/util/Subscribe.java b/src/main/java/com/xydl/util/Subscribe.java index d6eb9f9..00dc6a6 100644 --- a/src/main/java/com/xydl/util/Subscribe.java +++ b/src/main/java/com/xydl/util/Subscribe.java @@ -1,20 +1,30 @@ package com.xydl.util; import com.xydl.service.impl.MqttServiceImpl; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import java.text.SimpleDateFormat; import java.util.Date; +@Component +@Slf4j public class Subscribe { - private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); +// private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class); + + @Autowired + MqttUtil mqttUtil; private static Subscribe single = new Subscribe(); - private Subscribe(){ - logger.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); - new MqttUtil().subScribeMQTT(); + @PostConstruct + private void SubscribeMqtt(){ + log.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); +// mqttUtil.subScribeMQTT(); } public static Subscribe getInstance(){ diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4178c61..fb3cbf3 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -43,3 +43,20 @@ mqtt: password: AliOS%1688 qos: 0 +logging: + level: + root: info + com: + xydl: + controller: + TestControler: info + service: + impl: info + schedule: info + file: + name: E:\log\mqtt.log + + + + +