Compare commits
5 Commits
9fbb0aefcf
...
15b3348989
| Author | SHA1 | Date | |
|---|---|---|---|
| 15b3348989 | |||
| 4004aa1090 | |||
| 39c8375f08 | |||
| 64c81ac44e | |||
| 41eb3d9a33 |
@ -4,6 +4,7 @@ import cn.hutool.core.thread.ThreadUtil;
|
|||||||
import com.baomidou.lock.LockInfo;
|
import com.baomidou.lock.LockInfo;
|
||||||
import com.baomidou.lock.LockTemplate;
|
import com.baomidou.lock.LockTemplate;
|
||||||
import com.baomidou.lock.executor.RedissonLockExecutor;
|
import com.baomidou.lock.executor.RedissonLockExecutor;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||||
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
||||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||||
@ -48,14 +49,6 @@ public class RedisKeyExpirationListener implements MessageListener {
|
|||||||
handleFunctionAccessExpired(element);
|
handleFunctionAccessExpired(element);
|
||||||
}
|
}
|
||||||
if(expiredKey.endsWith(DEVICE_ONLINE_STATUS_KEY_PREFIX)) {
|
if(expiredKey.endsWith(DEVICE_ONLINE_STATUS_KEY_PREFIX)) {
|
||||||
// threadPoolTaskExecutor.execute(() -> {
|
|
||||||
// log.info("设备离线:{}", expiredKey);
|
|
||||||
// String element = expiredKey.substring(GlobalConstants.GLOBAL_REDIS_KEY.length() + DEVICE_KEY_PREFIX.length(), expiredKey.length() - DEVICE_ONLINE_STATUS_KEY_PREFIX.length());
|
|
||||||
// UpdateWrapper<Device> deviceUpdateWrapper = new UpdateWrapper<>();
|
|
||||||
// deviceUpdateWrapper.eq("device_imei", element);
|
|
||||||
// deviceUpdateWrapper.set("online_status", 0);
|
|
||||||
// deviceMapper.update(deviceUpdateWrapper);
|
|
||||||
// });
|
|
||||||
|
|
||||||
threadPoolTaskExecutor.execute(() -> {
|
threadPoolTaskExecutor.execute(() -> {
|
||||||
log.info("设备离线:{}", expiredKey);
|
log.info("设备离线:{}", expiredKey);
|
||||||
@ -69,10 +62,16 @@ public class RedisKeyExpirationListener implements MessageListener {
|
|||||||
|
|
||||||
if (lockInfo != null) {
|
if (lockInfo != null) {
|
||||||
try {
|
try {
|
||||||
UpdateWrapper<Device> deviceUpdateWrapper = new UpdateWrapper<>();
|
QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
|
||||||
deviceUpdateWrapper.eq("device_imei", element);
|
queryWrapper.eq("device_imei", element);
|
||||||
deviceUpdateWrapper.set("online_status", 0);
|
queryWrapper.eq("online_status", 1);
|
||||||
deviceMapper.update(deviceUpdateWrapper);
|
Long count = deviceMapper.selectCount(queryWrapper);
|
||||||
|
if(count > 0){
|
||||||
|
UpdateWrapper<Device> deviceUpdateWrapper = new UpdateWrapper<>();
|
||||||
|
deviceUpdateWrapper.eq("device_imei", element);
|
||||||
|
deviceUpdateWrapper.set("online_status", 0);
|
||||||
|
deviceMapper.update(deviceUpdateWrapper);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
//释放锁
|
//释放锁
|
||||||
lockTemplate.releaseLock(lockInfo);
|
lockTemplate.releaseLock(lockInfo);
|
||||||
|
|||||||
@ -49,12 +49,12 @@ public class ReceiverMessageHandler implements MessageHandler {
|
|||||||
String[] subStr = receivedTopic.split("/");
|
String[] subStr = receivedTopic.split("/");
|
||||||
String deviceImei = subStr[1];
|
String deviceImei = subStr[1];
|
||||||
if(StringUtils.isNotBlank(deviceImei)){
|
if(StringUtils.isNotBlank(deviceImei)){
|
||||||
|
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||||
|
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||||
|
RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24));
|
||||||
//在线状态
|
//在线状态
|
||||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62));
|
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62));
|
||||||
// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
|
||||||
// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
|
||||||
// RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
String state = payloadDict.getStr("state");
|
String state = payloadDict.getStr("state");
|
||||||
|
|||||||
@ -1,9 +1,13 @@
|
|||||||
package com.fuyuanshen.global.queue;
|
package com.fuyuanshen.global.queue;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||||
|
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
||||||
|
import com.fuyuanshen.common.core.utils.StringUtils;
|
||||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||||
import com.fuyuanshen.equipment.domain.Device;
|
import com.fuyuanshen.equipment.domain.Device;
|
||||||
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
||||||
|
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -15,6 +19,8 @@ import java.util.concurrent.ExecutorService;
|
|||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class MqttMessageConsumer {
|
public class MqttMessageConsumer {
|
||||||
@ -27,7 +33,7 @@ public class MqttMessageConsumer {
|
|||||||
private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10);
|
private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10);
|
||||||
|
|
||||||
// 初始化方法,启动消息监听
|
// 初始化方法,启动消息监听
|
||||||
// @PostConstruct
|
@PostConstruct
|
||||||
public void start() {
|
public void start() {
|
||||||
log.info("启动MQTT消息消费者...");
|
log.info("启动MQTT消息消费者...");
|
||||||
// 启动消息获取线程
|
// 启动消息获取线程
|
||||||
@ -62,7 +68,7 @@ public class MqttMessageConsumer {
|
|||||||
public void consumeMessages() {
|
public void consumeMessages() {
|
||||||
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||||
String threadName = Thread.currentThread().getName();
|
String threadName = Thread.currentThread().getName();
|
||||||
log.info("消息消费者线程 {} 开始监听队列: {}", threadName, queueKey);
|
// log.info("消息消费者线程 {} 开始监听队列: {}", threadName, queueKey);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (!Thread.currentThread().isInterrupted() && !messageConsumerPool.isShutdown()) {
|
while (!Thread.currentThread().isInterrupted() && !messageConsumerPool.isShutdown()) {
|
||||||
@ -75,7 +81,7 @@ public class MqttMessageConsumer {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
log.info("线程 {} 从队列中获取到消息,提交到处理线程池: {}", threadName, message);
|
// log.info("线程 {} 从队列中获取到消息,提交到处理线程池: {}", threadName, message);
|
||||||
// 将消息处理任务提交到处理线程池
|
// 将消息处理任务提交到处理线程池
|
||||||
messageProcessorPool.submit(() -> processMessage(message));
|
messageProcessorPool.submit(() -> processMessage(message));
|
||||||
}
|
}
|
||||||
@ -92,15 +98,23 @@ public class MqttMessageConsumer {
|
|||||||
String threadName = Thread.currentThread().getName();
|
String threadName = Thread.currentThread().getName();
|
||||||
try {
|
try {
|
||||||
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
|
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
|
||||||
|
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||||
// 实现具体的业务逻辑
|
String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey);
|
||||||
// 例如更新数据库、发送通知等
|
if(StringUtils.isBlank(deviceOnlineStatusRedis)){
|
||||||
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||||
updateWrapper.eq("device_imei", message)
|
updateWrapper.eq("device_imei", message)
|
||||||
.set("online_status", 1);
|
.set("online_status", 1);
|
||||||
deviceMapper.update(updateWrapper);
|
deviceMapper.update(updateWrapper);
|
||||||
|
}
|
||||||
|
// QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
|
||||||
|
// queryWrapper.eq("device_imei", message);
|
||||||
|
// queryWrapper.eq("online_status", 1);
|
||||||
|
// Long count = deviceMapper.selectCount(queryWrapper);
|
||||||
|
// if(count == 0){
|
||||||
|
//
|
||||||
|
// }
|
||||||
// 模拟业务处理耗时
|
// 模拟业务处理耗时
|
||||||
Thread.sleep(200);
|
// Thread.sleep(200);
|
||||||
|
|
||||||
log.info("业务处理线程 {} 完成消息处理: {}", threadName, message);
|
log.info("业务处理线程 {} 完成消息处理: {}", threadName, message);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|||||||
Reference in New Issue
Block a user