diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttInboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttInboundConfiguration.java index 667f3799..f11544e6 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttInboundConfiguration.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttInboundConfiguration.java @@ -1,68 +1,68 @@ -package com.fuyuanshen.global.mqtt.config; - - -import cn.hutool.core.lang.UUID; -import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.core.MessageProducer; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; - - -@Configuration -@Slf4j -public class NewMqttInboundConfiguration { - @Autowired - private MqttPropertiesConfig mqttPropertiesConfig; - @Autowired - private MqttPahoClientFactory mqttPahoClientFactory; - @Autowired - private NewReceiverMessageHandler receiverMessageHandler2; - //消息通道 - @Bean - public MessageChannel messageInboundChannel2(){ - return new DirectChannel(); - } - - /** - * 配置入站适配器 - * 作用: 设置订阅主题,以及指定消息的通道 等相关属性 - * */ - @Bean - public MessageProducer messageProducer2(){ - // 生成一个不重复的随机数 - String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID(); - String subTopic = mqttPropertiesConfig.getSubTopic2(); - log.info("订阅主题:{}", subTopic); - MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter( - mqttPropertiesConfig.getUrl(), - clientId, - mqttPahoClientFactory, - subTopic.split(",") - ); - mqttPahoMessageDrivenChannelAdapter.setQos(1); - mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter()); - mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel2()); - return mqttPahoMessageDrivenChannelAdapter; - } - /** 指定处理消息来自哪个通道 */ - @Bean - @ServiceActivator(inputChannel = "messageInboundChannel2") - public MessageHandler messageHandler2(){ - return receiverMessageHandler2; - } - - // @Bean - // @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确 - // public MessageHandler deviceAlarmMessageHandler() { - // return new DeviceAlrmMessageHandler(); - // } -} \ No newline at end of file +//package com.fuyuanshen.global.mqtt.config; +// +// +//import cn.hutool.core.lang.UUID; +//import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.integration.annotation.ServiceActivator; +//import org.springframework.integration.channel.DirectChannel; +//import org.springframework.integration.core.MessageProducer; +//import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +//import org.springframework.messaging.MessageChannel; +//import org.springframework.messaging.MessageHandler; +// +// +//@Configuration +//@Slf4j +//public class NewMqttInboundConfiguration { +// @Autowired +// private MqttPropertiesConfig mqttPropertiesConfig; +// @Autowired +// private MqttPahoClientFactory mqttPahoClientFactory; +// @Autowired +// private NewReceiverMessageHandler receiverMessageHandler2; +// //消息通道 +// @Bean +// public MessageChannel messageInboundChannel2(){ +// return new DirectChannel(); +// } +// +// /** +// * 配置入站适配器 +// * 作用: 设置订阅主题,以及指定消息的通道 等相关属性 +// * */ +// @Bean +// public MessageProducer messageProducer2(){ +// // 生成一个不重复的随机数 +// String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID(); +// String subTopic = mqttPropertiesConfig.getSubTopic2(); +// log.info("订阅主题:{}", subTopic); +// MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter( +// mqttPropertiesConfig.getUrl(), +// clientId, +// mqttPahoClientFactory, +// subTopic.split(",") +// ); +// mqttPahoMessageDrivenChannelAdapter.setQos(1); +// mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter()); +// mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel2()); +// return mqttPahoMessageDrivenChannelAdapter; +// } +// /** 指定处理消息来自哪个通道 */ +// @Bean +// @ServiceActivator(inputChannel = "messageInboundChannel2") +// public MessageHandler messageHandler2(){ +// return receiverMessageHandler2; +// } +// +// // @Bean +// // @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确 +// // public MessageHandler deviceAlarmMessageHandler() { +// // return new DeviceAlrmMessageHandler(); +// // } +//} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttOutboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttOutboundConfiguration.java index 3e11af7d..16a63d82 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttOutboundConfiguration.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttOutboundConfiguration.java @@ -1,46 +1,46 @@ -package com.fuyuanshen.global.mqtt.config; - -import cn.hutool.core.lang.UUID; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; - -@Configuration -@Slf4j -public class NewMqttOutboundConfiguration { - @Autowired - private MqttPropertiesConfig mqttPropertiesConfig; - @Autowired - private MqttPahoClientFactory mqttPahoClientFactory; - - // 消息通道 - @Bean - public MessageChannel mqttOutboundChannel2(){ - return new DirectChannel(); - } - - - /** 配置出站消息处理器 */ - @Bean - @ServiceActivator(inputChannel = "mqttOutboundChannel2") // 指定处理器针对哪个通道的消息进行处理 - public MessageHandler mqttOutboundMessageHandler2(){ - String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID(); - MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler( - mqttPropertiesConfig.getUrl(), - clientId, - mqttPahoClientFactory - ); - mqttPahoMessageHandler.setDefaultQos(1); - mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2()); - mqttPahoMessageHandler.setAsync(true); - return mqttPahoMessageHandler; - } - -} \ No newline at end of file +//package com.fuyuanshen.global.mqtt.config; +// +//import cn.hutool.core.lang.UUID; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.integration.annotation.ServiceActivator; +//import org.springframework.integration.channel.DirectChannel; +//import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +//import org.springframework.messaging.MessageChannel; +//import org.springframework.messaging.MessageHandler; +// +//@Configuration +//@Slf4j +//public class NewMqttOutboundConfiguration { +// @Autowired +// private MqttPropertiesConfig mqttPropertiesConfig; +// @Autowired +// private MqttPahoClientFactory mqttPahoClientFactory; +// +// // 消息通道 +// @Bean +// public MessageChannel mqttOutboundChannel2(){ +// return new DirectChannel(); +// } +// +// +// /** 配置出站消息处理器 */ +// @Bean +// @ServiceActivator(inputChannel = "mqttOutboundChannel2") // 指定处理器针对哪个通道的消息进行处理 +// public MessageHandler mqttOutboundMessageHandler2(){ +// String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID(); +// MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler( +// mqttPropertiesConfig.getUrl(), +// clientId, +// mqttPahoClientFactory +// ); +// mqttPahoMessageHandler.setDefaultQos(1); +// mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2()); +// mqttPahoMessageHandler.setAsync(true); +// return mqttPahoMessageHandler; +// } +// +//} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttInboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttInboundConfiguration.java index 877db4f1..5bc45d1d 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttInboundConfiguration.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/RegisEquipMqttInboundConfiguration.java @@ -2,7 +2,8 @@ package com.fuyuanshen.global.mqtt.config; import cn.hutool.core.lang.UUID; -import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler; +//import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler; +import com.fuyuanshen.global.mqtt.receiver.RegisEquipReceiverMessageHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -25,7 +26,7 @@ public class RegisEquipMqttInboundConfiguration { @Autowired private MqttPahoClientFactory mqttPahoClientFactory; @Autowired - private NewReceiverMessageHandler receiverMessageHandler3; + private RegisEquipReceiverMessageHandler receiverMessageHandler3; //消息通道 @Bean public MessageChannel messageInboundChannel3(){ diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/NewReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/NewReceiverMessageHandler.java index f86d00d2..696e27d0 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/NewReceiverMessageHandler.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/NewReceiverMessageHandler.java @@ -1,125 +1,125 @@ -package com.fuyuanshen.global.mqtt.receiver; - -import cn.hutool.core.lang.Dict; -import com.baomidou.lock.LockTemplate; -import com.fuyuanshen.common.core.constant.GlobalConstants; -import com.fuyuanshen.common.core.utils.StringUtils; -import com.fuyuanshen.common.json.utils.JsonUtils; -import com.fuyuanshen.common.redis.utils.RedisUtils; -import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext; -import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine; -import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; -import com.fuyuanshen.global.queue.MqttMessageQueueConstants; -import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RLock; -import org.redisson.api.RedissonClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.MessagingException; -import org.springframework.stereotype.Service; - -import java.time.Duration; -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX; - -@Service -@Slf4j -public class NewReceiverMessageHandler implements MessageHandler { - - @Autowired - private NewMqttRuleEngine newRuleEngine; - - @Override - public void handleMessage(Message message) throws MessagingException { - - Object payload = message.getPayload(); - MessageHeaders headers = message.getHeaders(); - String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); - String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); - String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); - - log.info("MQTT2 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}", - payload, receivedTopic, receivedQos, timestamp); - - Dict payloadDict = JsonUtils.parseMap(payload.toString()); - if (receivedTopic == null || payloadDict == null) { - return; - } - String imei = payloadDict.getStr("imei"); - String funcType = payloadDict.getStr("funcType"); - // 执行业务逻辑 - if(StringUtils.isNotBlank(imei)){ - String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; - String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; - RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900)); - //在线状态 - String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; - RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360)); - } - - String[] topicArr = receivedTopic.split("/"); - - NewMqttRuleContext context = new NewMqttRuleContext(); - context.setCommandType(topicArr[2]+"_"+funcType); - context.setDeviceImei(imei); - context.setPayloadDict(payloadDict); - - boolean ruleExecuted = newRuleEngine.executeRule(context); - - if (!ruleExecuted) { - log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType); - } -// final LockInfo lockInfo = lockTemplate.lock(GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei, 100L, 3000L, RedissonLockExecutor.class); -// if (null == lockInfo) { -// log.info("MQTT3业务处理中,请稍后再试:funcType=>{},imei=>{}",funcType,imei); +//package com.fuyuanshen.global.mqtt.receiver; +// +//import cn.hutool.core.lang.Dict; +//import com.baomidou.lock.LockTemplate; +//import com.fuyuanshen.common.core.constant.GlobalConstants; +//import com.fuyuanshen.common.core.utils.StringUtils; +//import com.fuyuanshen.common.json.utils.JsonUtils; +//import com.fuyuanshen.common.redis.utils.RedisUtils; +//import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext; +//import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine; +//import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; +//import com.fuyuanshen.global.queue.MqttMessageQueueConstants; +//import lombok.extern.slf4j.Slf4j; +//import org.redisson.api.RLock; +//import org.redisson.api.RedissonClient; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.messaging.Message; +//import org.springframework.messaging.MessageHandler; +//import org.springframework.messaging.MessageHeaders; +//import org.springframework.messaging.MessagingException; +//import org.springframework.stereotype.Service; +// +//import java.time.Duration; +//import java.util.Objects; +//import java.util.concurrent.TimeUnit; +// +//import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX; +// +//@Service +//@Slf4j +//public class NewReceiverMessageHandler implements MessageHandler { +// +// @Autowired +// private NewMqttRuleEngine newRuleEngine; +// +// @Override +// public void handleMessage(Message message) throws MessagingException { +// +// Object payload = message.getPayload(); +// MessageHeaders headers = message.getHeaders(); +// String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); +// String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); +// String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); +// +// log.info("MQTT2 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}", +// payload, receivedTopic, receivedQos, timestamp); +// +// Dict payloadDict = JsonUtils.parseMap(payload.toString()); +// if (receivedTopic == null || payloadDict == null) { // return; // } -//// 获取锁成功,处理业务 -// try { -// if(StringUtils.isNotBlank(imei)){ -// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; -// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; -// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900)); -// //在线状态 -// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; -// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360)); -// } -// -// String[] topicArr = receivedTopic.split("/"); -// -// NewMqttRuleContext context = new NewMqttRuleContext(); -// context.setCommandType(topicArr[2]+"_"+funcType); -// context.setDeviceImei(imei); -// context.setPayloadDict(payloadDict); -// -// boolean ruleExecuted = newRuleEngine.executeRule(context); -// -// if (!ruleExecuted) { -// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType); -// } -// } finally { -// //释放锁 -// lockTemplate.releaseLock(lockInfo); +// String imei = payloadDict.getStr("imei"); +// String funcType = payloadDict.getStr("funcType"); +// // 执行业务逻辑 +// if(StringUtils.isNotBlank(imei)){ +// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; +// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; +// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900)); +// //在线状态 +// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; +// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360)); // } - - - /* ===== 追加:根据报文内容识别格式并统一解析 ===== */ -// int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict); -// if (intType > 0) { -// MqttRuleContext newCtx = new MqttRuleContext(); -// String commandType = "Light_"+intType; -// newCtx.setCommandType(commandType); -// newCtx.setDeviceImei(imei); -// newCtx.setPayloadDict(payloadDict); // -// boolean ok = ruleEngine.executeRule(newCtx); -// if (!ok) { -// log.warn("新规则引擎未命中, imei={}", imei); -// } +// String[] topicArr = receivedTopic.split("/"); +// +// NewMqttRuleContext context = new NewMqttRuleContext(); +// context.setCommandType(topicArr[2]+"_"+funcType); +// context.setDeviceImei(imei); +// context.setPayloadDict(payloadDict); +// +// boolean ruleExecuted = newRuleEngine.executeRule(context); +// +// if (!ruleExecuted) { +// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType); // } - } -} +//// final LockInfo lockInfo = lockTemplate.lock(GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei, 100L, 3000L, RedissonLockExecutor.class); +//// if (null == lockInfo) { +//// log.info("MQTT3业务处理中,请稍后再试:funcType=>{},imei=>{}",funcType,imei); +//// return; +//// } +////// 获取锁成功,处理业务 +//// try { +//// if(StringUtils.isNotBlank(imei)){ +//// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; +//// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; +//// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900)); +//// //在线状态 +//// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; +//// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360)); +//// } +//// +//// String[] topicArr = receivedTopic.split("/"); +//// +//// NewMqttRuleContext context = new NewMqttRuleContext(); +//// context.setCommandType(topicArr[2]+"_"+funcType); +//// context.setDeviceImei(imei); +//// context.setPayloadDict(payloadDict); +//// +//// boolean ruleExecuted = newRuleEngine.executeRule(context); +//// +//// if (!ruleExecuted) { +//// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType); +//// } +//// } finally { +//// //释放锁 +//// lockTemplate.releaseLock(lockInfo); +//// } +// +// +// /* ===== 追加:根据报文内容识别格式并统一解析 ===== */ +//// int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict); +//// if (intType > 0) { +//// MqttRuleContext newCtx = new MqttRuleContext(); +//// String commandType = "Light_"+intType; +//// newCtx.setCommandType(commandType); +//// newCtx.setDeviceImei(imei); +//// newCtx.setPayloadDict(payloadDict); +//// +//// boolean ok = ruleEngine.executeRule(newCtx); +//// if (!ok) { +//// log.warn("新规则引擎未命中, imei={}", imei); +//// } +//// } +// } +//}