diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanBootLogoRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanBootLogoRule.java index 2b92071f..764e7921 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanBootLogoRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanBootLogoRule.java @@ -62,6 +62,14 @@ public class XinghanBootLogoRule implements MqttMessageRule { String respText = payload.getStaPicTrans(); log.warn("设备上报LOGO:{}", respText); + // --- 去重 START --- + String dedupKey = "xd:MSG:LOGO:" + ctx.getDeviceImei() + ":" + respText; + boolean first = RedisUtils.setObjectIfAbsent(dedupKey, "1", Duration.ofSeconds(10)); + if (!first) { + log.warn("重复消息丢弃 {}", dedupKey); + return; + } + // 1. great! —— 成功标记 if ("great!".equalsIgnoreCase(respText)) { RedisUtils.setCacheObject(functionAccessKey, diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanDeviceDataRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanDeviceDataRule.java index cbcf2eab..8739b3a7 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanDeviceDataRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanDeviceDataRule.java @@ -28,12 +28,15 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import org.springframework.beans.factory.annotation.Autowired; import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY; import static com.fuyuanshen.common.core.constant.GlobalConstants.GLOBAL_REDIS_KEY; @@ -144,6 +147,10 @@ public class XinghanDeviceDataRule implements MqttMessageRule { Long alarmId = RedisUtils.getCacheObject(redisKey); + String lockKey = redisKey + ":lock"; // 分布式锁 key + RedissonClient client = RedisUtils.getClient(); // 唯一用到的“旧”入口 + RLock lock = client.getLock(lockKey); + // ---------- 情况 1:当前正在报警 ---------- if (nowAlarming) { // 已存在未结束报警 -> 什么都不做(同一条报警) @@ -152,10 +159,34 @@ public class XinghanDeviceDataRule implements MqttMessageRule { RedisUtils.setCacheObject(redisKey, alarmId, Duration.ofMinutes(10)); return; } - // 不存在 -> 新建 - DeviceAlarmBo bo = createAlarmBo(deviceImei, type); - deviceAlarmService.insertByBo(bo); - RedisUtils.setCacheObject(redisKey, bo.getId(), Duration.ofMinutes(10)); // 5分钟后结束过期 + // 需要新建,抢锁 + boolean locked = false; + try { + locked = lock.tryLock(3, TimeUnit.SECONDS); // 最多等 3 s + if (!locked) { // 抢不到直接放弃 + return; + } + // 锁内二次校验(double-check) + alarmId = RedisUtils.getCacheObject(redisKey); + if (alarmId != null) { + return; // 并发线程已建好 + } + + // 不存在 -> 新建 + DeviceAlarmBo bo = createAlarmBo(deviceImei, type); + if (bo == null){ + return; + } + deviceAlarmService.insertByBo(bo); + RedisUtils.setCacheObject(redisKey, bo.getId(), Duration.ofMinutes(10)); // 5分钟后结束过期 + }catch (InterruptedException ignore) { + // 立即中断并退出,禁止继续往下走 + Thread.currentThread().interrupt(); + } finally { + if (locked && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } return; } diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendAlarmMessageRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendAlarmMessageRule.java index 3a4b368e..851d52c2 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendAlarmMessageRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendAlarmMessageRule.java @@ -55,6 +55,13 @@ public class XinghanSendAlarmMessageRule implements MqttMessageRule { String respText = payload.getStaBreakNews(); log.info("设备上报紧急通知握手: {} ", respText); + // --- 去重 START --- + String dedupKey = "xd:ALARM:dedup:" + ctx.getDeviceImei() + ":" + respText; + boolean first = RedisUtils.setObjectIfAbsent(dedupKey, "1", Duration.ofSeconds(10)); + if (!first) { + log.warn("重复消息丢弃 {}", dedupKey); + return; + } // 1. cover! —— 成功标记 if ("cover!".equalsIgnoreCase(respText)) { diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendMsgRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendMsgRule.java index d68d18e3..800a5c41 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendMsgRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendMsgRule.java @@ -55,6 +55,14 @@ public class XinghanSendMsgRule implements MqttMessageRule { String respText = payload.getStaTexTrans(); log.info("设备上报人员信息: {} ", respText); + // --- 去重 START --- + String dedupKey = "xd:MSG:dedup:" + ctx.getDeviceImei() + ":" + respText; + boolean first = RedisUtils.setObjectIfAbsent(dedupKey, "1", Duration.ofSeconds(10)); + if (!first) { + log.warn("重复消息丢弃 {}", dedupKey); + return; + } + // 1. genius! —— 成功标记 if ("genius!".equalsIgnoreCase(respText)) { RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/controller/device/DeviceXinghanController.java b/fys-admin/src/main/java/com/fuyuanshen/web/controller/device/DeviceXinghanController.java index 2f1c3240..53d54679 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/controller/device/DeviceXinghanController.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/controller/device/DeviceXinghanController.java @@ -109,6 +109,17 @@ public class DeviceXinghanController extends BaseController { return R.ok(); } + /** + * SOS档位 批量 + * SOS档位,2,1,0, 分别表示红蓝模式/爆闪模式/关闭 + */ + @PostMapping("/SOSGradeSettingsBatch") + public R SOSGradeSettingsBatch(@RequestBody DeviceXinghanInstructDto params) { + // params 转 JSONObject + deviceXinghanBizService.sendCommandBatch(params,"ins_SOSGrade","SOS档位"); + return R.ok(); + } + /** * 静止报警状态 * 静止报警状态,0-未静止报警,1-正在静止报警。 diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/domain/Dto/DeviceXinghanInstructDto.java b/fys-admin/src/main/java/com/fuyuanshen/web/domain/Dto/DeviceXinghanInstructDto.java index 0d3b598f..3985ef93 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/domain/Dto/DeviceXinghanInstructDto.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/domain/Dto/DeviceXinghanInstructDto.java @@ -2,6 +2,8 @@ package com.fuyuanshen.web.domain.Dto; import lombok.Data; +import java.util.List; + @Data public class DeviceXinghanInstructDto { private Long deviceId; @@ -12,4 +14,5 @@ public class DeviceXinghanInstructDto { */ private String instructValue; private Boolean isBluetooth = false; + private List deviceIds; } diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceXinghanBizService.java b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceXinghanBizService.java index 617458cc..6e53f77a 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceXinghanBizService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceXinghanBizService.java @@ -28,6 +28,7 @@ import com.fuyuanshen.common.json.utils.JsonUtils; import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.common.satoken.utils.AppLoginHelper; import com.fuyuanshen.equipment.domain.Device; +import com.fuyuanshen.equipment.domain.DeviceLog; import com.fuyuanshen.equipment.domain.DeviceType; import com.fuyuanshen.equipment.domain.bo.DeviceAlarmBo; import com.fuyuanshen.equipment.domain.dto.AppDeviceSendMsgBo; @@ -531,6 +532,75 @@ public class DeviceXinghanBizService { createAlarm(device.getId(),deviceImei,payloadKey,value); } + /** + * 批量发送设备控制指令 + * + * @param dto 设备ID列表 + * @param payloadKey 指令负载数据的键名 + * @param deviceAction 设备操作类型描述 + */ + @Transactional(rollbackFor = Exception.class) // 1. 事务注解 + public void sendCommandBatch(DeviceXinghanInstructDto dto, String payloadKey, String deviceAction) { + List errorMessages = Collections.synchronizedList(new ArrayList<>()); + int value; + try { + value = Integer.parseInt(dto.getInstructValue()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("指令值格式不正确,必须为整数。", e); + } + Map> payload = Map.of(payloadKey, List.of(value)); + + // 一次性查询所有设备信息 + List devices = deviceMapper.selectList( + new QueryWrapper().lambda().in(Device::getId, dto.getDeviceIds()) + ); + // 日志信息 + String contentText = resolveGradeDesc(payloadKey, value); + + List logs = new ArrayList<>(); + try { + for (Device device : devices) { + String deviceImei = device.getDeviceImei(); + String deviceName = device.getDeviceName(); + + // 2. 提前进行设备状态检查,逻辑更清晰 + if (isDeviceOffline(deviceImei)) { + throw new ServiceException("设备已断开连接:" + deviceName); + } + + String topic = MqttConstants.GLOBAL_PUB_KEY + deviceImei; + String json = JsonUtils.toJsonString(payload); + + mqttGateway.sendMsgToMqtt(topic, 1, json); + log.info("发送指令成功 => topic:{}, payload:{}", topic, json); + + // 创建设备日志实体 + DeviceLog deviceLog = new DeviceLog(); + deviceLog.setDeviceId(device.getId()); + deviceLog.setDeviceAction(deviceAction); + deviceLog.setContent(contentText); + deviceLog.setCreateBy(AppLoginHelper.getUserId()); + deviceLog.setDeviceName(deviceName); + deviceLog.setCreateTime(new Date()); + logs.add(deviceLog); + + createAlarm(device.getId(), deviceImei, payloadKey, value); + } + deviceLogMapper.insertBatch(logs); + } catch (ServiceException e) { + // 捕获并重新抛出自定义异常,避免内层异常被外层泛化捕获 + log.error("批量发送指令失败: {}", e.getMessage(), e); + throw e; + } catch (Exception e) { + log.error("批量发送指令发生未知错误", e); + throw new ServiceException("批量发送指令失败"); + } + } + + /** + * 检查设备是否离线 + */ + // private boolean isDeviceOffline(String imei) { // // 原方法名语义相反,这里取反,使含义更清晰 // return getDeviceStatus(imei); diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/service/impl/AppSmsAuthStrategy.java b/fys-admin/src/main/java/com/fuyuanshen/web/service/impl/AppSmsAuthStrategy.java index eb9ea8d3..862cfbd9 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/service/impl/AppSmsAuthStrategy.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/service/impl/AppSmsAuthStrategy.java @@ -54,7 +54,7 @@ public class AppSmsAuthStrategy implements IAuthStrategy { String phonenumber = loginBody.getPhonenumber(); String smsCode = loginBody.getSmsCode(); AppLoginUser loginUser = TenantHelper.dynamic(tenantId, () -> { -// loginService.checkLogin(LoginType.SMS, tenantId, phonenumber, () -> !validateSmsCode(tenantId, phonenumber, smsCode)); + loginService.checkLogin(LoginType.SMS, tenantId, phonenumber, () -> !validateSmsCode(tenantId, phonenumber, smsCode)); AppUserVo user = loadUserByPhonenumber(phonenumber); if (ObjectUtil.isNull(user)) { //新增Appuser