diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/listener/RedisKeyExpirationListener.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/listener/RedisKeyExpirationListener.java index dbfcfe8..55071cf 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/listener/RedisKeyExpirationListener.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/listener/RedisKeyExpirationListener.java @@ -49,14 +49,6 @@ public class RedisKeyExpirationListener implements MessageListener { handleFunctionAccessExpired(element); } if(expiredKey.endsWith(DEVICE_ONLINE_STATUS_KEY_PREFIX)) { -// threadPoolTaskExecutor.execute(() -> { -// log.info("设备离线:{}", expiredKey); -// String element = expiredKey.substring(GlobalConstants.GLOBAL_REDIS_KEY.length() + DEVICE_KEY_PREFIX.length(), expiredKey.length() - DEVICE_ONLINE_STATUS_KEY_PREFIX.length()); -// UpdateWrapper deviceUpdateWrapper = new UpdateWrapper<>(); -// deviceUpdateWrapper.eq("device_imei", element); -// deviceUpdateWrapper.set("online_status", 0); -// deviceMapper.update(deviceUpdateWrapper); -// }); threadPoolTaskExecutor.execute(() -> { log.info("设备离线:{}", expiredKey); @@ -72,9 +64,9 @@ public class RedisKeyExpirationListener implements MessageListener { try { QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("device_imei", element); - queryWrapper.eq("online_status", 0); + queryWrapper.eq("online_status", 1); Long count = deviceMapper.selectCount(queryWrapper); - if(count == 0){ + if(count > 0){ UpdateWrapper deviceUpdateWrapper = new UpdateWrapper<>(); deviceUpdateWrapper.eq("device_imei", element); deviceUpdateWrapper.set("online_status", 0); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java index 979cd35..4e4b2fc 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java @@ -49,12 +49,12 @@ public class ReceiverMessageHandler implements MessageHandler { String[] subStr = receivedTopic.split("/"); String deviceImei = subStr[1]; if(StringUtils.isNotBlank(deviceImei)){ - //在线状态 - String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; - RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62)); String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY; RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24)); + //在线状态 + String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; + RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62)); } String state = payloadDict.getStr("state"); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java index 56084f8..1d30483 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java @@ -2,9 +2,12 @@ package com.fuyuanshen.global.queue; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; +import com.fuyuanshen.common.core.constant.GlobalConstants; +import com.fuyuanshen.common.core.utils.StringUtils; import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.equipment.domain.Device; import com.fuyuanshen.equipment.mapper.DeviceMapper; +import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; @@ -16,6 +19,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX; + @Service @Slf4j public class MqttMessageConsumer { @@ -93,17 +98,21 @@ public class MqttMessageConsumer { String threadName = Thread.currentThread().getName(); try { log.info("业务处理线程 {} 开始处理消息: {}", threadName, message); - - QueryWrapper queryWrapper = new QueryWrapper<>(); - queryWrapper.eq("device_imei", message); - queryWrapper.eq("online_status", 1); - Long count = deviceMapper.selectCount(queryWrapper); - if(count == 0){ + String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; + String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey); + if(StringUtils.isBlank(deviceOnlineStatusRedis)){ UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.eq("device_imei", message) .set("online_status", 1); deviceMapper.update(updateWrapper); } +// QueryWrapper queryWrapper = new QueryWrapper<>(); +// queryWrapper.eq("device_imei", message); +// queryWrapper.eq("online_status", 1); +// Long count = deviceMapper.selectCount(queryWrapper); +// if(count == 0){ +// +// } // 模拟业务处理耗时 // Thread.sleep(200);