feat(device): 实现设备批量控制指令发送功能- 新增批量发送设备控制指令方法 sendCommandBatch- 支持设备离线状态检查和异常处理

- 添加设备操作日志记录和报警创建- 实现设备SOS档位批量设置接口
- 在设备指令处理中增加消息去重机制
- 优化设备报警处理的分布式锁逻辑
- 完善设备数据规则中的并发控制
This commit is contained in:
2025-09-28 16:11:34 +08:00
parent 2d59397de5
commit a4596b9c90
8 changed files with 143 additions and 5 deletions

View File

@ -62,6 +62,14 @@ public class XinghanBootLogoRule implements MqttMessageRule {
String respText = payload.getStaPicTrans();
log.warn("设备上报LOGO{}", respText);
// --- 去重 START ---
String dedupKey = "xd:MSG:LOGO:" + ctx.getDeviceImei() + ":" + respText;
boolean first = RedisUtils.setObjectIfAbsent(dedupKey, "1", Duration.ofSeconds(10));
if (!first) {
log.warn("重复消息丢弃 {}", dedupKey);
return;
}
// 1. great! —— 成功标记
if ("great!".equalsIgnoreCase(respText)) {
RedisUtils.setCacheObject(functionAccessKey,

View File

@ -28,12 +28,15 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY;
import static com.fuyuanshen.common.core.constant.GlobalConstants.GLOBAL_REDIS_KEY;
@ -144,6 +147,10 @@ public class XinghanDeviceDataRule implements MqttMessageRule {
Long alarmId = RedisUtils.getCacheObject(redisKey);
String lockKey = redisKey + ":lock"; // 分布式锁 key
RedissonClient client = RedisUtils.getClient(); // 唯一用到的“旧”入口
RLock lock = client.getLock(lockKey);
// ---------- 情况 1当前正在报警 ----------
if (nowAlarming) {
// 已存在未结束报警 -> 什么都不做(同一条报警)
@ -152,10 +159,34 @@ public class XinghanDeviceDataRule implements MqttMessageRule {
RedisUtils.setCacheObject(redisKey, alarmId, Duration.ofMinutes(10));
return;
}
// 不存在 -> 新建
DeviceAlarmBo bo = createAlarmBo(deviceImei, type);
deviceAlarmService.insertByBo(bo);
RedisUtils.setCacheObject(redisKey, bo.getId(), Duration.ofMinutes(10)); // 5分钟后结束过期
// 需要新建,抢锁
boolean locked = false;
try {
locked = lock.tryLock(3, TimeUnit.SECONDS); // 最多等 3 s
if (!locked) { // 抢不到直接放弃
return;
}
// 锁内二次校验double-check
alarmId = RedisUtils.getCacheObject(redisKey);
if (alarmId != null) {
return; // 并发线程已建好
}
// 不存在 -> 新建
DeviceAlarmBo bo = createAlarmBo(deviceImei, type);
if (bo == null){
return;
}
deviceAlarmService.insertByBo(bo);
RedisUtils.setCacheObject(redisKey, bo.getId(), Duration.ofMinutes(10)); // 5分钟后结束过期
}catch (InterruptedException ignore) {
// 立即中断并退出,禁止继续往下走
Thread.currentThread().interrupt();
} finally {
if (locked && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return;
}

View File

@ -55,6 +55,13 @@ public class XinghanSendAlarmMessageRule implements MqttMessageRule {
String respText = payload.getStaBreakNews();
log.info("设备上报紧急通知握手: {} ", respText);
// --- 去重 START ---
String dedupKey = "xd:ALARM:dedup:" + ctx.getDeviceImei() + ":" + respText;
boolean first = RedisUtils.setObjectIfAbsent(dedupKey, "1", Duration.ofSeconds(10));
if (!first) {
log.warn("重复消息丢弃 {}", dedupKey);
return;
}
// 1. cover! —— 成功标记
if ("cover!".equalsIgnoreCase(respText)) {

View File

@ -55,6 +55,14 @@ public class XinghanSendMsgRule implements MqttMessageRule {
String respText = payload.getStaTexTrans();
log.info("设备上报人员信息: {} ", respText);
// --- 去重 START ---
String dedupKey = "xd:MSG:dedup:" + ctx.getDeviceImei() + ":" + respText;
boolean first = RedisUtils.setObjectIfAbsent(dedupKey, "1", Duration.ofSeconds(10));
if (!first) {
log.warn("重复消息丢弃 {}", dedupKey);
return;
}
// 1. genius! —— 成功标记
if ("genius!".equalsIgnoreCase(respText)) {
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));