diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java index 2581db7..979cd35 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java @@ -52,9 +52,9 @@ public class ReceiverMessageHandler implements MessageHandler { //在线状态 String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62)); -// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; -// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; -// RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24)); + String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; + String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; + RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24)); } String state = payloadDict.getStr("state"); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java index 9172fa8..2c364fe 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java @@ -27,7 +27,7 @@ public class MqttMessageConsumer { private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10); // 初始化方法,启动消息监听 -// @PostConstruct + @PostConstruct public void start() { log.info("启动MQTT消息消费者..."); // 启动消息获取线程 @@ -100,7 +100,7 @@ public class MqttMessageConsumer { .set("online_status", 1); deviceMapper.update(updateWrapper); // 模拟业务处理耗时 - Thread.sleep(200); +// Thread.sleep(200); log.info("业务处理线程 {} 完成消息处理: {}", threadName, message); } catch (Exception e) {