2025-08-29 16:49:16 +08:00
|
|
|
package com.fuyuanshen.global.queue;
|
|
|
|
|
|
2025-09-13 15:51:34 +08:00
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
2025-08-29 16:49:16 +08:00
|
|
|
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
|
|
|
|
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
|
|
|
|
import com.fuyuanshen.equipment.domain.Device;
|
|
|
|
|
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
|
|
|
|
import jakarta.annotation.PostConstruct;
|
|
|
|
|
import jakarta.annotation.PreDestroy;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
@Service
|
|
|
|
|
@Slf4j
|
|
|
|
|
public class MqttMessageConsumer {
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
private DeviceMapper deviceMapper;
|
|
|
|
|
|
|
|
|
|
// 创建两个线程池:一个用于消息获取,一个用于业务处理
|
|
|
|
|
private ExecutorService messageConsumerPool = Executors.newFixedThreadPool(3);
|
|
|
|
|
private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10);
|
|
|
|
|
|
|
|
|
|
// 初始化方法,启动消息监听
|
2025-09-13 15:37:48 +08:00
|
|
|
@PostConstruct
|
2025-08-29 16:49:16 +08:00
|
|
|
public void start() {
|
|
|
|
|
log.info("启动MQTT消息消费者...");
|
|
|
|
|
// 启动消息获取线程
|
|
|
|
|
for (int i = 0; i < 3; i++) {
|
|
|
|
|
messageConsumerPool.submit(this::consumeMessages);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 销毁方法,关闭线程池
|
|
|
|
|
@PreDestroy
|
|
|
|
|
public void stop() {
|
|
|
|
|
log.info("关闭MQTT消息消费者...");
|
|
|
|
|
shutdownExecutorService(messageConsumerPool);
|
|
|
|
|
shutdownExecutorService(messageProcessorPool);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void shutdownExecutorService(ExecutorService executorService) {
|
|
|
|
|
if (executorService != null && !executorService.isShutdown()) {
|
|
|
|
|
executorService.shutdown();
|
|
|
|
|
try {
|
|
|
|
|
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
|
|
executorService.shutdownNow();
|
|
|
|
|
}
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
executorService.shutdownNow();
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 消费者方法 - 专门负责从队列获取消息
|
|
|
|
|
public void consumeMessages() {
|
|
|
|
|
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
|
|
|
|
String threadName = Thread.currentThread().getName();
|
|
|
|
|
log.info("消息消费者线程 {} 开始监听队列: {}", threadName, queueKey);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
while (!Thread.currentThread().isInterrupted() && !messageConsumerPool.isShutdown()) {
|
|
|
|
|
// 阻塞式获取队列中的消息
|
|
|
|
|
String message = RedisUtils.pollDeduplicated(
|
|
|
|
|
queueKey,
|
|
|
|
|
MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY,
|
|
|
|
|
1,
|
|
|
|
|
TimeUnit.SECONDS
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if (message != null) {
|
|
|
|
|
log.info("线程 {} 从队列中获取到消息,提交到处理线程池: {}", threadName, message);
|
|
|
|
|
// 将消息处理任务提交到处理线程池
|
|
|
|
|
messageProcessorPool.submit(() -> processMessage(message));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("线程 {} 消费消息时发生错误", threadName, e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.info("消息消费者线程 {} 停止监听队列", threadName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 处理具体业务逻辑的方法
|
|
|
|
|
private void processMessage(String message) {
|
|
|
|
|
String threadName = Thread.currentThread().getName();
|
|
|
|
|
try {
|
|
|
|
|
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
|
2025-09-13 15:51:34 +08:00
|
|
|
|
|
|
|
|
QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
|
|
|
|
|
queryWrapper.eq("device_imei", message);
|
|
|
|
|
queryWrapper.eq("online_status", 1);
|
|
|
|
|
Long count = deviceMapper.selectCount(queryWrapper);
|
|
|
|
|
if(count == 0){
|
|
|
|
|
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
|
|
|
|
updateWrapper.eq("device_imei", message)
|
|
|
|
|
.set("online_status", 1);
|
|
|
|
|
deviceMapper.update(updateWrapper);
|
|
|
|
|
}
|
2025-08-29 16:49:16 +08:00
|
|
|
// 模拟业务处理耗时
|
2025-09-13 15:37:48 +08:00
|
|
|
// Thread.sleep(200);
|
2025-08-29 16:49:16 +08:00
|
|
|
|
|
|
|
|
log.info("业务处理线程 {} 完成消息处理: {}", threadName, message);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, message, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|