forked from dyf/fys-Multi-tenant
在线状态修改优化
This commit is contained in:
@ -49,14 +49,6 @@ public class RedisKeyExpirationListener implements MessageListener {
|
||||
handleFunctionAccessExpired(element);
|
||||
}
|
||||
if(expiredKey.endsWith(DEVICE_ONLINE_STATUS_KEY_PREFIX)) {
|
||||
// threadPoolTaskExecutor.execute(() -> {
|
||||
// log.info("设备离线:{}", expiredKey);
|
||||
// String element = expiredKey.substring(GlobalConstants.GLOBAL_REDIS_KEY.length() + DEVICE_KEY_PREFIX.length(), expiredKey.length() - DEVICE_ONLINE_STATUS_KEY_PREFIX.length());
|
||||
// UpdateWrapper<Device> deviceUpdateWrapper = new UpdateWrapper<>();
|
||||
// deviceUpdateWrapper.eq("device_imei", element);
|
||||
// deviceUpdateWrapper.set("online_status", 0);
|
||||
// deviceMapper.update(deviceUpdateWrapper);
|
||||
// });
|
||||
|
||||
threadPoolTaskExecutor.execute(() -> {
|
||||
log.info("设备离线:{}", expiredKey);
|
||||
@ -72,9 +64,9 @@ public class RedisKeyExpirationListener implements MessageListener {
|
||||
try {
|
||||
QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("device_imei", element);
|
||||
queryWrapper.eq("online_status", 0);
|
||||
queryWrapper.eq("online_status", 1);
|
||||
Long count = deviceMapper.selectCount(queryWrapper);
|
||||
if(count == 0){
|
||||
if(count > 0){
|
||||
UpdateWrapper<Device> deviceUpdateWrapper = new UpdateWrapper<>();
|
||||
deviceUpdateWrapper.eq("device_imei", element);
|
||||
deviceUpdateWrapper.set("online_status", 0);
|
||||
|
||||
@ -49,12 +49,12 @@ public class ReceiverMessageHandler implements MessageHandler {
|
||||
String[] subStr = receivedTopic.split("/");
|
||||
String deviceImei = subStr[1];
|
||||
if(StringUtils.isNotBlank(deviceImei)){
|
||||
//在线状态
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62));
|
||||
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||
RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24));
|
||||
//在线状态
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62));
|
||||
}
|
||||
|
||||
String state = payloadDict.getStr("state");
|
||||
|
||||
@ -2,9 +2,12 @@ package com.fuyuanshen.global.queue;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
||||
import com.fuyuanshen.common.core.utils.StringUtils;
|
||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.equipment.domain.Device;
|
||||
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -16,6 +19,8 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class MqttMessageConsumer {
|
||||
@ -93,17 +98,21 @@ public class MqttMessageConsumer {
|
||||
String threadName = Thread.currentThread().getName();
|
||||
try {
|
||||
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
|
||||
|
||||
QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("device_imei", message);
|
||||
queryWrapper.eq("online_status", 1);
|
||||
Long count = deviceMapper.selectCount(queryWrapper);
|
||||
if(count == 0){
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||
String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey);
|
||||
if(StringUtils.isBlank(deviceOnlineStatusRedis)){
|
||||
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||
updateWrapper.eq("device_imei", message)
|
||||
.set("online_status", 1);
|
||||
deviceMapper.update(updateWrapper);
|
||||
}
|
||||
// QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
|
||||
// queryWrapper.eq("device_imei", message);
|
||||
// queryWrapper.eq("online_status", 1);
|
||||
// Long count = deviceMapper.selectCount(queryWrapper);
|
||||
// if(count == 0){
|
||||
//
|
||||
// }
|
||||
// 模拟业务处理耗时
|
||||
// Thread.sleep(200);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user