Files
fys-Multi-tenant/fys-admin/src/main/java/com/fuyuanshen/global/queue/MqttMessageConsumer.java

134 lines
5.6 KiB
Java
Raw Normal View History

2025-08-29 16:49:16 +08:00
package com.fuyuanshen.global.queue;
2025-09-13 15:51:34 +08:00
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
2025-08-29 16:49:16 +08:00
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
2025-09-13 16:46:05 +08:00
import com.fuyuanshen.common.core.constant.GlobalConstants;
import com.fuyuanshen.common.core.utils.StringUtils;
2025-08-29 16:49:16 +08:00
import com.fuyuanshen.common.redis.utils.RedisUtils;
import com.fuyuanshen.equipment.domain.Device;
import com.fuyuanshen.equipment.mapper.DeviceMapper;
2025-09-13 16:46:05 +08:00
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
2025-08-29 16:49:16 +08:00
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
2025-09-13 16:46:05 +08:00
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
2025-08-29 16:49:16 +08:00
@Service
@Slf4j
public class MqttMessageConsumer {
@Autowired
private DeviceMapper deviceMapper;
// 创建两个线程池:一个用于消息获取,一个用于业务处理
private ExecutorService messageConsumerPool = Executors.newFixedThreadPool(3);
private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10);
// 初始化方法,启动消息监听
2025-09-13 15:37:48 +08:00
@PostConstruct
2025-08-29 16:49:16 +08:00
public void start() {
log.info("启动MQTT消息消费者...");
// 启动消息获取线程
for (int i = 0; i < 3; i++) {
messageConsumerPool.submit(this::consumeMessages);
}
}
// 销毁方法,关闭线程池
@PreDestroy
public void stop() {
log.info("关闭MQTT消息消费者...");
shutdownExecutorService(messageConsumerPool);
shutdownExecutorService(messageProcessorPool);
}
private void shutdownExecutorService(ExecutorService executorService) {
if (executorService != null && !executorService.isShutdown()) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// 消费者方法 - 专门负责从队列获取消息
public void consumeMessages() {
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
String threadName = Thread.currentThread().getName();
2025-09-13 16:51:56 +08:00
// log.info("消息消费者线程 {} 开始监听队列: {}", threadName, queueKey);
2025-08-29 16:49:16 +08:00
try {
while (!Thread.currentThread().isInterrupted() && !messageConsumerPool.isShutdown()) {
// 阻塞式获取队列中的消息
String message = RedisUtils.pollDeduplicated(
queueKey,
MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY,
1,
TimeUnit.SECONDS
);
if (message != null) {
2025-09-13 16:51:56 +08:00
// log.info("线程 {} 从队列中获取到消息,提交到处理线程池: {}", threadName, message);
2025-08-29 16:49:16 +08:00
// 将消息处理任务提交到处理线程池
messageProcessorPool.submit(() -> processMessage(message));
}
}
} catch (Exception e) {
log.error("线程 {} 消费消息时发生错误", threadName, e);
}
log.info("消息消费者线程 {} 停止监听队列", threadName);
}
// 处理具体业务逻辑的方法
private void processMessage(String message) {
String threadName = Thread.currentThread().getName();
try {
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
2025-09-22 11:02:26 +08:00
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
// String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey);
// if(StringUtils.isBlank(deviceOnlineStatusRedis)){
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
// updateWrapper.eq("device_imei", message)
// .set("online_status", 1);
// deviceMapper.update(updateWrapper);
// }
2025-10-07 15:50:16 +08:00
// QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
// queryWrapper.eq("device_imei", message);
// queryWrapper.eq("online_status", 1);
// Long count = deviceMapper.selectCount(queryWrapper);
// if(count == 0){
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
// updateWrapper.eq("device_imei", message)
// .eq("online_status", 0)
// .set("online_status", 1);
// deviceMapper.update(updateWrapper);
// }
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("device_imei", message)
.eq("online_status", 0)
.set("online_status", 1);
int update = deviceMapper.update(updateWrapper);
2025-08-29 16:49:16 +08:00
// 模拟业务处理耗时
2025-09-13 15:37:48 +08:00
// Thread.sleep(200);
2025-08-29 16:49:16 +08:00
log.info("业务处理线程 {} 完成消息处理: {}", threadName, message);
} catch (Exception e) {
log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, message, e);
}
}
}