diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java new file mode 100644 index 0000000..974f722 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java @@ -0,0 +1,46 @@ +package com.fuyuanshen.modules.security.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fuyuanshen.annotation.rest.AnonymousGetMapping; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +@RestController +@RequestMapping("/api/device") +@Slf4j +public class DeviceDataController { + + @Autowired + private MqttPublisher mqttPublisher; + +// @PostMapping("/{deviceId}/command") + @AnonymousGetMapping(value = "/test/command") + public ResponseEntity sendCommand() { + + try { + // 将命令转换为JSON格式 + // 生成模拟设备数据 + Map payload = new HashMap<>(); + payload.put("deviceId", 1); + payload.put("temperature", 20 + new Random().nextInt(10)); + payload.put("humidity", 40 + new Random().nextInt(20)); + payload.put("status", "online"); + payload.put("timestamp", System.currentTimeMillis()); + + // 发布设备数据到MQTT + String jsonPayload = new ObjectMapper().writeValueAsString(payload); + mqttPublisher.publish("device/data/" + 1, jsonPayload); + return ResponseEntity.ok("命令已发送"); + } catch (Exception e) { + log.error("发送设备命令失败: {}", e.getMessage(), e); + return ResponseEntity.status(500).body("发送命令失败"); + } + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java new file mode 100644 index 0000000..25efa35 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.modules.security.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class DeviceDataService { + + + @ServiceActivator(inputChannel = "mqttInputChannel") + public void handleDeviceData(Message message) { + String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); + String payload = message.getPayload().toString(); + + // 解析设备数据 + if (topic.startsWith("device/data/")) { + log.info("Received device data device/data/: {} {}", topic, payload); + } + } + +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java new file mode 100644 index 0000000..f3fef66 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java @@ -0,0 +1,48 @@ +package com.fuyuanshen.modules.security.config; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Configuration +public class MqttConfig { + + @Value("${mqtt.host}") + private String host; + + @Value("${mqtt.clientId}") + private String clientId; + + @Value("${mqtt.username}") + private String username; + + @Value("${mqtt.password}") + private String password; + + @Value("${mqtt.defaultTopic}") + private String defaultTopic; + + // 配置MQTT客户端工厂 + @Bean + public MqttConnectOptions getMqttConnectOptions() { + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(username); + mqttConnectOptions.setPassword(password.toCharArray()); + mqttConnectOptions.setServerURIs(new String[]{host}); + mqttConnectOptions.setKeepAliveInterval(60); + return mqttConnectOptions; + } + + // 配置MQTT客户端 + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(getMqttConnectOptions()); + return factory; + } + + // 其他配置... +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java new file mode 100644 index 0000000..380a93b --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java @@ -0,0 +1,33 @@ +package com.fuyuanshen.modules.security.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class MqttPublisher { + + private final MqttPahoClientFactory mqttClientFactory; + private final String clientId; + + public MqttPublisher(MqttPahoClientFactory mqttClientFactory, + @Value("${mqtt.clientId}") String clientId) { + this.mqttClientFactory = mqttClientFactory; + this.clientId = clientId + "-publisher"; + } + + public void publish(String topic, String payload) { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( + clientId, mqttClientFactory); + messageHandler.setAsync(true); + messageHandler.setDefaultTopic(topic); + messageHandler.start(); + messageHandler.setConverter(new DefaultPahoMessageConverter()); + messageHandler.handleMessage(new GenericMessage<>(payload)); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java new file mode 100644 index 0000000..6c53514 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java @@ -0,0 +1,47 @@ +package com.fuyuanshen.modules.security.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; + +@Configuration +public class MqttSubscriberConfig { + + @Bean + public MessageChannel mqttInputChannel() { + return new DirectChannel(); + } + + @Bean + public MessageProducer inbound(MqttPahoClientFactory mqttClientFactory) { + + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter( + "subscriberClient", + mqttClientFactory, + "test/topic/#","device/data/#"); // 订阅主题通配符 + adapter.setCompletionTimeout(5000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setQos(1); + adapter.setOutputChannel(mqttInputChannel()); + return adapter; + } + + + +/* @Bean + @ServiceActivator(inputChannel = "mqttInputChannel") + public MessageHandler handler() { + return message -> { + String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); + String payload = message.getPayload().toString(); + System.out.println("收到MQTT消息 - 主题: " + topic + ", 内容: " + payload); + // 处理接收到的消息 + }; + }*/ +} \ No newline at end of file diff --git a/fys-system/src/main/resources/config/application-dev.yml b/fys-system/src/main/resources/config/application-dev.yml index 177945c..856371e 100644 --- a/fys-system/src/main/resources/config/application-dev.yml +++ b/fys-system/src/main/resources/config/application-dev.yml @@ -4,9 +4,9 @@ spring: druid: db-type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.p6spy.engine.spy.P6SpyDriver - url: jdbc:p6spy:mysql://192.168.2.23:3306/eladmin?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false + url: jdbc:p6spy:mysql://120.79.224.186:3366/eladmin?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false username: root - password: root + password: 1fys@QWER.. # 初始连接数,建议设置为与最小空闲连接数相同 initial-size: 20 # 最小空闲连接数,保持足够的空闲连接以应对请求 @@ -53,10 +53,10 @@ spring: redis: #数据库索引 - database: ${REDIS_DB:2} - host: ${REDIS_HOST:123.207.99.140} - port: ${REDIS_PORT:6379} - password: ${REDIS_PWD:ccxx11234} + database: ${REDIS_DB:0} + host: ${REDIS_HOST:120.79.224.186} + port: ${REDIS_PORT:26379} + password: ${REDIS_PWD:1fys@QWER..} #连接超时时间 timeout: 5000 # 连接池配置 @@ -149,3 +149,10 @@ file: logging: level: com.fuyuanshen: debug +# MQTT配置 +mqtt: + host: tcp://47.107.152.87:1883 + clientId: fsy-admin-mqtt-client + username: admin + password: fys123456 + defaultTopic: test/topic \ No newline at end of file diff --git a/pom.xml b/pom.xml index 25ce3b0..941b33a 100644 --- a/pom.xml +++ b/pom.xml @@ -223,6 +223,14 @@ commons-text 1.13.0 + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt +