重构代码

main
wenhua.zhou 2 years ago
parent 65428cd265
commit f50b651ce2

@ -55,7 +55,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
<version>8.0.22</version>
</dependency>
<!-- 阿里巴巴JSON -->
@ -75,14 +75,14 @@
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.2</version>
<version>3.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
<version>1.1.21</version>
</dependency>
<!--数据连接池druid-->

@ -0,0 +1,23 @@
package com.xydl.Schedule;
import com.xydl.service.impl.MqttServiceImpl;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
public class MyTimerTask {
public static void main(String[] args) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.out.println("运行定时任务: "+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
new MqttServiceImpl().reportRecord();
}
}, 1000, 3000);
}
}

@ -1,6 +1,5 @@
package com.xydl.mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
@ -23,10 +22,14 @@ public interface OperationDB {
String getSQL(String tableName);
Map<String, Object> getDeviceIDAndtime(String tableName);
List<Map<String, Object>> getDeviceIDAndtime(String tableName);
void addEarliestTime(int clientId, String tableName, String devId, String earliestTime);
List<Map<String, Object>> getData(String sql);
boolean updateSyncRecordsTable(String tableName, int deviceID, String time);

@ -1,25 +1,17 @@
package com.xydl.service.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.xydl.mapper.OperationDB;
import com.xydl.util.DataSourceUtils;
import com.xydl.util.FormatUtil;
import com.xydl.util.MqttUtil;
import com.xydl.util.Subscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
@ -29,304 +21,10 @@ public class MqttServiceImpl {
@Autowired
OperationDB operationDBMapper;
private static final String SYNC_TABLE = "sync_tables_info";
public List<String> getAllTableNameFromSyncTable() {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
List<String> tableNames = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select table_name from sync_tables_info";
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
tableNames.add(rs.getString("table_name"));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return tableNames;
}
public boolean tableNameIfExitsSyncRec(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = DataSourceUtils.getConnection();
String sql = "select devid_val from sync_records where table_name = ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
if (rs.next()) {
return true;
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return false;
}
public Map<String, String> getFieldMap(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
Map<String, String> fieldsMap = new HashMap<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select sync_fields_info.field_name, sync_fields_info.dest_field_name " +
"from sync_fields_info,sync_tables_info " +
"where sync_fields_info.client_id = 10 and sync_fields_info.table_name = sync_tables_info.table_name and sync_tables_info.table_name=?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) {
fieldsMap.put(rs.getString("field_name"), rs.getString("dest_field_name"));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return fieldsMap;
}
public Map<String, Object> getDeviceIDAndtime(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sqlExecuting = null;
Map<String, Object> devIDTimeMap = new HashMap<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select devid_val,field_val2 from sync_records where table_name =?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
while (rs.next()) {
devIDTimeMap.put(rs.getString("devid_val"), rs.getString("field_val2"));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return devIDTimeMap;
}
public String getSQL(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sqlExecuting = null;
try {
conn = DataSourceUtils.getConnection();
String sql = "select * from sync_tables_info where table_name =?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
if (rs.next()) {
sqlExecuting = rs.getString("sql");
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return sqlExecuting;
}
public List<Map<String, Object>> getData(String sqlExecuting, String deviceId, String time) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
List<Map<String, Object>> records = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = sqlExecuting;
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, deviceId);
pstmt.setString(2, time);
rs = pstmt.executeQuery();
int columnCount = rs.getMetaData().getColumnCount(); //获取列的数量
while (rs.next()) {
Map<String, Object> record = new HashMap<>();
for (int col = 0; col < columnCount; col++) {
String columnName = rs.getMetaData().getColumnName(col + 1);
Object columnValue = rs.getString(columnName);
record.put(columnName, columnValue);
}
records.add(record);
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return records;
}
public void addEarliestTime2SyncRecord(String tableName, String devId, String lastTime) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = DataSourceUtils.getConnection();
String sql = "insert into sync_records (client_id,table_name,devid_val,field_val2) values (?,?,?,?)";
pstmt = conn.prepareStatement(sql);
pstmt.setInt(1, 10);
pstmt.setString(2, tableName);
pstmt.setString(3, devId);
pstmt.setString(4, lastTime);
pstmt.executeUpdate();
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
}
public List<String> getAllDevId(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
String sqlExecuting = null;
List<String> devIDs = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select distinct eqmid from ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
while (rs.next()) {
devIDs.add(rs.getString("eqmid"));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return devIDs;
}
public String getLastTime(String tableName, String devId) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = DataSourceUtils.getConnection();
String sql = "select d_time from ? where eqmid=? ORDER BY d_time asc limit 1";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
pstmt.setString(2, devId);
rs = pstmt.executeQuery();
if (rs.next()) {
return rs.getString("d_time");
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return null;
}
public List<Integer> getDataEqmids(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
List<Integer> eqmids = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select distinct eqmid from ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
while (rs.next()) {
eqmids.add(Integer.valueOf(rs.getString("eqmid")));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return eqmids;
}
public List<Integer> getSyncRecordDevIds(String tableName) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
List<Integer> syncEqmids = new ArrayList<>();
try {
conn = DataSourceUtils.getConnection();
String sql = "select devid_val from sync_records where table_name = ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
rs = pstmt.executeQuery();
while (rs.next()) {
syncEqmids.add(Integer.valueOf(rs.getString("devid_val")));
}
} catch (SQLException e) {
logger.error("execute sql exception:", e);
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return syncEqmids;
}
public boolean updateSyncRecordsTable(String tableName, String deviceID, String time) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = DataSourceUtils.getConnection();
String sql = "update sync_records set field_val2 = ? where table_name = ? and devid_val = ?";
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, time);
pstmt.setString(2, tableName);
pstmt.setString(3, deviceID);
pstmt.executeUpdate();
} catch (SQLException e) {
logger.error("execute sql exception:", e);
return false;
} finally {
DataSourceUtils.closeResource(rs, pstmt, conn);
}
return true;
}
@Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
// @Scheduled(initialDelay = 1000, fixedRate = 1000 * 3600) //通过@Scheduled声明该方法是计划任务使用fixedRate属性每隔固定时间执行
public void reportRecord() {
logger.info("开始执行");
// Subscribe.getInstance();
Subscribe.getInstance();
List<String> allTableNames = operationDBMapper.getAllTable();
for (String tableName : allTableNames) {
@ -339,6 +37,9 @@ public class MqttServiceImpl {
}
String sqlExecuting = operationDBMapper.getSQL(tableName);
String preSQL = sqlExecuting.substring(0,sqlExecuting.indexOf("?"));
String midSQL = sqlExecuting.substring(sqlExecuting.indexOf("?")+1,sqlExecuting.lastIndexOf("?"));
String lastSQL = sqlExecuting.substring(sqlExecuting.lastIndexOf("?")+1);
List<Integer> dataEqmids = operationDBMapper.getDataEqmids(tableName);
List<Integer> syncDevIds = operationDBMapper.getSyncRecordDevIds(tableName);
if (dataEqmids.size() != syncDevIds.size()) {
@ -354,13 +55,24 @@ public class MqttServiceImpl {
}
}
Map<String, Object> devIDLastTimeMap = operationDBMapper.getDeviceIDAndtime(tableName);
for (String deviceID : devIDLastTimeMap.keySet()) {
List<Map<String, Object>> dataOfoneDeviceID = getData(sqlExecuting, deviceID, (String) devIDLastTimeMap.get(deviceID));
Map<Integer,Object> devIDLastTimeMap = new HashMap<>();
List<Map<String, Object>> devIDLastTimeMaps = operationDBMapper.getDeviceIDAndtime(tableName);
for(Map<String,Object> map : devIDLastTimeMaps){
for(String devId : map.keySet()){
devIDLastTimeMap.put(Integer.parseInt((String) map.get("devid_val")), map.get("field_val2"));
}
}
for (int deviceID : devIDLastTimeMap.keySet()) {
String time = devIDLastTimeMap.get(deviceID).toString();
System.out.println(time);
String sql = preSQL+deviceID+midSQL+"'"+time+"'"+lastSQL;
System.out.println("sql :"+sql);
List<Map<String, Object>> dataOfoneDeviceID = operationDBMapper.getData(sql);
String jsonStringData = FormatUtil.mqttFormatTransform(dataOfoneDeviceID, fieldMap);
logger.info("表{}设备{}推送数据:{}", tableName, deviceID, jsonStringData);
if (MqttUtil.publish2MQTT(jsonStringData)) {
updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID));
if (new MqttUtil().publish2MQTT(jsonStringData)) {
operationDBMapper.updateSyncRecordsTable(tableName, deviceID, (String) devIDLastTimeMap.get(deviceID));
logger.info("推送成功");
} else {
logger.info("消息推送失败");

@ -1,91 +0,0 @@
package com.xydl.util;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.*;
import java.util.Properties;
public class DataSourceUtils {
private static final Logger logger = LoggerFactory.getLogger(DataSourceUtils.class);
private static DataSource dataSource;
private static String url;
static {
initDataSource();
}
private static void initDataSource() {
Properties properties = new Properties();
properties.setProperty("driverClassName", "com.mysql.cj.jdbc.Driver");
properties.setProperty("url", "jdbc:mysql://localhost:3306/cac");
properties.setProperty("username", "root");
properties.setProperty("password", "root");
try {
dataSource = DruidDataSourceFactory.createDataSource(properties);
url = ((DruidDataSource) dataSource).getUrl();
} catch (Exception e) {
logger.error("init dataSource exception:", e);
}
logger.info("Data source has been initialized successfully!");
}
// 提供获取连接的方法
public static Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
// 提供关闭资源的方法【connection是归还到连接池】
// 提供关闭资源的方法 【方法重载】3 dql
public static void closeResource(ResultSet resultSet, Statement statement, Connection connection) {
// 关闭结果集
closeResultSet(resultSet);
// 关闭语句执行者
closeStatement(statement);
// 关闭连接
closeConnection(connection);
}
// 提供关闭资源的方法 【方法重载】 2 dml
public static void closeResource(Statement statement, Connection connection) {
// 关闭语句执行者
closeStatement(statement);
// 关闭连接
closeConnection(connection);
}
private static void closeConnection(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
logger.error("closeConnection exception", e);
}
}
}
private static void closeStatement(Statement statement) {
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
logger.error("closeStatement exception", e);
}
}
}
private static void closeResultSet(ResultSet resultSet) {
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) {
logger.error("closeResultSet exception", e);
}
}
}
}

@ -38,7 +38,7 @@ public class FormatUtil {
}
mqttRecord.put("AssetCode",record.get("sensorid"));
mqttRecord.put("AttributeList",attribuiteList);
String captureTime = record.get("d_time") !=null ? (String) record.get("d_time") : (String) record.get("capturetime");
String captureTime = record.get("d_time") !=null ? record.get("d_time").toString() : (String) record.get("capturetime").toString();
mqttRecord.put("Timestamp",Timestamp.valueOf(captureTime).getTime());
return mqttRecord;

@ -1,25 +1,49 @@
package com.xydl.util;
import com.xydl.service.impl.MqttServiceImpl;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class MqttUtil {
private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
public static boolean publish2MQTT(String content){
String broker = "tcp://139.196.211.222:10883";
String topic = "mqtt/test";
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.topic}")
private String topic;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.publish.clientid}")
private String publishClientid;
@Value("${mqtt.subscribe.clientid}")
private String subscribeClientid;
@Value("${mqtt.qos}")
private int qos;
public boolean publish2MQTT(String content){
// String broker = "tcp://139.196.211.222:10883";
// String topic = "mqtt/test";
// String content = "Hello MQTT";
int qos = 0;
String username = "test";
String password = "AliOS%1688";
String clientid = "publish_client";
// int qos = 0;
// String username = "test";
// String password = "AliOS%1688";
// String clientid = "publish_client";
MqttClient client;
MqttDeliveryToken token;
try {
client = new MqttClient(broker, clientid, new MemoryPersistence());
client = new MqttClient(broker, publishClientid, new MemoryPersistence());
} catch (MqttException e) {
throw new RuntimeException(e);
}
@ -42,14 +66,14 @@ public class MqttUtil {
token = client.getTopic(topic).publish(message);
token.waitForCompletion();
//打印发送状态
System.out.println("messagei is published completely!" + token.isComplete());
logger.info("messagei is published completely! {}", token.isComplete());
// client.publish(topic, message);
} catch (MqttException e) {
throw new RuntimeException(e);
}
System.out.println("Message published");
System.out.println("topic: " + topic);
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " message content推送: " + content);
logger.info("Message published");
logger.info("topic: {}", topic);
logger.info("{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " message content推送: " + content);
// disconnect
try {
client.disconnect();
@ -66,15 +90,16 @@ public class MqttUtil {
return token.isComplete();
}
public static void subScribeMQTT(){
String broker = "tcp://139.196.211.222:10883";
String topic = "mqtt/test";
String username = "test";
String password = "AliOS%1688";
String clientid = "subscribe_client";
int qos = 0;
public void subScribeMQTT(){
// String broker = "tcp://139.196.211.222:10883";
// String topic = "mqtt/test";
// String username = "test";
// String password = "AliOS%1688";
// String clientid = "subscribe_client";
// int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttClient client = new MqttClient(broker, subscribeClientid, new MemoryPersistence());
// connect options
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
@ -84,16 +109,16 @@ public class MqttUtil {
// setup callback
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
logger.info("connectionLost:{}", cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+" message content接收: " + new String(message.getPayload()));
logger.info("topic:{} ", topic);
logger.info("Qos:{} ", message.getQos());
logger.info("{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+" message content接收: " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
logger.info("deliveryComplete---------{}", token.isComplete());
}
});
client.connect(options);

@ -9,11 +9,12 @@ import java.util.Date;
public class Subscribe {
private static final Logger logger = LoggerFactory.getLogger(MqttServiceImpl.class);
private static Subscribe single = new Subscribe();
private Subscribe(){
logger.info("开始订阅===subScribe执行一次==={}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
MqttUtil.subScribeMQTT();
new MqttUtil().subScribeMQTT();
}
public static Subscribe getInstance(){

@ -30,4 +30,16 @@ mybatis:
#指定mybatis配置文件的位置
#config-location: classpath:mybatis/mybatis-config.xml
#指定映射文件的位置
mapper-locations: classpath:mybatis/mapper/*.xml
mapper-locations: classpath:mybatis/mapper/*.xml
mqtt:
publish:
clientid: publish_client
subscribe:
clientid: subscribe_client
broker: tcp://139.196.211.222:10883
topic: mqtt/test
username: test
password: AliOS%1688
qos: 0

@ -31,7 +31,7 @@
</select>
<select id="getSQL" parameterType="String" resultType="String">
select sql from sync_tables_info where table_name = #{tableName}
select `sql` from sync_tables_info where table_name = #{tableName}
</select>
<select id="getDeviceIDAndtime" parameterType="String" resultType="Map">
@ -42,6 +42,14 @@
insert into sync_records (client_id,table_name,devid_val,field_val2) values (#{clientId},#{tableName},#{devId},#{earliestTime})
</insert>
<select id="getData" parameterType="String" resultType="Map">
${sql}
</select>
<update id="updateSyncRecordsTable" parameterType="String" >
update sync_records set field_val2 = #{time} where table_name = #{tableName} and devid_val = #{deviceID}
</update>

Loading…
Cancel
Save