package com.fuyuanshen.global.queue;

import com.fuyuanshen.global.Provider.RedissonAlarmDelayProvider;
import com.fuyuanshen.global.mqtt.service.AlarmCheckService;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Consumes alarm ids from the Redisson blocking queue named
 * {@link RedissonAlarmDelayProvider#QUEUE_NAME} and hands each one to
 * {@link AlarmCheckService#executeCheck} on a dedicated worker pool.
 *
 * <p>A single non-daemon thread ("Alarm-Consumer-Thread") polls the queue; the actual
 * checks run on a fixed pool of {@value #BIZ_THREADS} "Alarm-Biz-N" threads backed by a
 * bounded work queue. When that work queue is full the polling thread runs the check
 * itself, which pauses polling until the check returns.
 *
 * <p>The consumer is started from {@link #run(String...)} and stopped from
 * {@link #destroy()}.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class RedissonAlarmConsumer implements CommandLineRunner {

    /** Number of worker threads that execute alarm checks. */
    private static final int BIZ_THREADS = 4;

    /** Capacity of the bounded work queue in front of the worker threads. */
    private static final int BIZ_QUEUE_CAPACITY = 200;

    /** Timeout of a single queue poll; bounds how long the loop takes to notice {@code running == false}. */
    private static final long POLL_TIMEOUT_SECONDS = 1;

    /** Pause after a non-interrupt failure in the polling loop, to avoid flooding the log. */
    private static final long ERROR_BACKOFF_SECONDS = 1;

    /** How long {@link #destroy()} waits for the consumer thread after interrupting it. */
    private static final long CONSUMER_INTERRUPT_JOIN_SECONDS = 5;

    /** How long {@link #destroy()} waits for queued and running checks before forcing shutdown. */
    private static final long BIZ_SHUTDOWN_TIMEOUT_SECONDS = 60;

    private final RedissonClient redissonClient;
    private final AlarmCheckService alarmCheckService;

    /** Cleared by {@link #destroy()} to ask the polling loop to stop. */
    private volatile boolean running = true;

    // Written in run() and read in destroy(), which may execute on a different thread;
    // volatile makes the assignments visible there.
    private volatile Thread consumerThread;
    private volatile ExecutorService bizExecutor;

    /**
     * Creates the worker pool and starts the queue-polling thread.
     *
     * @param args application arguments; not used
     */
    @Override
    public void run(String... args) {
        bizExecutor = createBizExecutor();

        Thread thread = new Thread(this::consumeLoop, "Alarm-Consumer-Thread");
        thread.setDaemon(false);
        consumerThread = thread;
        thread.start();
    }

    /**
     * Builds the worker pool: a fixed number of threads, a bounded work queue and a
     * caller-runs rejection policy.
     *
     * <p>{@code Executors.newFixedThreadPool} is deliberately not used because its work
     * queue is unbounded, so a slow check service would let pending tasks accumulate
     * without limit.
     */
    private ExecutorService createBizExecutor() {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory threadFactory = runnable -> {
            Thread worker = new Thread(runnable, "Alarm-Biz-" + counter.getAndIncrement());
            worker.setDaemon(false);
            return worker;
        };
        return new ThreadPoolExecutor(
                BIZ_THREADS,
                BIZ_THREADS,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(BIZ_QUEUE_CAPACITY),
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Body of the consumer thread: polls the queue until {@link #running} is cleared or
     * the thread is interrupted, submitting every received alarm id to the worker pool.
     */
    private void consumeLoop() {
        RBlockingQueue<Long> blockingQueue =
                redissonClient.getBlockingQueue(RedissonAlarmDelayProvider.QUEUE_NAME);
        log.info("Redisson 延迟报警监听线程已启动...");

        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                // Timed poll so the loop re-checks the running flag at least once per timeout.
                Long alarmId = blockingQueue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (alarmId != null) {
                    // Hand off to the pool so a slow check does not stall polling. If the
                    // bounded queue is full, CallerRunsPolicy runs the check on this thread.
                    bizExecutor.submit(() -> processAlarm(alarmId));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.info("Redisson 消费线程被中断,退出循环");
                break;
            } catch (Exception e) {
                log.error("Redisson 延迟队列消费异常", e);
                // Back off briefly after a non-interrupt failure to avoid a log storm.
                try {
                    TimeUnit.SECONDS.sleep(ERROR_BACKOFF_SECONDS);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        log.info("Redisson 延迟报警消费线程结束");
    }

    /**
     * Runs the alarm check for one id. Exceptions are logged and not rethrown, so one
     * failing alarm does not affect the thread that executes it.
     *
     * @param alarmId id taken from the queue
     */
    private void processAlarm(Long alarmId) {
        try {
            alarmCheckService.executeCheck(alarmId);
        } catch (Exception e) {
            log.error("处理报警 ID [{}] 时发生异常", alarmId, e);
            // Retry or dead-letter handling could be added here.
        }
    }

    /**
     * Stops the consumer: first the polling thread, then the worker pool.
     *
     * <p>The polling thread is stopped before the pool is shut down so that an alarm id
     * already removed from the queue is not handed to a pool that no longer accepts work.
     */
    @PreDestroy
    public void destroy() {
        log.info("开始关闭 Redisson 报警消费者...");
        running = false;

        stopConsumerThread();
        shutdownBizExecutor();

        log.info("Redisson 报警消费者已关闭");
    }

    /**
     * Waits for the polling thread to exit, interrupting it only if it does not stop on
     * its own within roughly one poll cycle.
     */
    private void stopConsumerThread() {
        Thread thread = consumerThread;
        if (thread == null) {
            return;
        }
        try {
            // The loop re-checks the running flag after every timed poll, so it normally
            // exits without needing an interrupt.
            thread.join(TimeUnit.SECONDS.toMillis(POLL_TIMEOUT_SECONDS + 1));
            if (thread.isAlive()) {
                thread.interrupt();
                thread.join(TimeUnit.SECONDS.toMillis(CONSUMER_INTERRUPT_JOIN_SECONDS));
            }
            if (thread.isAlive()) {
                log.warn("Redisson 消费线程未能在预期时间内退出");
            }
        } catch (InterruptedException e) {
            // The shutdown itself was interrupted: stop waiting, but still signal the consumer.
            thread.interrupt();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts the worker pool down, waiting up to {@value #BIZ_SHUTDOWN_TIMEOUT_SECONDS}
     * seconds for submitted checks to finish before cancelling the remainder.
     */
    private void shutdownBizExecutor() {
        ExecutorService executor = bizExecutor;
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(BIZ_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}