package com.fuyuanshen.global.queue; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.fuyuanshen.common.core.constant.GlobalConstants; import com.fuyuanshen.common.core.utils.StringUtils; import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.equipment.domain.Device; import com.fuyuanshen.equipment.mapper.DeviceMapper; import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX; @Service @Slf4j public class MqttMessageConsumer { @Autowired private DeviceMapper deviceMapper; // 创建两个线程池:一个用于消息获取,一个用于业务处理 private ExecutorService messageConsumerPool = Executors.newFixedThreadPool(3); private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10); // 初始化方法,启动消息监听 @PostConstruct public void start() { log.info("启动MQTT消息消费者..."); // 启动消息获取线程 for (int i = 0; i < 3; i++) { messageConsumerPool.submit(this::consumeMessages); } } // 销毁方法,关闭线程池 @PreDestroy public void stop() { log.info("关闭MQTT消息消费者..."); shutdownExecutorService(messageConsumerPool); shutdownExecutorService(messageProcessorPool); } private void shutdownExecutorService(ExecutorService executorService) { if (executorService != null && !executorService.isShutdown()) { executorService.shutdown(); try { if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow(); Thread.currentThread().interrupt(); } } } // 消费者方法 - 专门负责从队列获取消息 public void consumeMessages() { String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY; String threadName = Thread.currentThread().getName(); // log.info("消息消费者线程 {} 开始监听队列: {}", threadName, queueKey); try { while (!Thread.currentThread().isInterrupted() && !messageConsumerPool.isShutdown()) { // 阻塞式获取队列中的消息 String message = RedisUtils.pollDeduplicated( queueKey, MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY, 1, TimeUnit.SECONDS ); if (message != null) { // log.info("线程 {} 从队列中获取到消息,提交到处理线程池: {}", threadName, message); // 将消息处理任务提交到处理线程池 messageProcessorPool.submit(() -> processMessage(message)); } } } catch (Exception e) { log.error("线程 {} 消费消息时发生错误", threadName, e); } log.info("消息消费者线程 {} 停止监听队列", threadName); } // 处理具体业务逻辑的方法 private void processMessage(String message) { String threadName = Thread.currentThread().getName(); try { log.info("业务处理线程 {} 开始处理消息: {}", threadName, message); // String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ; // String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey); // if(StringUtils.isBlank(deviceOnlineStatusRedis)){ // UpdateWrapper updateWrapper = new UpdateWrapper<>(); // updateWrapper.eq("device_imei", message) // .set("online_status", 1); // deviceMapper.update(updateWrapper); // } QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("device_imei", message); queryWrapper.eq("online_status", 1); Long count = deviceMapper.selectCount(queryWrapper); if(count == 0){ UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.eq("device_imei", message) .set("online_status", 1); deviceMapper.update(updateWrapper); } // 模拟业务处理耗时 // Thread.sleep(200); log.info("业务处理线程 {} 完成消息处理: {}", threadName, message); } catch (Exception e) { log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, message, e); } } }