From 34841c87045feedb093b03259f06e42f66d039ce Mon Sep 17 00:00:00 2001 From: chenyouting <514333061@qq.com> Date: Mon, 2 Mar 2026 11:17:46 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=812?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RegisEquipMqttInboundConfiguration.java | 68 +++++++++++++++++++ .../RegisEquipMqttOutboundConfiguration.java | 46 +++++++++++++ .../RegisEquipReceiverMessageHandler.java | 51 ++++++++++++++ .../web/service/device/DeviceBizService.java | 2 +- .../src/main/resources/application-prod.yml | 8 +-- 5 files changed, 170 insertions(+), 5 deletions(-) create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttInboundConfiguration.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttOutboundConfiguration.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/RegisEquipReceiverMessageHandler.java diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttInboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttInboundConfiguration.java new file mode 100644 index 00000000..877db4f1 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttInboundConfiguration.java @@ -0,0 +1,68 @@ +package com.fuyuanshen.global.mqtt.config; + + +import cn.hutool.core.lang.UUID; +import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + + +@Configuration +@Slf4j +public class RegisEquipMqttInboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + @Autowired + private NewReceiverMessageHandler receiverMessageHandler3; + //消息通道 + @Bean + public MessageChannel messageInboundChannel3(){ + return new DirectChannel(); + } + + /** + * 配置入站适配器 + * 作用: 设置订阅主题,以及指定消息的通道 等相关属性 + * */ + @Bean + public MessageProducer messageProducer3(){ + // 生成一个不重复的随机数 + String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID(); + String subTopic = mqttPropertiesConfig.getSubTopic2(); + log.info("订阅主题:{}", subTopic); + MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter( + mqttPropertiesConfig.getUrl(), + clientId, + mqttPahoClientFactory, + subTopic.split(",") + ); + mqttPahoMessageDrivenChannelAdapter.setQos(1); + mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter()); + mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel3()); + return mqttPahoMessageDrivenChannelAdapter; + } + /** 指定处理消息来自哪个通道 */ + @Bean + @ServiceActivator(inputChannel = "messageInboundChannel3") + public MessageHandler messageHandler3(){ + return receiverMessageHandler3; + } + + // @Bean + // @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确 + // public MessageHandler deviceAlarmMessageHandler() { + // return new DeviceAlrmMessageHandler(); + // } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttOutboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttOutboundConfiguration.java new file mode 100644 index 00000000..0243ab0b --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttOutboundConfiguration.java @@ -0,0 +1,46 @@ +package com.fuyuanshen.global.mqtt.config; + +import cn.hutool.core.lang.UUID; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Configuration +@Slf4j +public class RegisEquipMqttOutboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + + // 消息通道 + @Bean + public MessageChannel mqttOutboundChannel3(){ + return new DirectChannel(); + } + + + /** 配置出站消息处理器 */ + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel3") // 指定处理器针对哪个通道的消息进行处理 + public MessageHandler mqttOutboundMessageHandler3(){ + String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID(); + MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler( + mqttPropertiesConfig.getUrl(), + clientId, + mqttPahoClientFactory + ); + mqttPahoMessageHandler.setDefaultQos(1); + mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2()); + mqttPahoMessageHandler.setAsync(true); + return mqttPahoMessageHandler; + } + +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/RegisEquipReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/RegisEquipReceiverMessageHandler.java new file mode 100644 index 00000000..691abbef --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/RegisEquipReceiverMessageHandler.java @@ -0,0 +1,51 @@ +package com.fuyuanshen.global.mqtt.receiver; + +import cn.hutool.core.lang.Dict; +import com.fuyuanshen.common.core.constant.GlobalConstants; +import com.fuyuanshen.common.core.utils.StringUtils; +import com.fuyuanshen.common.json.utils.JsonUtils; +import com.fuyuanshen.common.redis.utils.RedisUtils; +import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext; +import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine; +import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; +import com.fuyuanshen.global.queue.MqttMessageQueueConstants; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.Objects; + +import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX; + +@Service +@Slf4j +public class RegisEquipReceiverMessageHandler implements MessageHandler { + + @Autowired + private NewMqttRuleEngine newRuleEngine; + + @Override + public void handleMessage(Message message) throws MessagingException { + + Object payload = message.getPayload(); + MessageHeaders headers = message.getHeaders(); + String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); + String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); + + log.info("MQTT3 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}", + payload, receivedTopic, receivedQos, timestamp); + + Dict payloadDict = JsonUtils.parseMap(payload.toString()); + if (receivedTopic == null || payloadDict == null) { + return; + } + + + } +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBizService.java b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBizService.java index 8dfcdbbc..5b1a0a1c 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBizService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBizService.java @@ -86,7 +86,7 @@ public class DeviceBizService { List records = result.getRecords(); if (records != null && !records.isEmpty()) { records.forEach(item -> { - if (item.getCommunicationMode() != null && item.getCommunicationMode() == 0) { + if (item.getCommunicationMode() != null && (item.getCommunicationMode() == 0 || item.getCommunicationMode() == 2)) { // 设备在线状态 String onlineStatus = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + item.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX); diff --git a/fys-admin/src/main/resources/application-prod.yml b/fys-admin/src/main/resources/application-prod.yml index 3dd8b5b3..ab3bd29c 100644 --- a/fys-admin/src/main/resources/application-prod.yml +++ b/fys-admin/src/main/resources/application-prod.yml @@ -283,10 +283,10 @@ mqtt: password: #YtvpSfCNG url: tcp://47.120.79.150:3883 subClientId: fys_subClient - subTopic: A/# - pubTopic: B/# - subTopic2: status/894078/# - pubTopic2: command/894078/# + subTopic: A/#,status/# + pubTopic: B/#,command/# + subTopic2: regis/equip/# + pubTopic2: regis/# pubClientId: fys_pubClient # TTS语音交互配置