feat: 增加基础mqtt收发功能
parent
ec59f62a49
commit
a486aa1668
@ -0,0 +1,43 @@
|
||||
package com.shxy.xymanager_framework.config;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class MqttConfig {
|
||||
|
||||
@Value("${mqtt.broker}")
|
||||
private String broker;
|
||||
|
||||
@Value("${mqtt.clientId}")
|
||||
private String clientId;
|
||||
|
||||
@Value("${mqtt.username}")
|
||||
private String username;
|
||||
|
||||
@Value("${mqtt.password}")
|
||||
private String password;
|
||||
|
||||
@Bean
|
||||
public MqttConnectOptions mqttConnectOptions() {
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setServerURIs(new String[]{broker});
|
||||
options.setUserName(username);
|
||||
options.setPassword(password.toCharArray());
|
||||
options.setAutomaticReconnect(true);
|
||||
options.setCleanSession(true);
|
||||
return options;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MqttClient mqttClient(MqttConnectOptions options) throws MqttException {
|
||||
MqttClient client = new MqttClient(broker, clientId);
|
||||
client.connect(options);
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,26 @@
|
||||
package com.shxy.xymanager_framework.mqtt;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Service
|
||||
public class MqttPublisherService {
|
||||
|
||||
@Value("${mqtt.cmdtopic}")
|
||||
private String cmdtopic;
|
||||
|
||||
@Resource
|
||||
private MqttClient mqttClient;
|
||||
|
||||
public void publish(String payload) throws Exception {
|
||||
MqttMessage message = new MqttMessage(payload.getBytes());
|
||||
message.setQos(1);
|
||||
message.setRetained(true);
|
||||
mqttClient.publish(cmdtopic, message);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,35 @@
|
||||
package com.shxy.xymanager_framework.mqtt;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class MqttSubscriber implements IMqttMessageListener {
|
||||
|
||||
@Autowired
|
||||
private MqttClient mqttClient;
|
||||
|
||||
@Value("${mqtt.datatopic}")
|
||||
private String datatopic;
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Exception {
|
||||
// 订阅默认主题
|
||||
mqttClient.subscribe(datatopic, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||
String payload = new String(message.getPayload());
|
||||
log.info("Received message [%s]: %s%n", topic, payload);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue