From 41eb3d9a339f8cba935dc8a458180fafddd80f7d Mon Sep 17 00:00:00 2001 From: chenyouting <514333061@qq.com> Date: Sat, 13 Sep 2025 15:37:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E7=8A=B6=E6=80=81=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../global/mqtt/receiver/ReceiverMessageHandler.java | 6 +++--- .../com/fuyuanshen/global/queue/MqttMessageConsumer.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java index 2581db7c5..979cd3528 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java @@ -52,9 +52,9 @@ public class ReceiverMessageHandler implements MessageHandler { //在线状态 String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62)); -// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; -// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; -// RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24)); + String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; + String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; + RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24)); } String state = payloadDict.getStr("state"); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java index 9172fa8e8..2c364fe62 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java @@ -27,7 +27,7 @@ public class MqttMessageConsumer { private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10); // 初始化方法,启动消息监听 -// @PostConstruct + @PostConstruct public void start() { log.info("启动MQTT消息消费者..."); // 启动消息获取线程 @@ -100,7 +100,7 @@ public class MqttMessageConsumer { .set("online_status", 1); deviceMapper.update(updateWrapper); // 模拟业务处理耗时 - Thread.sleep(200); +// Thread.sleep(200); log.info("业务处理线程 {} 完成消息处理: {}", threadName, message); } catch (Exception e) {