Compare commits

5 Commits

Author SHA1 Message Date
15b3348989 Merge branch 'dyf-device' into 6170 2025-09-15 09:41:56 +08:00
4004aa1090 在线状态修改优化2 2025-09-13 16:51:56 +08:00
39c8375f08 在线状态修改优化 2025-09-13 16:46:05 +08:00
64c81ac44e 在线状态修改2 2025-09-13 15:51:34 +08:00
41eb3d9a33 在线状态修改 2025-09-13 15:37:48 +08:00
3 changed files with 39 additions and 26 deletions

View File

@ -4,6 +4,7 @@ import cn.hutool.core.thread.ThreadUtil;
import com.baomidou.lock.LockInfo; import com.baomidou.lock.LockInfo;
import com.baomidou.lock.LockTemplate; import com.baomidou.lock.LockTemplate;
import com.baomidou.lock.executor.RedissonLockExecutor; import com.baomidou.lock.executor.RedissonLockExecutor;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.fuyuanshen.common.core.constant.GlobalConstants; import com.fuyuanshen.common.core.constant.GlobalConstants;
import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.common.redis.utils.RedisUtils;
@ -48,14 +49,6 @@ public class RedisKeyExpirationListener implements MessageListener {
handleFunctionAccessExpired(element); handleFunctionAccessExpired(element);
} }
if(expiredKey.endsWith(DEVICE_ONLINE_STATUS_KEY_PREFIX)) { if(expiredKey.endsWith(DEVICE_ONLINE_STATUS_KEY_PREFIX)) {
// threadPoolTaskExecutor.execute(() -> {
// log.info("设备离线:{}", expiredKey);
// String element = expiredKey.substring(GlobalConstants.GLOBAL_REDIS_KEY.length() + DEVICE_KEY_PREFIX.length(), expiredKey.length() - DEVICE_ONLINE_STATUS_KEY_PREFIX.length());
// UpdateWrapper<Device> deviceUpdateWrapper = new UpdateWrapper<>();
// deviceUpdateWrapper.eq("device_imei", element);
// deviceUpdateWrapper.set("online_status", 0);
// deviceMapper.update(deviceUpdateWrapper);
// });
threadPoolTaskExecutor.execute(() -> { threadPoolTaskExecutor.execute(() -> {
log.info("设备离线:{}", expiredKey); log.info("设备离线:{}", expiredKey);
@ -69,10 +62,16 @@ public class RedisKeyExpirationListener implements MessageListener {
if (lockInfo != null) { if (lockInfo != null) {
try { try {
UpdateWrapper<Device> deviceUpdateWrapper = new UpdateWrapper<>(); QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
deviceUpdateWrapper.eq("device_imei", element); queryWrapper.eq("device_imei", element);
deviceUpdateWrapper.set("online_status", 0); queryWrapper.eq("online_status", 1);
deviceMapper.update(deviceUpdateWrapper); Long count = deviceMapper.selectCount(queryWrapper);
if(count > 0){
UpdateWrapper<Device> deviceUpdateWrapper = new UpdateWrapper<>();
deviceUpdateWrapper.eq("device_imei", element);
deviceUpdateWrapper.set("online_status", 0);
deviceMapper.update(deviceUpdateWrapper);
}
} finally { } finally {
//释放锁 //释放锁
lockTemplate.releaseLock(lockInfo); lockTemplate.releaseLock(lockInfo);

View File

@ -49,12 +49,12 @@ public class ReceiverMessageHandler implements MessageHandler {
String[] subStr = receivedTopic.split("/"); String[] subStr = receivedTopic.split("/");
String deviceImei = subStr[1]; String deviceImei = subStr[1];
if(StringUtils.isNotBlank(deviceImei)){ if(StringUtils.isNotBlank(deviceImei)){
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24));
//在线状态 //在线状态
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62)); RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62));
// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
// RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24));
} }
String state = payloadDict.getStr("state"); String state = payloadDict.getStr("state");

View File

@ -1,9 +1,13 @@
package com.fuyuanshen.global.queue; package com.fuyuanshen.global.queue;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.fuyuanshen.common.core.constant.GlobalConstants;
import com.fuyuanshen.common.core.utils.StringUtils;
import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.common.redis.utils.RedisUtils;
import com.fuyuanshen.equipment.domain.Device; import com.fuyuanshen.equipment.domain.Device;
import com.fuyuanshen.equipment.mapper.DeviceMapper; import com.fuyuanshen.equipment.mapper.DeviceMapper;
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -15,6 +19,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
@Service @Service
@Slf4j @Slf4j
public class MqttMessageConsumer { public class MqttMessageConsumer {
@ -27,7 +33,7 @@ public class MqttMessageConsumer {
private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10); private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10);
// 初始化方法,启动消息监听 // 初始化方法,启动消息监听
// @PostConstruct @PostConstruct
public void start() { public void start() {
log.info("启动MQTT消息消费者..."); log.info("启动MQTT消息消费者...");
// 启动消息获取线程 // 启动消息获取线程
@ -62,7 +68,7 @@ public class MqttMessageConsumer {
public void consumeMessages() { public void consumeMessages() {
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
String threadName = Thread.currentThread().getName(); String threadName = Thread.currentThread().getName();
log.info("消息消费者线程 {} 开始监听队列: {}", threadName, queueKey); // log.info("消息消费者线程 {} 开始监听队列: {}", threadName, queueKey);
try { try {
while (!Thread.currentThread().isInterrupted() && !messageConsumerPool.isShutdown()) { while (!Thread.currentThread().isInterrupted() && !messageConsumerPool.isShutdown()) {
@ -75,7 +81,7 @@ public class MqttMessageConsumer {
); );
if (message != null) { if (message != null) {
log.info("线程 {} 从队列中获取到消息,提交到处理线程池: {}", threadName, message); // log.info("线程 {} 从队列中获取到消息,提交到处理线程池: {}", threadName, message);
// 将消息处理任务提交到处理线程池 // 将消息处理任务提交到处理线程池
messageProcessorPool.submit(() -> processMessage(message)); messageProcessorPool.submit(() -> processMessage(message));
} }
@ -92,15 +98,23 @@ public class MqttMessageConsumer {
String threadName = Thread.currentThread().getName(); String threadName = Thread.currentThread().getName();
try { try {
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message); log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
// 实现具体的业务逻辑 String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey);
// 例如更新数据库、发送通知等 if(StringUtils.isBlank(deviceOnlineStatusRedis)){
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>(); UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("device_imei", message) updateWrapper.eq("device_imei", message)
.set("online_status", 1); .set("online_status", 1);
deviceMapper.update(updateWrapper); deviceMapper.update(updateWrapper);
}
// QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
// queryWrapper.eq("device_imei", message);
// queryWrapper.eq("online_status", 1);
// Long count = deviceMapper.selectCount(queryWrapper);
// if(count == 0){
//
// }
// 模拟业务处理耗时 // 模拟业务处理耗时
Thread.sleep(200); // Thread.sleep(200);
log.info("业务处理线程 {} 完成消息处理: {}", threadName, message); log.info("业务处理线程 {} 完成消息处理: {}", threadName, message);
} catch (Exception e) { } catch (Exception e) {