feat: mqtt重新开发

main
huangfeng 1 year ago
parent abe4383222
commit 062b038478

@ -3,108 +3,114 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xydl</groupId>
<artifactId>mqtt</artifactId>
<version>1.0.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.1</version>
<relativePath/>
<version>2.7.17</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.xydl</groupId>
<artifactId>mqtt</artifactId>
<version>1.0.0</version>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<java.version>8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
<version>3.4.0</version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- MQTT依赖 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.1</version>
</dependency>
<!--mysql驱动依赖-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-boot-starter</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
</dependency>
<!-- 阿里巴巴JSON -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.31_noneautotype</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.20</version>
</dependency>
<!--mybatis-plus相关依赖-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.0</version>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.14.0</version>
</dependency>
<!--数据连接池druid-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.9</version>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.1.Final</version>
</dependency>
<!-- MQTT依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.1</version>
</dependency>
<!--mybatis-plus相关依赖-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
<!-- <version>2.3.2</version>-->
<!-- <configuration>-->
<!-- <source>1.8</source>-->
<!-- <target>1.8</target>-->
<!-- </configuration>-->
<!-- </plugin>-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*</include>
</includes>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -17,16 +17,16 @@ CREATE TABLE `sync_tables_info` (
-- Records of sync_tables_info
-- ----------------------------
INSERT INTO `sync_tables_info` VALUES (10, 'data_eaif_h', 'SELECT t2.equipmentid , t2.sensorid, DATE_FORMAT(t1.capturetime, ''%Y-%m-%d %H:%i:%s'') as capturetime,t1.maxtemp,t1.mintemp,t1.avgtemp,t2.`Phase` FROM `data_eaif_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.capturetime>%%KEYVALUE%% ORDER BY t1.capturetime LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-11 16:17:33');
INSERT INTO `sync_tables_info` VALUES (10, 'data_eia_h', 'SELECT t2.equipmentid, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.d_ct_1, t2.phase FROM `data_eia_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-11 16:17:38');
INSERT INTO `sync_tables_info` VALUES (10, 'data_epa_h', 'SELECT t2.equipmentid, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time,t1.h2ppm,t1.ch4ppm,t1.c2h6ppm,t1.c2h4ppm,t1.c2h2ppm,t1.coppm,t1.co2ppm,t1.o2ppm,t1.n2ppm,t1.totalhydrocarbon,t1.gaspress,t1.h2oppm,t2.`Phase` FROM `data_epa_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-13 09:19:56');
INSERT INTO `sync_tables_info` VALUES (10, 'data_etp_h', 'SELECT t2.equipmentid, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time,t1.t1 FROM `data_etp_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-13 09:19:33');
INSERT INTO `sync_tables_info` VALUES (10, 'data_microclimate_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.envtmp, t1.envpres, t1.envhum, t1.rnfll, t2.phase FROM `data_microclimate_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-13 09:40:54');
INSERT INTO `sync_tables_info` VALUES (10, 'data_moa_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.pt1, t1.lc1, t1.rc1, t1.ligcnt1, t1.lastligtm1, t2.phase FROM `data_moa_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-13 09:43:47');
INSERT INTO `sync_tables_info` VALUES (10, 'data_pd_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.waveform, t1.plsnum, t1.apppadsch, t2.phase FROM `data_pd_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-13 09:40:29');
INSERT INTO `sync_tables_info` VALUES (10, 'data_scur_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.current_val, t2.phase FROM `data_scur_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-13 09:44:23');
INSERT INTO `sync_tables_info` VALUES (10, 'data_sf6env_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.gas1, t1.yq1, t1.md1, t1.pm1, t1.gascnt1, t1.hmcnt1,t1.sf6warn1, t1.o2warn1,t2.phase FROM `data_sf6env_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-13 09:42:33');
INSERT INTO `sync_tables_info` VALUES (10, 'data_sf6_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.temp1, t1.md1, t1.pressure1+900, t1.pressure1, t1.pm1, t2.phase FROM `data_sf6_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-13 09:42:12');
INSERT INTO `sync_tables_info` VALUES (10, 'data_eaif_h', 'SELECT t2.equipmentid , t2.sensorid, DATE_FORMAT(t1.capturetime, ''%Y-%m-%d %H:%i:%s'') as capturetime,t1.maxtemp,t1.mintemp,t1.avgtemp,t2.`Phase` FROM `data_eaif_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.capturetime>%%KEYVALUE%% ORDER BY t1.capturetime LIMIT 1000', 'eqmid', 'sensorid', 'capturetime', 3, '2023-12-11 16:17:33');
INSERT INTO `sync_tables_info` VALUES (10, 'data_eia_h', 'SELECT t2.equipmentid, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.d_ct_1, t2.phase FROM `data_eia_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-11 16:17:38');
INSERT INTO `sync_tables_info` VALUES (10, 'data_epa_h', 'SELECT t2.equipmentid, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time,t1.h2ppm,t1.ch4ppm,t1.c2h6ppm,t1.c2h4ppm,t1.c2h2ppm,t1.coppm,t1.co2ppm,t1.o2ppm,t1.n2ppm,t1.totalhydrocarbon,t1.gaspress,t1.h2oppm,t2.`Phase` FROM `data_epa_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-13 09:19:56');
INSERT INTO `sync_tables_info` VALUES (10, 'data_etp_h', 'SELECT t2.equipmentid, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time,t1.t1 FROM `data_etp_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-13 09:19:33');
INSERT INTO `sync_tables_info` VALUES (10, 'data_microclimate_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.envtmp, t1.envpres, t1.envhum, t1.rnfll, t2.phase FROM `data_microclimate_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-13 09:40:54');
INSERT INTO `sync_tables_info` VALUES (10, 'data_moa_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.pt1, t1.lc1, t1.rc1, t1.ligcnt1, t1.lastligtm1, t2.phase FROM `data_moa_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-13 09:43:47');
INSERT INTO `sync_tables_info` VALUES (10, 'data_pd_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.waveform, t1.plsnum, t1.apppadsch, t2.phase FROM `data_pd_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-13 09:40:29');
INSERT INTO `sync_tables_info` VALUES (10, 'data_scur_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.current_val, t2.phase FROM `data_scur_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-13 09:44:23');
INSERT INTO `sync_tables_info` VALUES (10, 'data_sf6env_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.gas1, t1.yq1, t1.md1, t1.pm1, t1.gascnt1, t1.hmcnt1,t1.sf6warn1, t1.o2warn1,t2.phase FROM `data_sf6env_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-13 09:42:33');
INSERT INTO `sync_tables_info` VALUES (10, 'data_sf6_h', 'SELECT t2.equipmentid AS SubDeviceID, t2.sensorid, DATE_FORMAT(t1.d_time, ''%Y-%m-%d %H:%i:%s'') as d_time, t1.temp1, t1.md1, t1.pressure1+900, t1.pressure1, t1.pm1, t2.phase FROM `data_sf6_h` AS t1 JOIN i2relation AS t2 ON t1.eqmid=t2.eqmid WHERE t1.eqmid=%%DEVID%% AND t1.d_time>%%KEYVALUE%% ORDER BY t1.d_time LIMIT 1000', 'eqmid', 'sensorid', 'd_time', 3, '2023-12-13 09:42:12');
CREATE TABLE `sync_fields_info` (

@ -1,24 +1,20 @@
package com.xydl;
import com.xydl.service.impl.MqttServiceImpl;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
@EnableSwagger2
@EnableScheduling
@MapperScan(basePackages = "com.xydl.mapper")
@SpringBootApplication//标识该类为主程序启动类
@SpringBootApplication
public class MqttApplication {
//主程序启动方法
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class,args);
SpringApplication.run(MqttApplication.class, args);
}
}

@ -0,0 +1,45 @@
package com.xydl.entity;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "i2relation")
@ApiModel("i2relation表")
public class I2relation {
@Id
@Column(name = "sensorid")
private String sensorid;
@Column(name = "eqmid")
private Integer eqmid;
@Column(name = "cacid")
private String cacid;
@Column(name = "equipmentid")
private String equipmentid;
@Column(name = "monitortype")
private String monitortype;
@Column(name = "phase")
private String phase;
@Column(name = "sensorindex")
private Integer sensorindex;
}

@ -0,0 +1,43 @@
package com.xydl.entity;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "sync_fields_info")
@ApiModel("sync_fields_info表")
public class SyncFieldsInfo {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id")
private Integer id;
@Column(name = "client_id")
private Integer clientId;
@Column(name = "table_name")
private String tableName;
@Column(name = "field_name")
private String fieldName;
@Column(name = "field_type")
private Byte fieldType;
@Column(name = "dest_field_name")
private String destFieldName;
@Column(name = "sensorindex")
private Integer sensorindex;
}

@ -0,0 +1,33 @@
package com.xydl.entity;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.util.Date;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "sync_records")
@ApiModel("sync_records表")
public class SyncRecords {
@EmbeddedId
private SyncRecordsKey id;
@Column(name = "field_val1")
private Long fieldVal1;
@Column(name = "field_val2")
private Date fieldVal2;
@Column(name = "update_time")
private Date updateTime;
}

@ -0,0 +1,31 @@
package com.xydl.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Column;
import javax.persistence.Embeddable;
import javax.persistence.Id;
import java.io.Serializable;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Embeddable
public class SyncRecordsKey implements Serializable {
@Column(name = "client_id")
private Integer clientId;
@Column(name = "table_name")
private String tableName;
@Column(name = "devid_val")
private String devidVal;
}

@ -0,0 +1,42 @@
package com.xydl.entity;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.util.Date;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "sync_tables_info")
@ApiModel("sync_tables_info表")
public class SyncTablesInfo {
@EmbeddedId
private SyncTablesInfoKey id;
@Column(name = "sql")
private String sql;
@Column(name = "devid_field_name")
private String devidFieldName;
@Column(name = "outer_devid_fname")
private String outerDevidFname;
@Column(name = "field_name")
private String fieldName;
@Column(name = "field_type")
private Byte fieldType;
@Column(name = "update_time")
private Date updateTime;
}

@ -0,0 +1,28 @@
package com.xydl.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Column;
import javax.persistence.Embeddable;
import javax.persistence.Id;
import java.io.Serializable;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Embeddable
public class SyncTablesInfoKey implements Serializable {
@Column(name = "client_id")
private Integer clientId;
@Column(name = "table_name")
private String tableName;
}

@ -0,0 +1,39 @@
package com.xydl.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.xydl.entity.I2relation;
import com.xydl.entity.SyncFieldsInfo;
import lombok.Data;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Data
public class AssetItem {
@JsonProperty("AssetCode")
String assetCode;
@JsonProperty("AttributeList")
List<AttributeItem> attributeList;
@JsonProperty("Timestamp")
Long timestamp;
public static AssetItem fromData(Map<String, Object> map, String timeField,
I2relation relation, List<SyncFieldsInfo> fieldList) {
AssetItem root = new AssetItem();
root.setAssetCode(relation.getSensorid());
List<AttributeItem> list = new ArrayList<>();
for (SyncFieldsInfo field : fieldList) {
if (relation.getSensorindex().equals(field.getSensorindex())) {
AttributeItem item = AttributeItem.fromData(map, field);
list.add(item);
}
}
root.setAttributeList(list);
LocalDateTime time = (LocalDateTime) map.get(timeField);
root.setTimestamp(time.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
return root;
}
}

@ -0,0 +1,33 @@
package com.xydl.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.xydl.entity.I2relation;
import com.xydl.entity.SyncFieldsInfo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Data
public class AssetList {
@JsonProperty("AssetList")
List<AssetItem> assetList;
public static AssetList fromData(List<Map<String, Object>> dataList, String timeField,
List<I2relation> relationList, List<SyncFieldsInfo> fieldList) {
AssetList root = new AssetList();
List<AssetItem> list = new ArrayList<>();
if (relationList.size() > 1){
int a = 1;
}
for (Map<String, Object> map : dataList) {
for (I2relation relation : relationList) {
AssetItem item = AssetItem.fromData(map, timeField, relation, fieldList);
list.add(item);
}
}
root.setAssetList(list);
return root;
}
}

@ -0,0 +1,22 @@
package com.xydl.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.xydl.entity.SyncFieldsInfo;
import lombok.Data;
import java.util.Map;
@Data
public class AttributeItem {
@JsonProperty("AttributeCode")
String attributeCode;
@JsonProperty("DataValue")
Object dataValue;
public static AttributeItem fromData(Map<String, Object> map, SyncFieldsInfo field) {
AttributeItem root = new AttributeItem();
root.setAttributeCode(field.getDestFieldName());
root.setDataValue(map.get(field.getFieldName()));
return root;
}
}

@ -0,0 +1,15 @@
package com.xydl.repository;
import com.xydl.entity.I2relation;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface I2relationRepository extends JpaRepository<I2relation, String>, JpaSpecificationExecutor<I2relation> {
List<I2relation> findAllByEqmid(Integer eqmid);
}

@ -0,0 +1,15 @@
package com.xydl.repository;
import com.xydl.entity.SyncFieldsInfo;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface SyncFieldsInfoRepository extends JpaRepository<SyncFieldsInfo, Integer>, JpaSpecificationExecutor<SyncFieldsInfo> {
List<SyncFieldsInfo> findAllByClientIdAndTableName(Integer clientId, String tableName);
}

@ -0,0 +1,13 @@
package com.xydl.repository;
import com.xydl.entity.SyncRecords;
import com.xydl.entity.SyncRecordsKey;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;
@Repository
public interface SyncRecordsRepository extends JpaRepository<SyncRecords, SyncRecordsKey>, JpaSpecificationExecutor<SyncRecords> {
}

@ -0,0 +1,13 @@
package com.xydl.repository;
import com.xydl.entity.SyncTablesInfo;
import com.xydl.entity.SyncTablesInfoKey;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;
@Repository
public interface SyncTablesInfoRepository extends JpaRepository<SyncTablesInfo, SyncTablesInfoKey>, JpaSpecificationExecutor<SyncTablesInfo> {
}

@ -1,24 +1,29 @@
package com.xydl.schedule;
import com.xydl.service.MqttService;
import com.xydl.service.impl.MqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@Slf4j
public class ReportSchedule {
@Autowired
MqttServiceImpl mqttServiceimpl;
@Resource
MqttService mqttService;
@Scheduled(initialDelay = 1000, fixedDelay = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
public void reportRecord() {
log.info("运行定时转发任务");
mqttServiceimpl.reportRecord();
// mqttServiceimpl.reportRecord();
mqttService.uploadTask();
}
}

@ -0,0 +1,43 @@
package com.xydl.service;
import com.xydl.entity.SyncTablesInfo;
import com.xydl.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class DbsqlService {
@Resource
private JdbcTemplate jdbcTemplate;
public List<Integer> getDistinctEqmids(String tableName, String field) {
String sql = "SELECT DISTINCT(" + field + ") FROM " + tableName;
List<Integer> list = jdbcTemplate.queryForList(sql, Integer.class);
return list;
}
public List<Map<String, Object>> getData(Integer eqmid, Date lasttime, SyncTablesInfo table) {
String sqlSelect = "SELECT * FROM " + table.getId().getTableName();
String sqlWhere = " WHERE " + table.getDevidFieldName() + "=" + eqmid;
if (lasttime != null) {
sqlWhere = sqlWhere + " AND " + table.getFieldName() + ">'" + DateUtil.format(lasttime) + "'";
}
String sqlOrder = " ORDER BY " + table.getFieldName() + " ASC LIMIT 1000";
String sql = sqlSelect + sqlWhere + sqlOrder;
// log.debug(sql);
List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
return list;
}
}

@ -0,0 +1,22 @@
package com.xydl.service;
import com.xydl.entity.I2relation;
import com.xydl.repository.I2relationRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class I2relationService {
@Resource
I2relationRepository repository;
public List<I2relation> getList(Integer eqmid) {
return repository.findAllByEqmid(eqmid);
}
}

@ -0,0 +1,130 @@
package com.xydl.service;
import com.xydl.entity.*;
import com.xydl.model.AssetList;
import com.xydl.util.DateUtil;
import com.xydl.util.JSONUtil;
import com.xydl.util.MqttUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
@Service
@Slf4j
public class MqttService {
@Resource
MqttUtil mqttUtil;
@Resource
I2relationService i2relationService;
@Resource
SyncTablesInfoService tablesInfoService;
@Resource
SyncFieldsInfoService fieldsInfoService;
@Resource
SyncRecordsService recordsService;
@Resource
DbsqlService dbsqlService;
// 上传任务
public void uploadTask() {
log.info("转发任务开始.");
List<SyncTablesInfo> tableList = tablesInfoService.findAll();
for (SyncTablesInfo table : tableList) {
try {
processOneTable(table);
} catch (Exception e) {
log.error("processOneTable exception: " + table.getId().getTableName(), e);
}
}
log.info("转发任务结束.");
}
// 处理单个表
private void processOneTable(SyncTablesInfo table) throws Exception {
List<SyncFieldsInfo> fieldList = fieldsInfoService.findAll(table.getId().getClientId(), table.getId().getTableName());
if (!CollectionUtils.isEmpty(fieldList)) {
List<Integer> eqmidList = dbsqlService.getDistinctEqmids(table.getId().getTableName(), table.getDevidFieldName());
log.info("表{},共{}个设备", table.getId().getTableName(), eqmidList.size());
if (!CollectionUtils.isEmpty(eqmidList)) {
mqttUtil.connect();
for (Integer eqmid : eqmidList) {
this.processOneEqmid(eqmid, table, fieldList);
}
mqttUtil.disconnect();
}
} else {
log.error("表{}未配置sync_fields_info", table.getId().getTableName());
}
}
// 处理单个设备
private void processOneEqmid(Integer eqmid, SyncTablesInfo table, List<SyncFieldsInfo> fieldList) {
List<I2relation> relationList = i2relationService.getList(eqmid);
if (!CollectionUtils.isEmpty(relationList)) {
try {
int count = this.uploadOneBlock(eqmid, null,
table, fieldList, relationList);
if (count > 1000) {
log.debug("表{}设备{}一共推送成功{}条数据.", table.getId().getTableName(), eqmid, count);
}
} catch (Exception e) {
log.error("表{}设备{}推送异常", table.getId().getTableName(), eqmid, e);
}
} else {
log.warn("表{}设备{}未配置i2relation", table.getId().getTableName(), eqmid);
}
}
// 查询并上传单个设备的数据
private int uploadOneBlock(Integer eqmid, SyncRecords record,
SyncTablesInfo table, List<SyncFieldsInfo> fieldList, List<I2relation> relationList) throws Exception {
if (record == null) {
Optional<SyncRecords> optional = recordsService.findByKey(table.getId().getClientId(), table.getId().getTableName(), String.valueOf(eqmid));
if (optional.isPresent()) {
record = optional.get();
} else {
SyncRecordsKey key = SyncRecordsKey.builder()
.clientId(table.getId().getClientId())
.tableName(table.getId().getTableName())
.devidVal(String.valueOf(eqmid))
.build();
record = SyncRecords.builder()
.id(key)
.build();
}
}
int count = 0;
List<Map<String, Object>> dataList = dbsqlService.getData(eqmid, record.getFieldVal2(), table);
if (CollectionUtils.isEmpty(dataList)) {
return 0;
} else {
count = dataList.size();
AssetList assetList = AssetList.fromData(dataList, table.getFieldName(), relationList, fieldList);
String json = JSONUtil.object2Json(assetList);
mqttUtil.publish2MQTT(json);
Map<String, Object> lastMap = dataList.get(count - 1);
LocalDateTime locdate = (LocalDateTime) lastMap.get(table.getFieldName());
Date date = Date.from(locdate.atZone(ZoneId.systemDefault()).toInstant());
record.setFieldVal2(date);
recordsService.update(record);
log.debug("表{}设备{}推送成功{}条数据,最后时间{}", table.getId().getTableName(), eqmid, count, DateUtil.format(date));
if (count >= 1000) {
count += uploadOneBlock(eqmid, record, table, fieldList, relationList);
}
return count;
}
}
}

@ -0,0 +1,23 @@
package com.xydl.service;
import com.xydl.entity.SyncFieldsInfo;
import com.xydl.repository.SyncFieldsInfoRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class SyncFieldsInfoService {
@Resource
SyncFieldsInfoRepository repository;
public List<SyncFieldsInfo> findAll(Integer clientId, String tableName) {
return repository.findAllByClientIdAndTableName(clientId, tableName);
}
}

@ -0,0 +1,32 @@
package com.xydl.service;
import com.xydl.entity.SyncRecords;
import com.xydl.entity.SyncRecordsKey;
import com.xydl.repository.SyncRecordsRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Optional;
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class SyncRecordsService {
@Resource
SyncRecordsRepository repository;
public Optional<SyncRecords> findByKey(Integer clientId, String tableName, String devid) {
SyncRecordsKey key = SyncRecordsKey.builder()
.clientId(clientId)
.tableName(tableName)
.devidVal(devid)
.build();
return repository.findById(key);
}
public void update(SyncRecords record) {
repository.save(record);
}
}

@ -0,0 +1,32 @@
package com.xydl.service;
import com.xydl.entity.SyncTablesInfo;
import com.xydl.entity.SyncTablesInfoKey;
import com.xydl.repository.SyncTablesInfoRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class SyncTablesInfoService {
@Resource
SyncTablesInfoRepository repository;
public List<SyncTablesInfo> findAll() {
return repository.findAll();
}
public Optional<SyncTablesInfo> findByKey(Integer clientId, String tableName) {
SyncTablesInfoKey key = SyncTablesInfoKey.builder()
.clientId(clientId)
.tableName(tableName)
.build();
return repository.findById(key);
}
}

@ -0,0 +1,83 @@
package com.xydl.util;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
public class DateUtil {
public final static String defaultDatePattern = "yyyy-MM-dd HH:mm:ss";
/**
* date pattern
*/
public static String getDatePattern() {
return defaultDatePattern;
}
/**
* 使FormatDate
*/
public static String format(Date date) {
return format(date, getDatePattern());
}
/**
* 使FormatDate
*/
public static String format(Date date, String pattern) {
String returnValue = "";
if (date != null) {
SimpleDateFormat df = new SimpleDateFormat(pattern);
returnValue = df.format(date);
}
return (returnValue);
}
/**
* 使Date
*/
public static Date parse(String strDate) throws ParseException {
return parse(strDate, getDatePattern());
}
/**
* 使FormatDate
*/
public static Date parse(String strDate, String pattern) throws ParseException {
SimpleDateFormat df = new SimpleDateFormat(pattern);
return df.parse(strDate);
}
public static Date getTodayZero() throws ParseException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
String str = df.format(new Date());
return df.parse(str);
}
public static Date getMonthZero() throws ParseException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM");
String str = df.format(new Date());
return df.parse(str);
}
/**
* (n)
*/
public static Date addDay(Date date, int n) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.add(Calendar.DAY_OF_MONTH, n);
return cal.getTime();
}
public static Date addMonth(Date date, int n) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.add(Calendar.MONTH, n);
return cal.getTime();
}
}

@ -0,0 +1,87 @@
package com.xydl.util;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import java.io.File;
import java.io.IOException;
import java.util.Map;
public class JSONProcessor {
public Map<String, Object> fromJSON2Map(String json) throws JsonParseException, JsonMappingException, IOException {
ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked")
Map<String, Object> productMap = mapper.readValue(json, Map.class);
return productMap;
}
public Map<String, Object> fromJSON2Map(Object obj) throws JsonParseException, JsonMappingException, IOException {
ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked")
Map<String, Object> productMap = mapper.convertValue(obj, Map.class);
return productMap;
}
public String buildJSONFromJSONObject(Object obj) {
String jsonInString = null;
ObjectMapper mapper = new ObjectMapper();
try {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
mapper.setSerializationInclusion(Include.NON_NULL);
jsonInString = mapper.writeValueAsString(obj);
} catch (Exception e) {
// LOG.error("JSON transform failed. ", e);
}
return jsonInString;
}
public String buildJSONFromJSONObject(Object obj, boolean prettyPrinter) {
String jsonInString = null;
ObjectMapper mapper = new ObjectMapper();
try {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
mapper.setSerializationInclusion(Include.NON_NULL);
if (prettyPrinter) {
jsonInString = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj);
} else {
jsonInString = mapper.writeValueAsString(obj);
}
} catch (Exception e) {
// LOG.error("JSON transform failed. ", e);
}
return jsonInString;
}
public <T> T buildJSONObjectFromJSON(String json, Class<T> clazz) throws Exception {
ObjectMapper mapper = new ObjectMapper();
try {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return mapper.readValue(json, clazz);
} catch (UnrecognizedPropertyException e) {
throw new Exception(e.getPropertyName(), e);
} catch (Exception e) {
throw new Exception("JSON Object transform failed. ", e);
}
}
public <T> T buildJSONObjectFromJSON(File file, Class<T> clazz) throws Exception {
ObjectMapper mapper = new ObjectMapper();
try {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return mapper.readValue(file, clazz);
} catch (UnrecognizedPropertyException e) {
throw new Exception(e.getPropertyName(), e);
} catch (Exception e) {
throw new Exception("JSON Object transform failed. ", e);
}
}
}

@ -0,0 +1,43 @@
/**
* @author roger - Sep 9, 2016
* @version 2.0
* file name: JSONProcessor.java
* package name: com.roam2free.rest.util
*/
package com.xydl.util;
import java.io.File;
import java.io.IOException;
import java.util.Map;
public class JSONUtil {
private static JSONProcessor jSONProcessor = new JSONProcessor();
public static String object2Json(Object obj) {
return jSONProcessor.buildJSONFromJSONObject(obj);
}
public static String object2Json(Object obj, boolean prettyPrinter) {
return jSONProcessor.buildJSONFromJSONObject(obj, prettyPrinter);
}
public static <T> T json2Object(String json, Class<T> clazz) throws Exception {
return jSONProcessor.buildJSONObjectFromJSON(json, clazz);
}
public static <T> T file2Object(File file, Class<T> clazz) throws Exception {
return jSONProcessor.buildJSONObjectFromJSON(file, clazz);
}
public static Map<String, Object> object2Map(Object obj) {
try {
return jSONProcessor.fromJSON2Map(obj);
} catch (IOException e) {
return null;
}
}
}

@ -4,11 +4,21 @@ server:
#数据源配置
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.50.200:3306/mqttcac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&connectTimeout=60000&socketTimeout=60000
url: jdbc:mysql://192.168.50.200:3306/cacdb_fuxian?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&connectTimeout=60000&socketTimeout=60000
username: root
password: 123456
jpa:
show-sql: false
mvc:
format:
date: yyyy-MM-dd HH:mm:ss
pathmatch:
matching-strategy: ant_path_matcher
mybatis:
@ -31,14 +41,9 @@ logging:
level:
root: info
com:
xydl:
controller:
TestControler: info
service:
impl: debug
schedule: info
xydl: debug
file:
name: /root/log/mqtt.log
name: /home/xydl/mqtt/logs/mqtt.log

@ -4,11 +4,21 @@ server:
#数据源配置
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/cac?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&connectTimeout=60000&socketTimeout=60000
username: root
password: root
jpa:
show-sql: false
mvc:
format:
date: yyyy-MM-dd HH:mm:ss
pathmatch:
matching-strategy: ant_path_matcher
mybatis:
@ -31,14 +41,9 @@ logging:
level:
root: info
com:
xydl:
controller:
TestControler: info
service:
impl: debug
schedule: info
xydl: debug
file:
name: /root/log/mqtt.log
name: /home/xydl/mqtt/logs/mqtt.log

Loading…
Cancel
Save