From ed180d6f187ad10f983d042d364f32cb2bbbcfa8 Mon Sep 17 00:00:00 2001 From: chenyouting <514333061@qq.com> Date: Fri, 27 Jun 2025 10:30:27 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E6=88=90MQTT,=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/config/MqttConfiguration.java | 33 ++++++++++ .../modules/mqtt/config/MqttGateway.java | 17 ++++++ .../mqtt/config/MqttInboundConfiguration.java | 60 +++++++++++++++++++ .../config/MqttOutboundConfiguration.java | 50 ++++++++++++++++ .../mqtt/config/MqttPropertiesConfig.java | 24 ++++++++ .../mqtt/publish/DeviceDataController.java | 25 ++++++++ .../modules/mqtt/publish/MqttClientTest.java | 22 +++++++ .../mqtt/publish/MqttMessageSender.java | 25 ++++++++ .../mqtt/receiver/ReceiverMessageHandler.java | 31 ++++++++++ .../security/config/DeviceDataController.java | 46 -------------- .../modules/security/config/MqttConfig.java | 48 --------------- .../security/config/MqttPublisher.java | 33 ---------- .../security/config/MqttSubscriberConfig.java | 47 --------------- .../main/resources/config/application-dev.yml | 8 ++- 14 files changed, 292 insertions(+), 177 deletions(-) create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java create mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java delete mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java delete mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java delete mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java delete mode 100644 fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java new file mode 100644 index 0000000..2961b68 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java @@ -0,0 +1,33 @@ +package com.fuyuanshen.modules.mqtt.config; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:40 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +public class MqttConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + /** 创建连接工厂 **/ + @Bean + public MqttPahoClientFactory mqttPahoClientFactory(){ + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); //设置新会话 + options.setUserName(mqttPropertiesConfig.getUsername()); + options.setPassword(mqttPropertiesConfig.getPassword().toCharArray()); + options.setServerURIs(new String[]{mqttPropertiesConfig.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java new file mode 100644 index 0000000..9d50eb5 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java @@ -0,0 +1,17 @@ +package com.fuyuanshen.modules.mqtt.config; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 17:06 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload); + public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload ); +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java new file mode 100644 index 0000000..191f31c --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java @@ -0,0 +1,60 @@ +package com.fuyuanshen.modules.mqtt.config; + + +import com.fuyuanshen.modules.mqtt.receiver.ReceiverMessageHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:54 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +public class MqttInboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + @Autowired + private ReceiverMessageHandler receiverMessageHandler; + //消息通道 + @Bean + public MessageChannel messageInboundChannel(){ + return new DirectChannel(); + } + + /** + * 配置入站适配器 + * 作用: 设置订阅主题,以及指定消息的通道 等相关属性 + * */ + @Bean + public MessageProducer messageProducer(){ + MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter( + mqttPropertiesConfig.getUrl(), + mqttPropertiesConfig.getSubClientId(), + mqttPahoClientFactory, + mqttPropertiesConfig.getSubTopic().split(",") + ); + mqttPahoMessageDrivenChannelAdapter.setQos(1); + mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter()); + mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel()); + return mqttPahoMessageDrivenChannelAdapter; + } + /** 指定处理消息来自哪个通道 */ + @Bean + @ServiceActivator(inputChannel = "messageInboundChannel") + public MessageHandler messageHandler(){ + return receiverMessageHandler; + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java new file mode 100644 index 0000000..23f4ad2 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java @@ -0,0 +1,50 @@ +package com.fuyuanshen.modules.mqtt.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 15:46 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +@Slf4j +public class MqttOutboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + + // 消息通道 + @Bean + public MessageChannel mqttOutboundChannel(){ + return new DirectChannel(); + } + + + /** 配置出站消息处理器 */ + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") // 指定处理器针对哪个通道的消息进行处理 + public MessageHandler mqttOutboundMessageHandler(){ + MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler( + mqttPropertiesConfig.getUrl(), + mqttPropertiesConfig.getPubClientId(), + mqttPahoClientFactory + ); + mqttPahoMessageHandler.setDefaultQos(1); + mqttPahoMessageHandler.setDefaultTopic("worker/location"); + mqttPahoMessageHandler.setAsync(true); + return mqttPahoMessageHandler; + } + +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java new file mode 100644 index 0000000..0cf5077 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java @@ -0,0 +1,24 @@ +package com.fuyuanshen.modules.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:32 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Data +@ConfigurationProperties(prefix = "mqtt") +@Component +public class MqttPropertiesConfig { + private String username; + private String password; + private String url; + private String subClientId; + private String subTopic; + private String pubClientId; + private String pubTopic; +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java new file mode 100644 index 0000000..2b0c959 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.modules.mqtt.publish; + +import com.fuyuanshen.annotation.rest.AnonymousGetMapping; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/device") +@Slf4j +public class DeviceDataController { + + @Autowired + private MqttClientTest mqttClientTest; + +// @PostMapping("/{deviceId}/command") + @AnonymousGetMapping(value = "/test/command") + public ResponseEntity sendCommand() { + + mqttClientTest.sendMsg(); + return ResponseEntity.ok("success"); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java new file mode 100644 index 0000000..03ee1a1 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java @@ -0,0 +1,22 @@ +package com.fuyuanshen.modules.mqtt.publish; + + +import com.fuyuanshen.modules.mqtt.config.MqttGateway; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class MqttClientTest { + @Autowired + private MqttGateway mqttGateway; + + public void sendMsg() { + mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot"); + log.info("message is send"); + + mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2"); + log.info("message is send2"); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java new file mode 100644 index 0000000..7760671 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.modules.mqtt.publish; + +import com.fuyuanshen.modules.mqtt.config.MqttGateway; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 16:16 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Service +public class MqttMessageSender { + @Autowired + private MqttGateway mqttGateway; + public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, String payload) { + mqttGateway.sendMsgToMqtt(topic,payload); + } + public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload) { + mqttGateway.sendMsgToMqtt(topic,qos,payload); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java new file mode 100644 index 0000000..65f2a7a --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java @@ -0,0 +1,31 @@ +package com.fuyuanshen.modules.mqtt.receiver; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Service; + +import java.util.Objects; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 15:24 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Service +@Slf4j +public class ReceiverMessageHandler implements MessageHandler { + @Override + public void handleMessage(Message message) throws MessagingException{ + Object payload = message.getPayload(); + MessageHeaders headers = message.getHeaders(); + String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); + String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); + log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}" + ,payload,receivedTopic,receivedQos,timestamp); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java deleted file mode 100644 index 974f722..0000000 --- a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataController.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.fuyuanshen.modules.security.config; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fuyuanshen.annotation.rest.AnonymousGetMapping; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -@RestController -@RequestMapping("/api/device") -@Slf4j -public class DeviceDataController { - - @Autowired - private MqttPublisher mqttPublisher; - -// @PostMapping("/{deviceId}/command") - @AnonymousGetMapping(value = "/test/command") - public ResponseEntity sendCommand() { - - try { - // 将命令转换为JSON格式 - // 生成模拟设备数据 - Map payload = new HashMap<>(); - payload.put("deviceId", 1); - payload.put("temperature", 20 + new Random().nextInt(10)); - payload.put("humidity", 40 + new Random().nextInt(20)); - payload.put("status", "online"); - payload.put("timestamp", System.currentTimeMillis()); - - // 发布设备数据到MQTT - String jsonPayload = new ObjectMapper().writeValueAsString(payload); - mqttPublisher.publish("device/data/" + 1, jsonPayload); - return ResponseEntity.ok("命令已发送"); - } catch (Exception e) { - log.error("发送设备命令失败: {}", e.getMessage(), e); - return ResponseEntity.status(500).body("发送命令失败"); - } - } -} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java deleted file mode 100644 index f3fef66..0000000 --- a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttConfig.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.fuyuanshen.modules.security.config; - -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; - -@Configuration -public class MqttConfig { - - @Value("${mqtt.host}") - private String host; - - @Value("${mqtt.clientId}") - private String clientId; - - @Value("${mqtt.username}") - private String username; - - @Value("${mqtt.password}") - private String password; - - @Value("${mqtt.defaultTopic}") - private String defaultTopic; - - // 配置MQTT客户端工厂 - @Bean - public MqttConnectOptions getMqttConnectOptions() { - MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); - mqttConnectOptions.setUserName(username); - mqttConnectOptions.setPassword(password.toCharArray()); - mqttConnectOptions.setServerURIs(new String[]{host}); - mqttConnectOptions.setKeepAliveInterval(60); - return mqttConnectOptions; - } - - // 配置MQTT客户端 - @Bean - public MqttPahoClientFactory mqttClientFactory() { - DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - factory.setConnectionOptions(getMqttConnectOptions()); - return factory; - } - - // 其他配置... -} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java deleted file mode 100644 index 380a93b..0000000 --- a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttPublisher.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.fuyuanshen.modules.security.config; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.support.GenericMessage; -import org.springframework.stereotype.Service; - -@Service -@Slf4j -public class MqttPublisher { - - private final MqttPahoClientFactory mqttClientFactory; - private final String clientId; - - public MqttPublisher(MqttPahoClientFactory mqttClientFactory, - @Value("${mqtt.clientId}") String clientId) { - this.mqttClientFactory = mqttClientFactory; - this.clientId = clientId + "-publisher"; - } - - public void publish(String topic, String payload) { - MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( - clientId, mqttClientFactory); - messageHandler.setAsync(true); - messageHandler.setDefaultTopic(topic); - messageHandler.start(); - messageHandler.setConverter(new DefaultPahoMessageConverter()); - messageHandler.handleMessage(new GenericMessage<>(payload)); - } -} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java deleted file mode 100644 index 6c53514..0000000 --- a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/MqttSubscriberConfig.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.fuyuanshen.modules.security.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.core.MessageProducer; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.MessageChannel; - -@Configuration -public class MqttSubscriberConfig { - - @Bean - public MessageChannel mqttInputChannel() { - return new DirectChannel(); - } - - @Bean - public MessageProducer inbound(MqttPahoClientFactory mqttClientFactory) { - - MqttPahoMessageDrivenChannelAdapter adapter = - new MqttPahoMessageDrivenChannelAdapter( - "subscriberClient", - mqttClientFactory, - "test/topic/#","device/data/#"); // 订阅主题通配符 - adapter.setCompletionTimeout(5000); - adapter.setConverter(new DefaultPahoMessageConverter()); - adapter.setQos(1); - adapter.setOutputChannel(mqttInputChannel()); - return adapter; - } - - - -/* @Bean - @ServiceActivator(inputChannel = "mqttInputChannel") - public MessageHandler handler() { - return message -> { - String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); - String payload = message.getPayload().toString(); - System.out.println("收到MQTT消息 - 主题: " + topic + ", 内容: " + payload); - // 处理接收到的消息 - }; - }*/ -} \ No newline at end of file diff --git a/fys-system/src/main/resources/config/application-dev.yml b/fys-system/src/main/resources/config/application-dev.yml index 856371e..e6384a8 100644 --- a/fys-system/src/main/resources/config/application-dev.yml +++ b/fys-system/src/main/resources/config/application-dev.yml @@ -151,8 +151,10 @@ logging: com.fuyuanshen: debug # MQTT配置 mqtt: - host: tcp://47.107.152.87:1883 - clientId: fsy-admin-mqtt-client username: admin password: fys123456 - defaultTopic: test/topic \ No newline at end of file + url: tcp://127.0.0.1:1883 + subClientId: wuLang_subClient_01 + subTopic: worker/alert/#,worker/location/# + pubTopic: worker/location + pubClientId: wuLang_pubClient_01 \ No newline at end of file