From c0c33f6c2e778c3efa0f77088303d0f9491e18a5 Mon Sep 17 00:00:00 2001 From: chenyouting <514333061@qq.com> Date: Thu, 26 Jun 2025 17:14:34 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E9=9B=86=E6=88=90MQTT?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../security/config/DeviceDataController.java | 46 ++++++++++++++++++ .../security/config/DeviceDataService.java | 25 ++++++++++ .../modules/security/config/MqttConfig.java | 48 +++++++++++++++++++ .../security/config/MqttPublisher.java | 33 +++++++++++++ .../security/config/MqttSubscriberConfig.java | 47 ++++++++++++++++++ .../main/resources/config/application-dev.yml | 19 +++++--- pom.xml | 8 ++++ 7 files changed, 220 insertions(+), 6 deletions(-) create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java new file mode 100644 index 0000000..974f722 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java @@ -0,0 +1,46 @@ +package com.fuyuanshen.modules.security.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fuyuanshen.annotation.rest.AnonymousGetMapping; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +@RestController +@RequestMapping("/api/device") +@Slf4j +public class DeviceDataController { + + @Autowired + private MqttPublisher mqttPublisher; + +// @PostMapping("/{deviceId}/command") + @AnonymousGetMapping(value = "/test/command") + public ResponseEntity sendCommand() { + + try { + // 将命令转换为JSON格式 + // 生成模拟设备数据 + Map payload = new HashMap<>(); + payload.put("deviceId", 1); + payload.put("temperature", 20 + new Random().nextInt(10)); + payload.put("humidity", 40 + new Random().nextInt(20)); + payload.put("status", "online"); + payload.put("timestamp", System.currentTimeMillis()); + + // 发布设备数据到MQTT + String jsonPayload = new ObjectMapper().writeValueAsString(payload); + mqttPublisher.publish("device/data/" + 1, jsonPayload); + return ResponseEntity.ok("命令已发送"); + } catch (Exception e) { + log.error("发送设备命令失败: {}", e.getMessage(), e); + return ResponseEntity.status(500).body("发送命令失败"); + } + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java new file mode 100644 index 0000000..25efa35 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.modules.security.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class DeviceDataService { + + + @ServiceActivator(inputChannel = "mqttInputChannel") + public void handleDeviceData(Message message) { + String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); + String payload = message.getPayload().toString(); + + // 解析设备数据 + if (topic.startsWith("device/data/")) { + log.info("Received device data device/data/: {} {}", topic, payload); + } + } + +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java new file mode 100644 index 0000000..f3fef66 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java @@ -0,0 +1,48 @@ +package com.fuyuanshen.modules.security.config; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +@Configuration +public class MqttConfig { + + @Value("${mqtt.host}") + private String host; + + @Value("${mqtt.clientId}") + private String clientId; + + @Value("${mqtt.username}") + private String username; + + @Value("${mqtt.password}") + private String password; + + @Value("${mqtt.defaultTopic}") + private String defaultTopic; + + // 配置MQTT客户端工厂 + @Bean + public MqttConnectOptions getMqttConnectOptions() { + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(username); + mqttConnectOptions.setPassword(password.toCharArray()); + mqttConnectOptions.setServerURIs(new String[]{host}); + mqttConnectOptions.setKeepAliveInterval(60); + return mqttConnectOptions; + } + + // 配置MQTT客户端 + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(getMqttConnectOptions()); + return factory; + } + + // 其他配置... +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java new file mode 100644 index 0000000..380a93b --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java @@ -0,0 +1,33 @@ +package com.fuyuanshen.modules.security.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class MqttPublisher { + + private final MqttPahoClientFactory mqttClientFactory; + private final String clientId; + + public MqttPublisher(MqttPahoClientFactory mqttClientFactory, + @Value("${mqtt.clientId}") String clientId) { + this.mqttClientFactory = mqttClientFactory; + this.clientId = clientId + "-publisher"; + } + + public void publish(String topic, String payload) { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( + clientId, mqttClientFactory); + messageHandler.setAsync(true); + messageHandler.setDefaultTopic(topic); + messageHandler.start(); + messageHandler.setConverter(new DefaultPahoMessageConverter()); + messageHandler.handleMessage(new GenericMessage<>(payload)); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java new file mode 100644 index 0000000..6c53514 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java @@ -0,0 +1,47 @@ +package com.fuyuanshen.modules.security.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; + +@Configuration +public class MqttSubscriberConfig { + + @Bean + public MessageChannel mqttInputChannel() { + return new DirectChannel(); + } + + @Bean + public MessageProducer inbound(MqttPahoClientFactory mqttClientFactory) { + + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter( + "subscriberClient", + mqttClientFactory, + "test/topic/#","device/data/#"); // 订阅主题通配符 + adapter.setCompletionTimeout(5000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setQos(1); + adapter.setOutputChannel(mqttInputChannel()); + return adapter; + } + + + +/* @Bean + @ServiceActivator(inputChannel = "mqttInputChannel") + public MessageHandler handler() { + return message -> { + String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); + String payload = message.getPayload().toString(); + System.out.println("收到MQTT消息 - 主题: " + topic + ", 内容: " + payload); + // 处理接收到的消息 + }; + }*/ +} \ No newline at end of file diff --git a/fys-system/src/main/resources/config/application-dev.yml b/fys-system/src/main/resources/config/application-dev.yml index 177945c..856371e 100644 --- a/fys-system/src/main/resources/config/application-dev.yml +++ b/fys-system/src/main/resources/config/application-dev.yml @@ -4,9 +4,9 @@ spring: druid: db-type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.p6spy.engine.spy.P6SpyDriver - url: jdbc:p6spy:mysql://192.168.2.23:3306/eladmin?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false + url: jdbc:p6spy:mysql://120.79.224.186:3366/eladmin?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false username: root - password: root + password: 1fys@QWER.. # 初始连接数,建议设置为与最小空闲连接数相同 initial-size: 20 # 最小空闲连接数,保持足够的空闲连接以应对请求 @@ -53,10 +53,10 @@ spring: redis: #数据库索引 - database: ${REDIS_DB:2} - host: ${REDIS_HOST:123.207.99.140} - port: ${REDIS_PORT:6379} - password: ${REDIS_PWD:ccxx11234} + database: ${REDIS_DB:0} + host: ${REDIS_HOST:120.79.224.186} + port: ${REDIS_PORT:26379} + password: ${REDIS_PWD:1fys@QWER..} #连接超时时间 timeout: 5000 # 连接池配置 @@ -149,3 +149,10 @@ file: logging: level: com.fuyuanshen: debug +# MQTT配置 +mqtt: + host: tcp://47.107.152.87:1883 + clientId: fsy-admin-mqtt-client + username: admin + password: fys123456 + defaultTopic: test/topic \ No newline at end of file diff --git a/pom.xml b/pom.xml index 25ce3b0..941b33a 100644 --- a/pom.xml +++ b/pom.xml @@ -223,6 +223,14 @@ commons-text 1.13.0 + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + From ed180d6f187ad10f983d042d364f32cb2bbbcfa8 Mon Sep 17 00:00:00 2001 From: chenyouting <514333061@qq.com> Date: Fri, 27 Jun 2025 10:30:27 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E9=9B=86=E6=88=90MQTT,=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/config/MqttConfiguration.java | 33 ++++++++++ .../modules/mqtt/config/MqttGateway.java | 17 ++++++ .../mqtt/config/MqttInboundConfiguration.java | 60 +++++++++++++++++++ .../config/MqttOutboundConfiguration.java | 50 ++++++++++++++++ .../mqtt/config/MqttPropertiesConfig.java | 24 ++++++++ .../mqtt/publish/DeviceDataController.java | 25 ++++++++ .../modules/mqtt/publish/MqttClientTest.java | 22 +++++++ .../mqtt/publish/MqttMessageSender.java | 25 ++++++++ .../mqtt/receiver/ReceiverMessageHandler.java | 31 ++++++++++ .../security/config/DeviceDataController.java | 46 -------------- .../modules/security/config/MqttConfig.java | 48 --------------- .../security/config/MqttPublisher.java | 33 ---------- .../security/config/MqttSubscriberConfig.java | 47 --------------- .../main/resources/config/application-dev.yml | 8 ++- 14 files changed, 292 insertions(+), 177 deletions(-) create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java delete mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java delete mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java delete mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java delete mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java new file mode 100644 index 0000000..2961b68 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java @@ -0,0 +1,33 @@ +package com.fuyuanshen.modules.mqtt.config; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:40 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +public class MqttConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + /** 创建连接工厂 **/ + @Bean + public MqttPahoClientFactory mqttPahoClientFactory(){ + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); //设置新会话 + options.setUserName(mqttPropertiesConfig.getUsername()); + options.setPassword(mqttPropertiesConfig.getPassword().toCharArray()); + options.setServerURIs(new String[]{mqttPropertiesConfig.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java new file mode 100644 index 0000000..9d50eb5 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java @@ -0,0 +1,17 @@ +package com.fuyuanshen.modules.mqtt.config; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 17:06 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload); + public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload ); +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java new file mode 100644 index 0000000..191f31c --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java @@ -0,0 +1,60 @@ +package com.fuyuanshen.modules.mqtt.config; + + +import com.fuyuanshen.modules.mqtt.receiver.ReceiverMessageHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:54 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +public class MqttInboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + @Autowired + private ReceiverMessageHandler receiverMessageHandler; + //消息通道 + @Bean + public MessageChannel messageInboundChannel(){ + return new DirectChannel(); + } + + /** + * 配置入站适配器 + * 作用: 设置订阅主题,以及指定消息的通道 等相关属性 + * */ + @Bean + public MessageProducer messageProducer(){ + MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter( + mqttPropertiesConfig.getUrl(), + mqttPropertiesConfig.getSubClientId(), + mqttPahoClientFactory, + mqttPropertiesConfig.getSubTopic().split(",") + ); + mqttPahoMessageDrivenChannelAdapter.setQos(1); + mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter()); + mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel()); + return mqttPahoMessageDrivenChannelAdapter; + } + /** 指定处理消息来自哪个通道 */ + @Bean + @ServiceActivator(inputChannel = "messageInboundChannel") + public MessageHandler messageHandler(){ + return receiverMessageHandler; + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java new file mode 100644 index 0000000..23f4ad2 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java @@ -0,0 +1,50 @@ +package com.fuyuanshen.modules.mqtt.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 15:46 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +@Slf4j +public class MqttOutboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + + // 消息通道 + @Bean + public MessageChannel mqttOutboundChannel(){ + return new DirectChannel(); + } + + + /** 配置出站消息处理器 */ + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") // 指定处理器针对哪个通道的消息进行处理 + public MessageHandler mqttOutboundMessageHandler(){ + MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler( + mqttPropertiesConfig.getUrl(), + mqttPropertiesConfig.getPubClientId(), + mqttPahoClientFactory + ); + mqttPahoMessageHandler.setDefaultQos(1); + mqttPahoMessageHandler.setDefaultTopic("worker/location"); + mqttPahoMessageHandler.setAsync(true); + return mqttPahoMessageHandler; + } + +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java new file mode 100644 index 0000000..0cf5077 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java @@ -0,0 +1,24 @@ +package com.fuyuanshen.modules.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:32 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Data +@ConfigurationProperties(prefix = "mqtt") +@Component +public class MqttPropertiesConfig { + private String username; + private String password; + private String url; + private String subClientId; + private String subTopic; + private String pubClientId; + private String pubTopic; +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java new file mode 100644 index 0000000..2b0c959 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.modules.mqtt.publish; + +import com.fuyuanshen.annotation.rest.AnonymousGetMapping; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/device") +@Slf4j +public class DeviceDataController { + + @Autowired + private MqttClientTest mqttClientTest; + +// @PostMapping("/{deviceId}/command") + @AnonymousGetMapping(value = "/test/command") + public ResponseEntity sendCommand() { + + mqttClientTest.sendMsg(); + return ResponseEntity.ok("success"); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java new file mode 100644 index 0000000..03ee1a1 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java @@ -0,0 +1,22 @@ +package com.fuyuanshen.modules.mqtt.publish; + + +import com.fuyuanshen.modules.mqtt.config.MqttGateway; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class MqttClientTest { + @Autowired + private MqttGateway mqttGateway; + + public void sendMsg() { + mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot"); + log.info("message is send"); + + mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2"); + log.info("message is send2"); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java new file mode 100644 index 0000000..7760671 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.modules.mqtt.publish; + +import com.fuyuanshen.modules.mqtt.config.MqttGateway; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 16:16 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Service +public class MqttMessageSender { + @Autowired + private MqttGateway mqttGateway; + public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, String payload) { + mqttGateway.sendMsgToMqtt(topic,payload); + } + public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload) { + mqttGateway.sendMsgToMqtt(topic,qos,payload); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java new file mode 100644 index 0000000..65f2a7a --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java @@ -0,0 +1,31 @@ +package com.fuyuanshen.modules.mqtt.receiver; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Service; + +import java.util.Objects; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 15:24 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Service +@Slf4j +public class ReceiverMessageHandler implements MessageHandler { + @Override + public void handleMessage(Message message) throws MessagingException{ + Object payload = message.getPayload(); + MessageHeaders headers = message.getHeaders(); + String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); + String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); + log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}" + ,payload,receivedTopic,receivedQos,timestamp); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java deleted file mode 100644 index 974f722..0000000 --- a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.fuyuanshen.modules.security.config; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fuyuanshen.annotation.rest.AnonymousGetMapping; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -@RestController -@RequestMapping("/api/device") -@Slf4j -public class DeviceDataController { - - @Autowired - private MqttPublisher mqttPublisher; - -// @PostMapping("/{deviceId}/command") - @AnonymousGetMapping(value = "/test/command") - public ResponseEntity sendCommand() { - - try { - // 将命令转换为JSON格式 - // 生成模拟设备数据 - Map payload = new HashMap<>(); - payload.put("deviceId", 1); - payload.put("temperature", 20 + new Random().nextInt(10)); - payload.put("humidity", 40 + new Random().nextInt(20)); - payload.put("status", "online"); - payload.put("timestamp", System.currentTimeMillis()); - - // 发布设备数据到MQTT - String jsonPayload = new ObjectMapper().writeValueAsString(payload); - mqttPublisher.publish("device/data/" + 1, jsonPayload); - return ResponseEntity.ok("命令已发送"); - } catch (Exception e) { - log.error("发送设备命令失败: {}", e.getMessage(), e); - return ResponseEntity.status(500).body("发送命令失败"); - } - } -} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java deleted file mode 100644 index f3fef66..0000000 --- a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.fuyuanshen.modules.security.config; - -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; - -@Configuration -public class MqttConfig { - - @Value("${mqtt.host}") - private String host; - - @Value("${mqtt.clientId}") - private String clientId; - - @Value("${mqtt.username}") - private String username; - - @Value("${mqtt.password}") - private String password; - - @Value("${mqtt.defaultTopic}") - private String defaultTopic; - - // 配置MQTT客户端工厂 - @Bean - public MqttConnectOptions getMqttConnectOptions() { - MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); - mqttConnectOptions.setUserName(username); - mqttConnectOptions.setPassword(password.toCharArray()); - mqttConnectOptions.setServerURIs(new String[]{host}); - mqttConnectOptions.setKeepAliveInterval(60); - return mqttConnectOptions; - } - - // 配置MQTT客户端 - @Bean - public MqttPahoClientFactory mqttClientFactory() { - DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - factory.setConnectionOptions(getMqttConnectOptions()); - return factory; - } - - // 其他配置... -} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java deleted file mode 100644 index 380a93b..0000000 --- a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.fuyuanshen.modules.security.config; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.support.GenericMessage; -import org.springframework.stereotype.Service; - -@Service -@Slf4j -public class MqttPublisher { - - private final MqttPahoClientFactory mqttClientFactory; - private final String clientId; - - public MqttPublisher(MqttPahoClientFactory mqttClientFactory, - @Value("${mqtt.clientId}") String clientId) { - this.mqttClientFactory = mqttClientFactory; - this.clientId = clientId + "-publisher"; - } - - public void publish(String topic, String payload) { - MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( - clientId, mqttClientFactory); - messageHandler.setAsync(true); - messageHandler.setDefaultTopic(topic); - messageHandler.start(); - messageHandler.setConverter(new DefaultPahoMessageConverter()); - messageHandler.handleMessage(new GenericMessage<>(payload)); - } -} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java deleted file mode 100644 index 6c53514..0000000 --- a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.fuyuanshen.modules.security.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.core.MessageProducer; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.MessageChannel; - -@Configuration -public class MqttSubscriberConfig { - - @Bean - public MessageChannel mqttInputChannel() { - return new DirectChannel(); - } - - @Bean - public MessageProducer inbound(MqttPahoClientFactory mqttClientFactory) { - - MqttPahoMessageDrivenChannelAdapter adapter = - new MqttPahoMessageDrivenChannelAdapter( - "subscriberClient", - mqttClientFactory, - "test/topic/#","device/data/#"); // 订阅主题通配符 - adapter.setCompletionTimeout(5000); - adapter.setConverter(new DefaultPahoMessageConverter()); - adapter.setQos(1); - adapter.setOutputChannel(mqttInputChannel()); - return adapter; - } - - - -/* @Bean - @ServiceActivator(inputChannel = "mqttInputChannel") - public MessageHandler handler() { - return message -> { - String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); - String payload = message.getPayload().toString(); - System.out.println("收到MQTT消息 - 主题: " + topic + ", 内容: " + payload); - // 处理接收到的消息 - }; - }*/ -} \ No newline at end of file diff --git a/fys-system/src/main/resources/config/application-dev.yml b/fys-system/src/main/resources/config/application-dev.yml index 856371e..e6384a8 100644 --- a/fys-system/src/main/resources/config/application-dev.yml +++ b/fys-system/src/main/resources/config/application-dev.yml @@ -151,8 +151,10 @@ logging: com.fuyuanshen: debug # MQTT配置 mqtt: - host: tcp://47.107.152.87:1883 - clientId: fsy-admin-mqtt-client username: admin password: fys123456 - defaultTopic: test/topic \ No newline at end of file + url: tcp://127.0.0.1:1883 + subClientId: wuLang_subClient_01 + subTopic: worker/alert/#,worker/location/# + pubTopic: worker/location + pubClientId: wuLang_pubClient_01 \ No newline at end of file