diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java new file mode 100644 index 0000000..2961b68 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttConfiguration.java @@ -0,0 +1,33 @@ +package com.fuyuanshen.modules.mqtt.config; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:40 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +public class MqttConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + /** 创建连接工厂 **/ + @Bean + public MqttPahoClientFactory mqttPahoClientFactory(){ + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); //设置新会话 + options.setUserName(mqttPropertiesConfig.getUsername()); + options.setPassword(mqttPropertiesConfig.getPassword().toCharArray()); + options.setServerURIs(new String[]{mqttPropertiesConfig.getUrl()}); + factory.setConnectionOptions(options); + return factory; + } + +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java new file mode 100644 index 0000000..9d50eb5 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttGateway.java @@ -0,0 +1,17 @@ +package com.fuyuanshen.modules.mqtt.config; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 17:06 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGateway { + public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload); + public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload ); +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java new file mode 100644 index 0000000..191f31c --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttInboundConfiguration.java @@ -0,0 +1,60 @@ +package com.fuyuanshen.modules.mqtt.config; + + +import com.fuyuanshen.modules.mqtt.receiver.ReceiverMessageHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:54 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +public class MqttInboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + @Autowired + private ReceiverMessageHandler receiverMessageHandler; + //消息通道 + @Bean + public MessageChannel messageInboundChannel(){ + return new DirectChannel(); + } + + /** + * 配置入站适配器 + * 作用: 设置订阅主题,以及指定消息的通道 等相关属性 + * */ + @Bean + public MessageProducer messageProducer(){ + MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter( + mqttPropertiesConfig.getUrl(), + mqttPropertiesConfig.getSubClientId(), + mqttPahoClientFactory, + mqttPropertiesConfig.getSubTopic().split(",") + ); + mqttPahoMessageDrivenChannelAdapter.setQos(1); + mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter()); + mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel()); + return mqttPahoMessageDrivenChannelAdapter; + } + /** 指定处理消息来自哪个通道 */ + @Bean + @ServiceActivator(inputChannel = "messageInboundChannel") + public MessageHandler messageHandler(){ + return receiverMessageHandler; + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java new file mode 100644 index 0000000..23f4ad2 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttOutboundConfiguration.java @@ -0,0 +1,50 @@ +package com.fuyuanshen.modules.mqtt.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 15:46 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Configuration +@Slf4j +public class MqttOutboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + + // 消息通道 + @Bean + public MessageChannel mqttOutboundChannel(){ + return new DirectChannel(); + } + + + /** 配置出站消息处理器 */ + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") // 指定处理器针对哪个通道的消息进行处理 + public MessageHandler mqttOutboundMessageHandler(){ + MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler( + mqttPropertiesConfig.getUrl(), + mqttPropertiesConfig.getPubClientId(), + mqttPahoClientFactory + ); + mqttPahoMessageHandler.setDefaultQos(1); + mqttPahoMessageHandler.setDefaultTopic("worker/location"); + mqttPahoMessageHandler.setAsync(true); + return mqttPahoMessageHandler; + } + +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java new file mode 100644 index 0000000..0cf5077 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/config/MqttPropertiesConfig.java @@ -0,0 +1,24 @@ +package com.fuyuanshen.modules.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 14:32 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Data +@ConfigurationProperties(prefix = "mqtt") +@Component +public class MqttPropertiesConfig { + private String username; + private String password; + private String url; + private String subClientId; + private String subTopic; + private String pubClientId; + private String pubTopic; +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java new file mode 100644 index 0000000..2b0c959 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/DeviceDataController.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.modules.mqtt.publish; + +import com.fuyuanshen.annotation.rest.AnonymousGetMapping; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/device") +@Slf4j +public class DeviceDataController { + + @Autowired + private MqttClientTest mqttClientTest; + +// @PostMapping("/{deviceId}/command") + @AnonymousGetMapping(value = "/test/command") + public ResponseEntity sendCommand() { + + mqttClientTest.sendMsg(); + return ResponseEntity.ok("success"); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java new file mode 100644 index 0000000..03ee1a1 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttClientTest.java @@ -0,0 +1,22 @@ +package com.fuyuanshen.modules.mqtt.publish; + + +import com.fuyuanshen.modules.mqtt.config.MqttGateway; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class MqttClientTest { + @Autowired + private MqttGateway mqttGateway; + + public void sendMsg() { + mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot"); + log.info("message is send"); + + mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2"); + log.info("message is send2"); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java new file mode 100644 index 0000000..7760671 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/publish/MqttMessageSender.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.modules.mqtt.publish; + +import com.fuyuanshen.modules.mqtt.config.MqttGateway; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 16:16 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Service +public class MqttMessageSender { + @Autowired + private MqttGateway mqttGateway; + public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, String payload) { + mqttGateway.sendMsgToMqtt(topic,payload); + } + public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload) { + mqttGateway.sendMsgToMqtt(topic,qos,payload); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java new file mode 100644 index 0000000..65f2a7a --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/mqtt/receiver/ReceiverMessageHandler.java @@ -0,0 +1,31 @@ +package com.fuyuanshen.modules.mqtt.receiver; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Service; + +import java.util.Objects; + +/** + * @Author: HarryLin + * @Date: 2025/3/20 15:24 + * @Company: 北京红山信息科技研究院有限公司 + * @Email: linyun@***.com.cn + **/ +@Service +@Slf4j +public class ReceiverMessageHandler implements MessageHandler { + @Override + public void handleMessage(Message message) throws MessagingException{ + Object payload = message.getPayload(); + MessageHeaders headers = message.getHeaders(); + String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); + String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); + log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}" + ,payload,receivedTopic,receivedQos,timestamp); + } +} \ No newline at end of file diff --git a/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java new file mode 100644 index 0000000..25efa35 --- /dev/null +++ b/fys-system/src/main/java/com/fuyuanshen/modules/security/config/DeviceDataService.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.modules.security.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class DeviceDataService { + + + @ServiceActivator(inputChannel = "mqttInputChannel") + public void handleDeviceData(Message message) { + String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); + String payload = message.getPayload().toString(); + + // 解析设备数据 + if (topic.startsWith("device/data/")) { + log.info("Received device data device/data/: {} {}", topic, payload); + } + } + +} \ No newline at end of file diff --git a/fys-system/src/main/resources/config/application-dev.yml b/fys-system/src/main/resources/config/application-dev.yml index 177945c..e6384a8 100644 --- a/fys-system/src/main/resources/config/application-dev.yml +++ b/fys-system/src/main/resources/config/application-dev.yml @@ -4,9 +4,9 @@ spring: druid: db-type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.p6spy.engine.spy.P6SpyDriver - url: jdbc:p6spy:mysql://192.168.2.23:3306/eladmin?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false + url: jdbc:p6spy:mysql://120.79.224.186:3366/eladmin?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false username: root - password: root + password: 1fys@QWER.. # 初始连接数,建议设置为与最小空闲连接数相同 initial-size: 20 # 最小空闲连接数,保持足够的空闲连接以应对请求 @@ -53,10 +53,10 @@ spring: redis: #数据库索引 - database: ${REDIS_DB:2} - host: ${REDIS_HOST:123.207.99.140} - port: ${REDIS_PORT:6379} - password: ${REDIS_PWD:ccxx11234} + database: ${REDIS_DB:0} + host: ${REDIS_HOST:120.79.224.186} + port: ${REDIS_PORT:26379} + password: ${REDIS_PWD:1fys@QWER..} #连接超时时间 timeout: 5000 # 连接池配置 @@ -149,3 +149,12 @@ file: logging: level: com.fuyuanshen: debug +# MQTT配置 +mqtt: + username: admin + password: fys123456 + url: tcp://127.0.0.1:1883 + subClientId: wuLang_subClient_01 + subTopic: worker/alert/#,worker/location/# + pubTopic: worker/location + pubClientId: wuLang_pubClient_01 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 25ce3b0..941b33a 100644 --- a/pom.xml +++ b/pom.xml @@ -223,6 +223,14 @@ commons-text 1.13.0 + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt +