diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleEngine.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleEngine.java index 05914700..15412aab 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleEngine.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleEngine.java @@ -1,9 +1,5 @@ package com.fuyuanshen.global.mqtt.base; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.core.task.TaskExecutor; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.Comparator; @@ -16,9 +12,9 @@ import java.util.List; @Component public class MqttRuleEngine { - @Autowired - @Qualifier("threadPoolTaskExecutor") - private ThreadPoolTaskExecutor threadPoolTaskExecutor; +// @Autowired +// @Qualifier("threadPoolTaskExecutor") +// private ThreadPoolTaskExecutor threadPoolTaskExecutor; private final LinkedHashMap rulesMap = new LinkedHashMap<>(); @@ -41,7 +37,8 @@ public class MqttRuleEngine { int commandType = context.getCommandType(); MqttMessageRule mqttMessageRule = rulesMap.get("Light_" + commandType); if (mqttMessageRule != null) { - threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context)); +// threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context)); + mqttMessageRule.execute(context); return true; } diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/NewMqttMessageRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/NewMqttMessageRule.java new file mode 100644 index 00000000..754332e4 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/NewMqttMessageRule.java @@ -0,0 +1,26 @@ +package com.fuyuanshen.global.mqtt.base; + +/** + * MQTT消息处理接口 + */ +public interface NewMqttMessageRule { + + /** + * 获取命令类型 + * @return 命令类型 + */ + String getCommandType(); + /** + * 执行处理 + * @param context 处理上下文 + */ + void execute(NewMqttRuleContext context); + + /** + * 获取优先级,数值越小优先级越高 + * @return 优先级 + */ + default int getPriority() { + return 0; + } +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/NewMqttRuleContext.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/NewMqttRuleContext.java new file mode 100644 index 00000000..28b8258c --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/NewMqttRuleContext.java @@ -0,0 +1,25 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +import java.util.Map; + +/** + * MQTT消息处理上下文 + */ +@Data +public class NewMqttRuleContext { + /** + * 命令类型 + */ + private String commandType; + /** + * 设备IMEI + */ + private String deviceImei; + + /** + * MQTT消息负载字典 + */ + private Map payloadDict; +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/NewMqttRuleEngine.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/NewMqttRuleEngine.java new file mode 100644 index 00000000..017b172b --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/NewMqttRuleEngine.java @@ -0,0 +1,44 @@ +package com.fuyuanshen.global.mqtt.base; + +import org.springframework.stereotype.Component; + +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * MQTT消息引擎 + */ +@Component +public class NewMqttRuleEngine { + +// @Autowired +// @Qualifier("threadPoolTaskExecutor") +// private ThreadPoolTaskExecutor threadPoolTaskExecutor; + + private final LinkedHashMap rulesMap = new LinkedHashMap<>(); + public NewMqttRuleEngine(List rules) { + // 按优先级排序 + rules.sort(Comparator.comparing(NewMqttMessageRule::getPriority)); + rules.forEach(rule -> rulesMap.put(rule.getCommandType(), rule) + ); + } + + /** + * 执行匹配 + * @param context 处理上下文 + * @return + */ + public boolean executeRule(NewMqttRuleContext context) { + String commandType = context.getCommandType(); +// String funcType = context.getFuncType(); + NewMqttMessageRule mqttMessageRule = rulesMap.get(commandType); + if (mqttMessageRule != null) { +// threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context)); + mqttMessageRule.execute(context); + return true; + } + + return false; + } +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttInboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttInboundConfiguration.java index 102005ca..ead8e0b4 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttInboundConfiguration.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttInboundConfiguration.java @@ -44,12 +44,12 @@ public class MqttInboundConfiguration { if (url == null) { throw new IllegalStateException("MQTT服务器URL未配置"); } - + String subTopic = mqttPropertiesConfig.getSubTopic(); MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter( url, clientId, mqttPahoClientFactory, - mqttPropertiesConfig.getSubTopic().split(",") + subTopic.split(",") ); mqttPahoMessageDrivenChannelAdapter.setQos(1); mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter()); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttOutboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttOutboundConfiguration.java index 84a56082..e969042e 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttOutboundConfiguration.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttOutboundConfiguration.java @@ -47,7 +47,7 @@ public class MqttOutboundConfiguration { mqttPahoClientFactory ); mqttPahoMessageHandler.setDefaultQos(1); - mqttPahoMessageHandler.setDefaultTopic("B/#"); + mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic()); mqttPahoMessageHandler.setAsync(true); return mqttPahoMessageHandler; } diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttPropertiesConfig.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttPropertiesConfig.java index 6c5d7db9..be4c69fe 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttPropertiesConfig.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttPropertiesConfig.java @@ -14,6 +14,8 @@ public class MqttPropertiesConfig { private String url; private String subClientId; private String subTopic; + private String subTopic2; private String pubClientId; private String pubTopic; + private String pubTopic2; } \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttInboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttInboundConfiguration.java new file mode 100644 index 00000000..667f3799 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttInboundConfiguration.java @@ -0,0 +1,68 @@ +package com.fuyuanshen.global.mqtt.config; + + +import cn.hutool.core.lang.UUID; +import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + + +@Configuration +@Slf4j +public class NewMqttInboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + @Autowired + private NewReceiverMessageHandler receiverMessageHandler2; + //消息通道 + @Bean + public MessageChannel messageInboundChannel2(){ + return new DirectChannel(); + } + + /** + * 配置入站适配器 + * 作用: 设置订阅主题,以及指定消息的通道 等相关属性 + * */ + @Bean + public MessageProducer messageProducer2(){ + // 生成一个不重复的随机数 + String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID(); + String subTopic = mqttPropertiesConfig.getSubTopic2(); + log.info("订阅主题:{}", subTopic); + MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter( + mqttPropertiesConfig.getUrl(), + clientId, + mqttPahoClientFactory, + subTopic.split(",") + ); + mqttPahoMessageDrivenChannelAdapter.setQos(1); + mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter()); + mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel2()); + return mqttPahoMessageDrivenChannelAdapter; + } + /** 指定处理消息来自哪个通道 */ + @Bean + @ServiceActivator(inputChannel = "messageInboundChannel2") + public MessageHandler messageHandler2(){ + return receiverMessageHandler2; + } + + // @Bean + // @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确 + // public MessageHandler deviceAlarmMessageHandler() { + // return new DeviceAlrmMessageHandler(); + // } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttOutboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttOutboundConfiguration.java new file mode 100644 index 00000000..3e11af7d --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/NewMqttOutboundConfiguration.java @@ -0,0 +1,46 @@ +package com.fuyuanshen.global.mqtt.config; + +import cn.hutool.core.lang.UUID; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +@Configuration +@Slf4j +public class NewMqttOutboundConfiguration { + @Autowired + private MqttPropertiesConfig mqttPropertiesConfig; + @Autowired + private MqttPahoClientFactory mqttPahoClientFactory; + + // 消息通道 + @Bean + public MessageChannel mqttOutboundChannel2(){ + return new DirectChannel(); + } + + + /** 配置出站消息处理器 */ + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel2") // 指定处理器针对哪个通道的消息进行处理 + public MessageHandler mqttOutboundMessageHandler2(){ + String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID(); + MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler( + mqttPropertiesConfig.getUrl(), + clientId, + mqttPahoClientFactory + ); + mqttPahoMessageHandler.setDefaultQos(1); + mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2()); + mqttPahoMessageHandler.setAsync(true); + return mqttPahoMessageHandler; + } + +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/DeviceDataController.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/DeviceDataController.java index 8dacb880..a39e8990 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/DeviceDataController.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/DeviceDataController.java @@ -1,20 +1,22 @@ package com.fuyuanshen.global.mqtt.publish; +import cn.dev33.satoken.annotation.SaIgnore; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController -@RequestMapping("/api/") +@RequestMapping("/api") +@SaIgnore @Slf4j public class DeviceDataController { @Autowired private MqttClientTest mqttClientTest; - -// @PostMapping("/{deviceId}/command") + @GetMapping("/command") public ResponseEntity sendCommand() { mqttClientTest.sendMsg(); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttClientTest.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttClientTest.java index 776b4997..4cf3cb22 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttClientTest.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttClientTest.java @@ -13,10 +13,10 @@ public class MqttClientTest { private MqttGateway mqttGateway; public void sendMsg() { - mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot"); + mqttGateway.sendMsgToMqtt("command/894078/HBY670/864865082081523", "hello mqtt spring boot"); log.info("message is send"); - mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2"); + mqttGateway.sendMsgToMqtt("report/894078/HBY670/864865082081523", "hello mqtt spring boot2"); log.info("message is send2"); } } \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/NewReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/NewReceiverMessageHandler.java new file mode 100644 index 00000000..f08b53ee --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/NewReceiverMessageHandler.java @@ -0,0 +1,146 @@ +package com.fuyuanshen.global.mqtt.receiver; + +import cn.hutool.core.lang.Dict; +import com.baomidou.lock.LockTemplate; +import com.fuyuanshen.common.core.constant.GlobalConstants; +import com.fuyuanshen.common.core.utils.StringUtils; +import com.fuyuanshen.common.json.utils.JsonUtils; +import com.fuyuanshen.common.redis.utils.RedisUtils; +import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext; +import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine; +import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; +import com.fuyuanshen.global.queue.MqttMessageQueueConstants; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX; + +@Service +@Slf4j +public class NewReceiverMessageHandler implements MessageHandler { + + @Autowired + private NewMqttRuleEngine newRuleEngine; + + @Override + public void handleMessage(Message message) throws MessagingException { + + Object payload = message.getPayload(); + MessageHeaders headers = message.getHeaders(); + String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); + String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); + + log.info("MQTT2 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}", + payload, receivedTopic, receivedQos, timestamp); + + Dict payloadDict = JsonUtils.parseMap(payload.toString()); + if (receivedTopic == null || payloadDict == null) { + return; + } + String imei = payloadDict.getStr("imei"); + String funcType = payloadDict.getStr("funcType"); + RedissonClient client = RedisUtils.getClient(); + String lockKey = "mqtt:consumer:lock:"; + String KEY = GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei; + log.info("MQTT2获取锁开始{}", KEY); + RLock lock = client.getLock(KEY); + + try { + // 尝试获取锁, + boolean isLocked = lock.tryLock(60, 3000, TimeUnit.MILLISECONDS); + if (isLocked) { + // 执行业务逻辑 + if(StringUtils.isNotBlank(imei)){ + String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; + String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; + RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900)); + //在线状态 + String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; + RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360)); + } + + String[] topicArr = receivedTopic.split("/"); + + NewMqttRuleContext context = new NewMqttRuleContext(); + context.setCommandType(topicArr[2]+"_"+funcType); + context.setDeviceImei(imei); + context.setPayloadDict(payloadDict); + + boolean ruleExecuted = newRuleEngine.executeRule(context); + + if (!ruleExecuted) { + log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType); + } + }else{ + log.warn("MQTT2获取锁失败,请稍后再试"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + // 释放锁 + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } +// final LockInfo lockInfo = lockTemplate.lock(GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei, 100L, 3000L, RedissonLockExecutor.class); +// if (null == lockInfo) { +// log.info("MQTT3业务处理中,请稍后再试:funcType=>{},imei=>{}",funcType,imei); +// return; +// } +//// 获取锁成功,处理业务 +// try { +// if(StringUtils.isNotBlank(imei)){ +// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; +// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; +// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900)); +// //在线状态 +// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; +// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360)); +// } +// +// String[] topicArr = receivedTopic.split("/"); +// +// NewMqttRuleContext context = new NewMqttRuleContext(); +// context.setCommandType(topicArr[2]+"_"+funcType); +// context.setDeviceImei(imei); +// context.setPayloadDict(payloadDict); +// +// boolean ruleExecuted = newRuleEngine.executeRule(context); +// +// if (!ruleExecuted) { +// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType); +// } +// } finally { +// //释放锁 +// lockTemplate.releaseLock(lockInfo); +// } + + + /* ===== 追加:根据报文内容识别格式并统一解析 ===== */ +// int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict); +// if (intType > 0) { +// MqttRuleContext newCtx = new MqttRuleContext(); +// String commandType = "Light_"+intType; +// newCtx.setCommandType(commandType); +// newCtx.setDeviceImei(imei); +// newCtx.setPayloadDict(payloadDict); +// +// boolean ok = ruleEngine.executeRule(newCtx); +// if (!ok) { +// log.warn("新规则引擎未命中, imei={}", imei); +// } +// } + } +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java index 5b266129..8b2e37aa 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java @@ -9,9 +9,12 @@ import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.global.mqtt.base.MqttRuleContext; import com.fuyuanshen.global.mqtt.base.MqttRuleEngine; import com.fuyuanshen.global.mqtt.base.MqttXinghanCommandType; +import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext; import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; import com.fuyuanshen.global.queue.MqttMessageQueueConstants; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; @@ -21,6 +24,7 @@ import org.springframework.stereotype.Service; import java.time.Duration; import java.util.Objects; +import java.util.concurrent.TimeUnit; import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX; @@ -38,55 +42,77 @@ public class ReceiverMessageHandler implements MessageHandler { String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); - + log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}", payload, receivedTopic, receivedQos, timestamp); - + Dict payloadDict = JsonUtils.parseMap(payload.toString()); if (receivedTopic == null || payloadDict == null) { return; } String[] subStr = receivedTopic.split("/"); String deviceImei = subStr[1]; - String state = payloadDict.getStr("state"); - Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state); - if(StringUtils.isNotBlank(deviceImei)){ - String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; - String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; - RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofSeconds(900)); - //在线状态 - String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; - RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360)); - } - - if (convertArr.length > 0) { - Byte val1 = (Byte) convertArr[0]; - MqttRuleContext context = new MqttRuleContext(); - context.setCommandType(val1); - context.setConvertArr(convertArr); - context.setDeviceImei(deviceImei); - context.setPayloadDict(payloadDict); + RedissonClient client = RedisUtils.getClient(); + String lockKey = "mqtt:consumer:lock:"; + String KEY = GlobalConstants.GLOBAL_REDIS_KEY + lockKey + deviceImei; + RLock lock = client.getLock(KEY); - boolean ruleExecuted = ruleEngine.executeRule(context); - - if (!ruleExecuted) { - log.warn("未找到匹配的规则来处理命令类型: {}", val1); + try { + // 尝试获取锁, + boolean isLocked = lock.tryLock(60, 3000, TimeUnit.MILLISECONDS); + if (isLocked) { + String state = payloadDict.getStr("state"); + Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state); + if(StringUtils.isNotBlank(deviceImei)){ + String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; + String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; + RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofSeconds(900)); + //在线状态 + String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; + RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360)); + } + + + + if (convertArr.length > 0) { + Byte val1 = (Byte) convertArr[0]; + MqttRuleContext context = new MqttRuleContext(); + context.setCommandType(val1); + context.setConvertArr(convertArr); + context.setDeviceImei(deviceImei); + context.setPayloadDict(payloadDict); + + boolean ruleExecuted = ruleEngine.executeRule(context); + + if (!ruleExecuted) { + log.warn("未找到匹配的规则来处理命令类型: {}", val1); + } + } + + /* ===== 追加:根据报文内容识别格式并统一解析 ===== */ + int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict); + if (intType > 0) { + MqttRuleContext newCtx = new MqttRuleContext(); + newCtx.setCommandType((byte) intType); + newCtx.setDeviceImei(deviceImei); + newCtx.setPayloadDict(payloadDict); + + boolean ok = ruleEngine.executeRule(newCtx); + if (!ok) { + log.warn("新规则引擎未命中, imei={}", deviceImei); + } + } + }else{ + log.warn("MQTT获取锁失败,请稍后再试"); } - } - - /* ===== 追加:根据报文内容识别格式并统一解析 ===== */ - int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict); - if (intType > 0) { - MqttRuleContext newCtx = new MqttRuleContext(); - newCtx.setCommandType((byte) intType); - newCtx.setDeviceImei(deviceImei); - newCtx.setPayloadDict(payloadDict); - - boolean ok = ruleEngine.executeRule(newCtx); - if (!ok) { - log.warn("新规则引擎未命中, imei={}", deviceImei); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + // 释放锁 + if (lock.isHeldByCurrentThread()) { + lock.unlock(); } } } diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqActiveReportingDeviceDataRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqActiveReportingDeviceDataRule.java index 50fd65a0..36dcff6d 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqActiveReportingDeviceDataRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqActiveReportingDeviceDataRule.java @@ -38,18 +38,18 @@ public class BjqActiveReportingDeviceDataRule implements MqttMessageRule { @Override public void execute(MqttRuleContext context) { try { - Object[] convertArr = context.getConvertArr(); - // Latitude, longitude - //主灯档位,激光灯档位,电量百分比,充电状态,电池剩余续航时间 - String mainLightMode = convertArr[1].toString(); - String laserLightMode = convertArr[2].toString(); - String batteryPercentage = convertArr[3].toString(); - String chargeState = convertArr[4].toString(); - String batteryRemainingTime = convertArr[5].toString(); - - // 发送设备状态和位置信息到Redis - asyncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode, laserLightMode, - batteryPercentage, chargeState, batteryRemainingTime); +// Object[] convertArr = context.getConvertArr(); +// // Latitude, longitude +// //主灯档位,激光灯档位,电量百分比,充电状态,电池剩余续航时间 +// String mainLightMode = convertArr[1].toString(); +// String laserLightMode = convertArr[2].toString(); +// String batteryPercentage = convertArr[3].toString(); +// String chargeState = convertArr[4].toString(); +// String batteryRemainingTime = convertArr[5].toString(); +// +// // 发送设备状态和位置信息到Redis +// asyncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode, laserLightMode, +// batteryPercentage, chargeState, batteryRemainingTime); } catch (Exception e) { log.error("处理上报数据命令时出错", e); } diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqBootLogoRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqBootLogoRule.java index 2a95db74..7ea08ff1 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqBootLogoRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqBootLogoRule.java @@ -44,37 +44,37 @@ public class BjqBootLogoRule implements MqttMessageRule { public void execute(MqttRuleContext context) { String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei(); try { - Byte val2 = (Byte) context.getConvertArr()[1]; - if (val2 == 100) { - RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); - return; - } - - String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() +DEVICE_BOOT_LOGO_KEY_PREFIX); - if (StringUtils.isEmpty(data)) { - return; - } - - byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data); - byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512); - log.info("第{}块数据大小: {} 字节", val2, specificChunk.length); - - ArrayList intData = new ArrayList<>(); - intData.add(3); - intData.add((int) val2); - ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData); - intData.add(0); - intData.add(0); - intData.add(0); - intData.add(0); - - Map map = new HashMap<>(); - map.put("instruct", intData); - - mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map)); - log.info("发送开机LOGO点阵数据到设备消息=>topic:{},payload:{}", - MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), - JsonUtils.toJsonString(map)); +// Byte val2 = (Byte) context.getConvertArr()[1]; +// if (val2 == 100) { +// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); +// return; +// } +// +// String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() +DEVICE_BOOT_LOGO_KEY_PREFIX); +// if (StringUtils.isEmpty(data)) { +// return; +// } +// +// byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data); +// byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512); +// log.info("第{}块数据大小: {} 字节", val2, specificChunk.length); +// +// ArrayList intData = new ArrayList<>(); +// intData.add(3); +// intData.add((int) val2); +// ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData); +// intData.add(0); +// intData.add(0); +// intData.add(0); +// intData.add(0); +// +// Map map = new HashMap<>(); +// map.put("instruct", intData); +// +// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map)); +// log.info("发送开机LOGO点阵数据到设备消息=>topic:{},payload:{}", +// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), +// JsonUtils.toJsonString(map)); } catch (Exception e) { log.error("处理开机LOGO时出错", e); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLaserModeSettingsRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLaserModeSettingsRule.java index 74018004..f849a5d8 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLaserModeSettingsRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLaserModeSettingsRule.java @@ -34,15 +34,15 @@ public class BjqLaserModeSettingsRule implements MqttMessageRule { public void execute(MqttRuleContext context) { String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei(); try { - Object[] convertArr = context.getConvertArr(); - - String mode = convertArr[1].toString(); - if(StringUtils.isNotBlank(mode)){ - // 发送设备状态和位置信息到Redis - syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(),mode); - } - - RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30)); +// Object[] convertArr = context.getConvertArr(); +// +// String mode = convertArr[1].toString(); +// if(StringUtils.isNotBlank(mode)){ +// // 发送设备状态和位置信息到Redis +// syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(),mode); +// } +// +// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30)); } catch (Exception e) { log.error("处理激光模式命令时出错", e); RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30)); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLightBrightnessRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLightBrightnessRule.java index 0fc6dd8e..722ab603 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLightBrightnessRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLightBrightnessRule.java @@ -34,15 +34,15 @@ public class BjqLightBrightnessRule implements MqttMessageRule { public void execute(MqttRuleContext context) { String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei(); try { - Object[] convertArr = context.getConvertArr(); - - String convertValue = convertArr[1].toString(); - // 将设备状态信息存储到Redis中 - String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX; - - // 存储到Redis - RedisUtils.setCacheObject(deviceRedisKey, convertValue); - RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); +// Object[] convertArr = context.getConvertArr(); +// +// String convertValue = convertArr[1].toString(); +// // 将设备状态信息存储到Redis中 +// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX; +// +// // 存储到Redis +// RedisUtils.setCacheObject(deviceRedisKey, convertValue); +// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); } catch (Exception e) { log.error("处理灯光亮度命令时出错", e); RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20)); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLocationDataRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLocationDataRule.java index e4377e42..539e08d1 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLocationDataRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqLocationDataRule.java @@ -52,26 +52,26 @@ public class BjqLocationDataRule implements MqttMessageRule { public void execute(MqttRuleContext context) { String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei(); try { - Object[] convertArr = context.getConvertArr(); - // Latitude, longitude - String latitude = convertArr[1].toString(); - String longitude = convertArr[2].toString(); - // 判断 latitude 和 longitude 是否都为 0 - if ("0".equals(latitude) && "0".equals(longitude)) { - log.info("位置信息为0,不存储到Redis: device={}, lat={}, lon={}", context.getDeviceImei(), latitude, longitude); - return; - } - // 异步发送经纬度到Redis - asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude); - // 异步保存数据 - asyncSaveLocationToMySQLWithFuture(context.getDeviceImei(), latitude, longitude); - - Map map = buildLocationDataMap(latitude, longitude); - mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map)); - log.info("发送定位数据到设备=>topic:{},payload:{}", - MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), - JsonUtils.toJsonString(map)); - RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); +// Object[] convertArr = context.getConvertArr(); +// // Latitude, longitude +// String latitude = convertArr[1].toString(); +// String longitude = convertArr[2].toString(); +// // 判断 latitude 和 longitude 是否都为 0 +// if ("0".equals(latitude) && "0".equals(longitude)) { +// log.info("位置信息为0,不存储到Redis: device={}, lat={}, lon={}", context.getDeviceImei(), latitude, longitude); +// return; +// } +// // 异步发送经纬度到Redis +// asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude); +// // 异步保存数据 +// asyncSaveLocationToMySQLWithFuture(context.getDeviceImei(), latitude, longitude); +// +// Map map = buildLocationDataMap(latitude, longitude); +// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map)); +// log.info("发送定位数据到设备=>topic:{},payload:{}", +// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), +// JsonUtils.toJsonString(map)); +// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); } catch (Exception e) { log.error("处理定位数据命令时出错", e); RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20)); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqModeRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqModeRule.java index 8f97cec1..ef667606 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqModeRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqModeRule.java @@ -50,45 +50,47 @@ public class BjqModeRule implements MqttMessageRule { public void execute(MqttRuleContext context) { String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei(); try { - Object[] convertArr = context.getConvertArr(); - - String mainLightMode = convertArr[1].toString(); - String batteryRemainingTime = convertArr[2].toString(); - if (StringUtils.isNotBlank(mainLightMode)) { - log.info("设备离线mainLightMode:{}", mainLightMode); - if ("0".equals(mainLightMode)) { - - // 设备离线 - String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX; - RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "0"); - - String sendMessageIng = GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending"; - String messageSendingValue = RedisUtils.getCacheObject(sendMessageIng); - if ("1".equals(messageSendingValue)) { - // 设置为故障状态 - RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "2"); - UpdateWrapper updateWrapper = new UpdateWrapper<>(); - updateWrapper.eq("device_imei", context.getDeviceImei()); - updateWrapper.set("online_status", 2); - deviceService.update(updateWrapper); - RedisUtils.deleteObject(sendMessageIng); - - // 解除告警 - String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_ALARM_KEY_PREFIX; - if (RedisUtils.getCacheObject(deviceRedisKey) != null) { - RedisUtils.deleteObject(deviceRedisKey); - } - cancelAlarm(context.getDeviceImei()); - } - } - // 发送设备状态和位置信息到Redis - syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode); - String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX; - - // 存储到Redis - RedisUtils.setCacheObject(deviceRedisKey, batteryRemainingTime); - } - RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); + log.info("处理灯光模式命令"); + log.info("MQTT消息负载字典:{}", context.getPayloadDict()); +// Object[] convertArr = context.getConvertArr(); +// +// String mainLightMode = convertArr[1].toString(); +// String batteryRemainingTime = convertArr[2].toString(); +// if (StringUtils.isNotBlank(mainLightMode)) { +// log.info("设备离线mainLightMode:{}", mainLightMode); +// if ("0".equals(mainLightMode)) { +// +// // 设备离线 +// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX; +// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "0"); +// +// String sendMessageIng = GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending"; +// String messageSendingValue = RedisUtils.getCacheObject(sendMessageIng); +// if ("1".equals(messageSendingValue)) { +// // 设置为故障状态 +// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "2"); +// UpdateWrapper updateWrapper = new UpdateWrapper<>(); +// updateWrapper.eq("device_imei", context.getDeviceImei()); +// updateWrapper.set("online_status", 2); +// deviceService.update(updateWrapper); +// RedisUtils.deleteObject(sendMessageIng); +// +// // 解除告警 +// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_ALARM_KEY_PREFIX; +// if (RedisUtils.getCacheObject(deviceRedisKey) != null) { +// RedisUtils.deleteObject(deviceRedisKey); +// } +// cancelAlarm(context.getDeviceImei()); +// } +// } +// // 发送设备状态和位置信息到Redis +// syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode); +// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX; +// +// // 存储到Redis +// RedisUtils.setCacheObject(deviceRedisKey, batteryRemainingTime); +// } +// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); } catch (Exception e) { log.error("处理灯光模式命令时出错", e); RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20)); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqPersonnelInfoDataRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqPersonnelInfoDataRule.java index 49fabe24..acbbc7ee 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqPersonnelInfoDataRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqPersonnelInfoDataRule.java @@ -34,8 +34,8 @@ public class BjqPersonnelInfoDataRule implements MqttMessageRule { public void execute(MqttRuleContext context) { String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei(); try { - Object[] convertArr = context.getConvertArr(); - RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30)); +// Object[] convertArr = context.getConvertArr(); +// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30)); } catch (Exception e) { log.error("处理定位数据命令时出错", e); RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30)); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqSendMessageRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqSendMessageRule.java index d2d3620b..50e75fe0 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqSendMessageRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/bjq/BjqSendMessageRule.java @@ -45,47 +45,47 @@ public class BjqSendMessageRule implements MqttMessageRule { try { // Byte val2 = (Byte) context.getConvertArr()[1]; - String val2Str = context.getConvertArr()[1].toString(); - int val2 = Integer.parseInt(val2Str); - System.out.println("收到设备信息命令:"+val2); - if (val2 == 100) { - RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); - return; - } - - if(val2==200){ - String sendMessageIng = GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending"; - RedisUtils.deleteObject(sendMessageIng); - return; - } - - - String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":app_send_message_data"); - if (StringUtils.isEmpty(data)) { - return; - } - - byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data); - byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512); - log.info("第{}块数据大小: {} 字节", val2, specificChunk.length); -// System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk)); - - ArrayList intData = new ArrayList<>(); - intData.add(6); - intData.add(val2); - ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData); - intData.add(0); - intData.add(0); - intData.add(0); - intData.add(0); - - Map map = new HashMap<>(); - map.put("instruct", intData); - - mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map)); - log.info("发送设备信息数据到设备消息=>topic:{},payload:{}", - MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), - JsonUtils.toJsonString(map)); +// String val2Str = context.getConvertArr()[1].toString(); +// int val2 = Integer.parseInt(val2Str); +// System.out.println("收到设备信息命令:"+val2); +// if (val2 == 100) { +// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); +// return; +// } +// +// if(val2==200){ +// String sendMessageIng = GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending"; +// RedisUtils.deleteObject(sendMessageIng); +// return; +// } +// +// +// String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":app_send_message_data"); +// if (StringUtils.isEmpty(data)) { +// return; +// } +// +// byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data); +// byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512); +// log.info("第{}块数据大小: {} 字节", val2, specificChunk.length); +//// System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk)); +// +// ArrayList intData = new ArrayList<>(); +// intData.add(6); +// intData.add(val2); +// ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData); +// intData.add(0); +// intData.add(0); +// intData.add(0); +// intData.add(0); +// +// Map map = new HashMap<>(); +// map.put("instruct", intData); +// +// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map)); +// log.info("发送设备信息数据到设备消息=>topic:{},payload:{}", +// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), +// JsonUtils.toJsonString(map)); } catch (Exception e) { log.error("处理发送设备信息时出错", e); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java index 0ac95099..de5e16db 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java @@ -7,14 +7,18 @@ import com.fuyuanshen.common.core.utils.StringUtils; import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.equipment.domain.Device; import com.fuyuanshen.equipment.mapper.DeviceMapper; +import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext; import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -95,40 +99,18 @@ public class MqttMessageConsumer { } // 处理具体业务逻辑的方法 - private void processMessage(String message) { + private void processMessage(String imei) { String threadName = Thread.currentThread().getName(); try { - log.info("业务处理线程 {} 开始处理消息: {}", threadName, message); -// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; -// String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey); -// if(StringUtils.isBlank(deviceOnlineStatusRedis)){ -// UpdateWrapper updateWrapper = new UpdateWrapper<>(); -// updateWrapper.eq("device_imei", message) -// .set("online_status", 1); -// deviceMapper.update(updateWrapper); -// } -// QueryWrapper queryWrapper = new QueryWrapper<>(); -// queryWrapper.eq("device_imei", message); -// queryWrapper.eq("online_status", 1); -// Long count = deviceMapper.selectCount(queryWrapper); -// if(count == 0){ -// UpdateWrapper updateWrapper = new UpdateWrapper<>(); -// updateWrapper.eq("device_imei", message) -// .eq("online_status", 0) -// .set("online_status", 1); -// deviceMapper.update(updateWrapper); -// } + // 执行业务逻辑 UpdateWrapper updateWrapper = new UpdateWrapper<>(); - updateWrapper.eq("device_imei", message) + updateWrapper.eq("device_imei", imei) .in("online_status", 0,2) .set("online_status", 1); int update = deviceMapper.update(updateWrapper); - // 模拟业务处理耗时 -// Thread.sleep(200); - - log.info("业务处理线程 {} 完成消息处理: {}", threadName, message); + } catch (Exception e) { - log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, message, e); + log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, imei, e); } }