集成MQTT
This commit is contained in:
@ -0,0 +1,46 @@
|
|||||||
|
package com.fuyuanshen.modules.security.config;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fuyuanshen.annotation.rest.AnonymousGetMapping;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/api/device")
|
||||||
|
@Slf4j
|
||||||
|
public class DeviceDataController {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MqttPublisher mqttPublisher;
|
||||||
|
|
||||||
|
// @PostMapping("/{deviceId}/command")
|
||||||
|
@AnonymousGetMapping(value = "/test/command")
|
||||||
|
public ResponseEntity<String> sendCommand() {
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 将命令转换为JSON格式
|
||||||
|
// 生成模拟设备数据
|
||||||
|
Map<String, Object> payload = new HashMap<>();
|
||||||
|
payload.put("deviceId", 1);
|
||||||
|
payload.put("temperature", 20 + new Random().nextInt(10));
|
||||||
|
payload.put("humidity", 40 + new Random().nextInt(20));
|
||||||
|
payload.put("status", "online");
|
||||||
|
payload.put("timestamp", System.currentTimeMillis());
|
||||||
|
|
||||||
|
// 发布设备数据到MQTT
|
||||||
|
String jsonPayload = new ObjectMapper().writeValueAsString(payload);
|
||||||
|
mqttPublisher.publish("device/data/" + 1, jsonPayload);
|
||||||
|
return ResponseEntity.ok("命令已发送");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("发送设备命令失败: {}", e.getMessage(), e);
|
||||||
|
return ResponseEntity.status(500).body("发送命令失败");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package com.fuyuanshen.modules.security.config;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.integration.annotation.ServiceActivator;
|
||||||
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class DeviceDataService {
|
||||||
|
|
||||||
|
|
||||||
|
@ServiceActivator(inputChannel = "mqttInputChannel")
|
||||||
|
public void handleDeviceData(Message<?> message) {
|
||||||
|
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
|
||||||
|
String payload = message.getPayload().toString();
|
||||||
|
|
||||||
|
// 解析设备数据
|
||||||
|
if (topic.startsWith("device/data/")) {
|
||||||
|
log.info("Received device data device/data/: {} {}", topic, payload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,48 @@
|
|||||||
|
package com.fuyuanshen.modules.security.config;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||||
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class MqttConfig {
|
||||||
|
|
||||||
|
@Value("${mqtt.host}")
|
||||||
|
private String host;
|
||||||
|
|
||||||
|
@Value("${mqtt.clientId}")
|
||||||
|
private String clientId;
|
||||||
|
|
||||||
|
@Value("${mqtt.username}")
|
||||||
|
private String username;
|
||||||
|
|
||||||
|
@Value("${mqtt.password}")
|
||||||
|
private String password;
|
||||||
|
|
||||||
|
@Value("${mqtt.defaultTopic}")
|
||||||
|
private String defaultTopic;
|
||||||
|
|
||||||
|
// 配置MQTT客户端工厂
|
||||||
|
@Bean
|
||||||
|
public MqttConnectOptions getMqttConnectOptions() {
|
||||||
|
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
|
||||||
|
mqttConnectOptions.setUserName(username);
|
||||||
|
mqttConnectOptions.setPassword(password.toCharArray());
|
||||||
|
mqttConnectOptions.setServerURIs(new String[]{host});
|
||||||
|
mqttConnectOptions.setKeepAliveInterval(60);
|
||||||
|
return mqttConnectOptions;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 配置MQTT客户端
|
||||||
|
@Bean
|
||||||
|
public MqttPahoClientFactory mqttClientFactory() {
|
||||||
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
||||||
|
factory.setConnectionOptions(getMqttConnectOptions());
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 其他配置...
|
||||||
|
}
|
@ -0,0 +1,33 @@
|
|||||||
|
package com.fuyuanshen.modules.security.config;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||||
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
||||||
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
||||||
|
import org.springframework.messaging.support.GenericMessage;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class MqttPublisher {
|
||||||
|
|
||||||
|
private final MqttPahoClientFactory mqttClientFactory;
|
||||||
|
private final String clientId;
|
||||||
|
|
||||||
|
public MqttPublisher(MqttPahoClientFactory mqttClientFactory,
|
||||||
|
@Value("${mqtt.clientId}") String clientId) {
|
||||||
|
this.mqttClientFactory = mqttClientFactory;
|
||||||
|
this.clientId = clientId + "-publisher";
|
||||||
|
}
|
||||||
|
|
||||||
|
public void publish(String topic, String payload) {
|
||||||
|
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
|
||||||
|
clientId, mqttClientFactory);
|
||||||
|
messageHandler.setAsync(true);
|
||||||
|
messageHandler.setDefaultTopic(topic);
|
||||||
|
messageHandler.start();
|
||||||
|
messageHandler.setConverter(new DefaultPahoMessageConverter());
|
||||||
|
messageHandler.handleMessage(new GenericMessage<>(payload));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,47 @@
|
|||||||
|
package com.fuyuanshen.modules.security.config;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.core.MessageProducer;
|
||||||
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||||
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
||||||
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
||||||
|
import org.springframework.messaging.MessageChannel;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class MqttSubscriberConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public MessageChannel mqttInputChannel() {
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public MessageProducer inbound(MqttPahoClientFactory mqttClientFactory) {
|
||||||
|
|
||||||
|
MqttPahoMessageDrivenChannelAdapter adapter =
|
||||||
|
new MqttPahoMessageDrivenChannelAdapter(
|
||||||
|
"subscriberClient",
|
||||||
|
mqttClientFactory,
|
||||||
|
"test/topic/#","device/data/#"); // 订阅主题通配符
|
||||||
|
adapter.setCompletionTimeout(5000);
|
||||||
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
||||||
|
adapter.setQos(1);
|
||||||
|
adapter.setOutputChannel(mqttInputChannel());
|
||||||
|
return adapter;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/* @Bean
|
||||||
|
@ServiceActivator(inputChannel = "mqttInputChannel")
|
||||||
|
public MessageHandler handler() {
|
||||||
|
return message -> {
|
||||||
|
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
|
||||||
|
String payload = message.getPayload().toString();
|
||||||
|
System.out.println("收到MQTT消息 - 主题: " + topic + ", 内容: " + payload);
|
||||||
|
// 处理接收到的消息
|
||||||
|
};
|
||||||
|
}*/
|
||||||
|
}
|
@ -4,9 +4,9 @@ spring:
|
|||||||
druid:
|
druid:
|
||||||
db-type: com.alibaba.druid.pool.DruidDataSource
|
db-type: com.alibaba.druid.pool.DruidDataSource
|
||||||
driverClassName: com.p6spy.engine.spy.P6SpyDriver
|
driverClassName: com.p6spy.engine.spy.P6SpyDriver
|
||||||
url: jdbc:p6spy:mysql://192.168.2.23:3306/eladmin?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
|
url: jdbc:p6spy:mysql://120.79.224.186:3366/eladmin?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
|
||||||
username: root
|
username: root
|
||||||
password: root
|
password: 1fys@QWER..
|
||||||
# 初始连接数,建议设置为与最小空闲连接数相同
|
# 初始连接数,建议设置为与最小空闲连接数相同
|
||||||
initial-size: 20
|
initial-size: 20
|
||||||
# 最小空闲连接数,保持足够的空闲连接以应对请求
|
# 最小空闲连接数,保持足够的空闲连接以应对请求
|
||||||
@ -53,10 +53,10 @@ spring:
|
|||||||
|
|
||||||
redis:
|
redis:
|
||||||
#数据库索引
|
#数据库索引
|
||||||
database: ${REDIS_DB:2}
|
database: ${REDIS_DB:0}
|
||||||
host: ${REDIS_HOST:123.207.99.140}
|
host: ${REDIS_HOST:120.79.224.186}
|
||||||
port: ${REDIS_PORT:6379}
|
port: ${REDIS_PORT:26379}
|
||||||
password: ${REDIS_PWD:ccxx11234}
|
password: ${REDIS_PWD:1fys@QWER..}
|
||||||
#连接超时时间
|
#连接超时时间
|
||||||
timeout: 5000
|
timeout: 5000
|
||||||
# 连接池配置
|
# 连接池配置
|
||||||
@ -149,3 +149,10 @@ file:
|
|||||||
logging:
|
logging:
|
||||||
level:
|
level:
|
||||||
com.fuyuanshen: debug
|
com.fuyuanshen: debug
|
||||||
|
# MQTT配置
|
||||||
|
mqtt:
|
||||||
|
host: tcp://47.107.152.87:1883
|
||||||
|
clientId: fsy-admin-mqtt-client
|
||||||
|
username: admin
|
||||||
|
password: fys123456
|
||||||
|
defaultTopic: test/topic
|
8
pom.xml
8
pom.xml
@ -223,6 +223,14 @@
|
|||||||
<artifactId>commons-text</artifactId>
|
<artifactId>commons-text</artifactId>
|
||||||
<version>1.13.0</version>
|
<version>1.13.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-integration</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.integration</groupId>
|
||||||
|
<artifactId>spring-integration-mqtt</artifactId>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
Reference in New Issue
Block a user