From 64c81ac44eb8eea0732600fcb48ecf0d2cfe1c0a Mon Sep 17 00:00:00 2001 From: chenyouting <514333061@qq.com> Date: Sat, 13 Sep 2025 15:51:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E7=8A=B6=E6=80=81=E4=BF=AE?= =?UTF-8?q?=E6=94=B92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/RedisKeyExpirationListener.java | 15 +++++++++++---- .../global/queue/MqttMessageConsumer.java | 19 ++++++++++++------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/listener/RedisKeyExpirationListener.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/listener/RedisKeyExpirationListener.java index b3f64ecd..dbfcfe85 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/listener/RedisKeyExpirationListener.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/listener/RedisKeyExpirationListener.java @@ -4,6 +4,7 @@ import cn.hutool.core.thread.ThreadUtil; import com.baomidou.lock.LockInfo; import com.baomidou.lock.LockTemplate; import com.baomidou.lock.executor.RedissonLockExecutor; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.fuyuanshen.common.core.constant.GlobalConstants; import com.fuyuanshen.common.redis.utils.RedisUtils; @@ -69,10 +70,16 @@ public class RedisKeyExpirationListener implements MessageListener { if (lockInfo != null) { try { - UpdateWrapper deviceUpdateWrapper = new UpdateWrapper<>(); - deviceUpdateWrapper.eq("device_imei", element); - deviceUpdateWrapper.set("online_status", 0); - deviceMapper.update(deviceUpdateWrapper); + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("device_imei", element); + queryWrapper.eq("online_status", 0); + Long count = deviceMapper.selectCount(queryWrapper); + if(count == 0){ + UpdateWrapper deviceUpdateWrapper = new UpdateWrapper<>(); + deviceUpdateWrapper.eq("device_imei", element); + deviceUpdateWrapper.set("online_status", 0); + deviceMapper.update(deviceUpdateWrapper); + } } finally { //释放锁 lockTemplate.releaseLock(lockInfo); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java index 2c364fe6..56084f83 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java @@ -1,5 +1,6 @@ package com.fuyuanshen.global.queue; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.equipment.domain.Device; @@ -92,13 +93,17 @@ public class MqttMessageConsumer { String threadName = Thread.currentThread().getName(); try { log.info("业务处理线程 {} 开始处理消息: {}", threadName, message); - - // 实现具体的业务逻辑 - // 例如更新数据库、发送通知等 - UpdateWrapper updateWrapper = new UpdateWrapper<>(); - updateWrapper.eq("device_imei", message) - .set("online_status", 1); - deviceMapper.update(updateWrapper); + + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("device_imei", message); + queryWrapper.eq("online_status", 1); + Long count = deviceMapper.selectCount(queryWrapper); + if(count == 0){ + UpdateWrapper updateWrapper = new UpdateWrapper<>(); + updateWrapper.eq("device_imei", message) + .set("online_status", 1); + deviceMapper.update(updateWrapper); + } // 模拟业务处理耗时 // Thread.sleep(200);