diff --git a/fys-admin/src/main/resources/application-dev.yml b/fys-admin/src/main/resources/application-dev.yml index feec2c1..811dcfd 100644 --- a/fys-admin/src/main/resources/application-dev.yml +++ b/fys-admin/src/main/resources/application-dev.yml @@ -293,3 +293,12 @@ file: pic: C:\eladmin\file\ #设备图片存储路径 #ip: http://fuyuanshen.com:81/ #服务器地址 ip: https://fuyuanshen.com/ #服务器地址 +# MQTT配置 +mqtt: + username: admin + password: fys123456 + url: tcp://47.107.152.87:1883 + subClientId: fys_subClient_01 + subTopic: worker/alert/#,worker/location/# + pubTopic: worker/location + pubClientId: fys_pubClient_01 \ No newline at end of file diff --git a/fys-modules/fys-system/pom.xml b/fys-modules/fys-system/pom.xml index 1a26ea2..5f839ae 100644 --- a/fys-modules/fys-system/pom.xml +++ b/fys-modules/fys-system/pom.xml @@ -100,6 +100,14 @@ fys-common-sse + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttConfiguration.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttConfiguration.java new file mode 100644 index 0000000..30ffadf --- /dev/null +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttConfiguration.java @@ -0,0 +1,33 @@ +package com.fuyuanshen.system.mqtt.config; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:40 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +public class MqttConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + /** 创建连接工厂 **/ + @Bean + public MqttPahoClientFactory mqttPahoClientFactory(){ + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); //设置新会话 + options.setUserName(mqttPropertiesConfig.getUsername()); + options.setPassword(mqttPropertiesConfig.getPassword().toCharArray()); + options.setServerURIs(new String[]{mqttPropertiesConfig.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} \ No newline at end of file diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttGateway.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttGateway.java new file mode 100644 index 0000000..0f24318 --- /dev/null +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttGateway.java @@ -0,0 +1,17 @@ +package com.fuyuanshen.system.mqtt.config; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 17:06 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload); + public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload ); +} \ No newline at end of file diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttInboundConfiguration.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttInboundConfiguration.java new file mode 100644 index 0000000..80191a1 --- /dev/null +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttInboundConfiguration.java @@ -0,0 +1,60 @@ +package com.fuyuanshen.system.mqtt.config; + + +import com.fuyuanshen.system.mqtt.receiver.ReceiverMessageHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:54 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +public class MqttInboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + @Autowired + private ReceiverMessageHandler receiverMessageHandler; + //消息通道 + @Bean + public MessageChannel messageInboundChannel(){ + return new DirectChannel(); + } + + /** + * 配置入站适配器 + * 作用: 设置订阅主题,以及指定消息的通道 等相关属性 + * */ + @Bean + public MessageProducer messageProducer(){ + MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter( + mqttPropertiesConfig.getUrl(), + mqttPropertiesConfig.getSubClientId(), + mqttPahoClientFactory, + mqttPropertiesConfig.getSubTopic().split(",") + ); + mqttPahoMessageDrivenChannelAdapter.setQos(1); + mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter()); + mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel()); + return mqttPahoMessageDrivenChannelAdapter; + } + /** 指定处理消息来自哪个通道 */ + @Bean + @ServiceActivator(inputChannel = "messageInboundChannel") + public MessageHandler messageHandler(){ + return receiverMessageHandler; + } +} \ No newline at end of file diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttOutboundConfiguration.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttOutboundConfiguration.java new file mode 100644 index 0000000..9029b47 --- /dev/null +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttOutboundConfiguration.java @@ -0,0 +1,50 @@ +package com.fuyuanshen.system.mqtt.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 15:46 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +@Slf4j +public class MqttOutboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + + // 消息通道 + @Bean + public MessageChannel mqttOutboundChannel(){ + return new DirectChannel(); + } + + + /** 配置出站消息处理器 */ + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") // 指定处理器针对哪个通道的消息进行处理 + public MessageHandler mqttOutboundMessageHandler(){ + MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler( + mqttPropertiesConfig.getUrl(), + mqttPropertiesConfig.getPubClientId(), + mqttPahoClientFactory + ); + mqttPahoMessageHandler.setDefaultQos(1); + mqttPahoMessageHandler.setDefaultTopic("worker/location"); + mqttPahoMessageHandler.setAsync(true); + return mqttPahoMessageHandler; + } + +} \ No newline at end of file diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttPropertiesConfig.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttPropertiesConfig.java new file mode 100644 index 0000000..9ae3374 --- /dev/null +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttPropertiesConfig.java @@ -0,0 +1,24 @@ +package com.fuyuanshen.system.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:32 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Data +@ConfigurationProperties(prefix = "mqtt") +@Component +public class MqttPropertiesConfig { + private String username; + private String password; + private String url; + private String subClientId; + private String subTopic; + private String pubClientId; + private String pubTopic; +} \ No newline at end of file diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/DeviceDataController.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/DeviceDataController.java new file mode 100644 index 0000000..e879402 --- /dev/null +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/DeviceDataController.java @@ -0,0 +1,23 @@ +package com.fuyuanshen.system.mqtt.publish; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/") +@Slf4j +public class DeviceDataController { + + @Autowired + private MqttClientTest mqttClientTest; + +// @PostMapping("/{deviceId}/command") + public ResponseEntity sendCommand() { + + mqttClientTest.sendMsg(); + return ResponseEntity.ok("success"); + } +} \ No newline at end of file diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttClientTest.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttClientTest.java new file mode 100644 index 0000000..2332809 --- /dev/null +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttClientTest.java @@ -0,0 +1,22 @@ +package com.fuyuanshen.system.mqtt.publish; + + +import com.fuyuanshen.system.mqtt.config.MqttGateway; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class MqttClientTest { + @Autowired + private MqttGateway mqttGateway; + + public void sendMsg() { + mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot"); + log.info("message is send"); + + mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2"); + log.info("message is send2"); + } +} \ No newline at end of file diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttMessageSender.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttMessageSender.java new file mode 100644 index 0000000..05bf77e --- /dev/null +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttMessageSender.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.system.mqtt.publish; + +import com.fuyuanshen.system.mqtt.config.MqttGateway; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 16:16 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Service +public class MqttMessageSender { + @Autowired + private MqttGateway mqttGateway; + public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, String payload) { + mqttGateway.sendMsgToMqtt(topic,payload); + } + public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload) { + mqttGateway.sendMsgToMqtt(topic,qos,payload); + } +} \ No newline at end of file diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/receiver/ReceiverMessageHandler.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/receiver/ReceiverMessageHandler.java new file mode 100644 index 0000000..9b8a9f6 --- /dev/null +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/receiver/ReceiverMessageHandler.java @@ -0,0 +1,31 @@ +package com.fuyuanshen.system.mqtt.receiver; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Service; + +import java.util.Objects; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 15:24 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Service +@Slf4j +public class ReceiverMessageHandler implements MessageHandler { + @Override + public void handleMessage(Message message) throws MessagingException{ + Object payload = message.getPayload(); + MessageHeaders headers = message.getHeaders(); + String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); + String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); + log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}" + ,payload,receivedTopic,receivedQos,timestamp); + } +} \ No newline at end of file