forked from dyf/fys-Multi-tenant
Merge pull request 'jingquan' (#13) from liwenlong/fys-Multi-tenant:jingquan into main
Reviewed-on: dyf/fys-Multi-tenant#13
This commit is contained in:
@ -62,6 +62,14 @@ public class XinghanBootLogoRule implements MqttMessageRule {
|
|||||||
String respText = payload.getStaPicTrans();
|
String respText = payload.getStaPicTrans();
|
||||||
log.warn("设备上报LOGO:{}", respText);
|
log.warn("设备上报LOGO:{}", respText);
|
||||||
|
|
||||||
|
// --- 去重 START ---
|
||||||
|
String dedupKey = "xd:MSG:LOGO:" + ctx.getDeviceImei() + ":" + respText;
|
||||||
|
boolean first = RedisUtils.setObjectIfAbsent(dedupKey, "1", Duration.ofSeconds(10));
|
||||||
|
if (!first) {
|
||||||
|
log.warn("重复消息丢弃 {}", dedupKey);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 1. great! —— 成功标记
|
// 1. great! —— 成功标记
|
||||||
if ("great!".equalsIgnoreCase(respText)) {
|
if ("great!".equalsIgnoreCase(respText)) {
|
||||||
RedisUtils.setCacheObject(functionAccessKey,
|
RedisUtils.setCacheObject(functionAccessKey,
|
||||||
|
|||||||
@ -28,12 +28,15 @@ import lombok.AllArgsConstructor;
|
|||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.redisson.api.RLock;
|
||||||
|
import org.redisson.api.RedissonClient;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY;
|
import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY;
|
||||||
import static com.fuyuanshen.common.core.constant.GlobalConstants.GLOBAL_REDIS_KEY;
|
import static com.fuyuanshen.common.core.constant.GlobalConstants.GLOBAL_REDIS_KEY;
|
||||||
@ -144,6 +147,10 @@ public class XinghanDeviceDataRule implements MqttMessageRule {
|
|||||||
|
|
||||||
Long alarmId = RedisUtils.getCacheObject(redisKey);
|
Long alarmId = RedisUtils.getCacheObject(redisKey);
|
||||||
|
|
||||||
|
String lockKey = redisKey + ":lock"; // 分布式锁 key
|
||||||
|
RedissonClient client = RedisUtils.getClient(); // 唯一用到的“旧”入口
|
||||||
|
RLock lock = client.getLock(lockKey);
|
||||||
|
|
||||||
// ---------- 情况 1:当前正在报警 ----------
|
// ---------- 情况 1:当前正在报警 ----------
|
||||||
if (nowAlarming) {
|
if (nowAlarming) {
|
||||||
// 已存在未结束报警 -> 什么都不做(同一条报警)
|
// 已存在未结束报警 -> 什么都不做(同一条报警)
|
||||||
@ -152,10 +159,34 @@ public class XinghanDeviceDataRule implements MqttMessageRule {
|
|||||||
RedisUtils.setCacheObject(redisKey, alarmId, Duration.ofMinutes(10));
|
RedisUtils.setCacheObject(redisKey, alarmId, Duration.ofMinutes(10));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// 需要新建,抢锁
|
||||||
|
boolean locked = false;
|
||||||
|
try {
|
||||||
|
locked = lock.tryLock(3, TimeUnit.SECONDS); // 最多等 3 s
|
||||||
|
if (!locked) { // 抢不到直接放弃
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// 锁内二次校验(double-check)
|
||||||
|
alarmId = RedisUtils.getCacheObject(redisKey);
|
||||||
|
if (alarmId != null) {
|
||||||
|
return; // 并发线程已建好
|
||||||
|
}
|
||||||
|
|
||||||
// 不存在 -> 新建
|
// 不存在 -> 新建
|
||||||
DeviceAlarmBo bo = createAlarmBo(deviceImei, type);
|
DeviceAlarmBo bo = createAlarmBo(deviceImei, type);
|
||||||
|
if (bo == null){
|
||||||
|
return;
|
||||||
|
}
|
||||||
deviceAlarmService.insertByBo(bo);
|
deviceAlarmService.insertByBo(bo);
|
||||||
RedisUtils.setCacheObject(redisKey, bo.getId(), Duration.ofMinutes(10)); // 5分钟后结束过期
|
RedisUtils.setCacheObject(redisKey, bo.getId(), Duration.ofMinutes(10)); // 5分钟后结束过期
|
||||||
|
}catch (InterruptedException ignore) {
|
||||||
|
// 立即中断并退出,禁止继续往下走
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
} finally {
|
||||||
|
if (locked && lock.isHeldByCurrentThread()) {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -55,6 +55,13 @@ public class XinghanSendAlarmMessageRule implements MqttMessageRule {
|
|||||||
|
|
||||||
String respText = payload.getStaBreakNews();
|
String respText = payload.getStaBreakNews();
|
||||||
log.info("设备上报紧急通知握手: {} ", respText);
|
log.info("设备上报紧急通知握手: {} ", respText);
|
||||||
|
// --- 去重 START ---
|
||||||
|
String dedupKey = "xd:ALARM:dedup:" + ctx.getDeviceImei() + ":" + respText;
|
||||||
|
boolean first = RedisUtils.setObjectIfAbsent(dedupKey, "1", Duration.ofSeconds(10));
|
||||||
|
if (!first) {
|
||||||
|
log.warn("重复消息丢弃 {}", dedupKey);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 1. cover! —— 成功标记
|
// 1. cover! —— 成功标记
|
||||||
if ("cover!".equalsIgnoreCase(respText)) {
|
if ("cover!".equalsIgnoreCase(respText)) {
|
||||||
|
|||||||
@ -55,6 +55,14 @@ public class XinghanSendMsgRule implements MqttMessageRule {
|
|||||||
String respText = payload.getStaTexTrans();
|
String respText = payload.getStaTexTrans();
|
||||||
log.info("设备上报人员信息: {} ", respText);
|
log.info("设备上报人员信息: {} ", respText);
|
||||||
|
|
||||||
|
// --- 去重 START ---
|
||||||
|
String dedupKey = "xd:MSG:dedup:" + ctx.getDeviceImei() + ":" + respText;
|
||||||
|
boolean first = RedisUtils.setObjectIfAbsent(dedupKey, "1", Duration.ofSeconds(10));
|
||||||
|
if (!first) {
|
||||||
|
log.warn("重复消息丢弃 {}", dedupKey);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 1. genius! —— 成功标记
|
// 1. genius! —— 成功标记
|
||||||
if ("genius!".equalsIgnoreCase(respText)) {
|
if ("genius!".equalsIgnoreCase(respText)) {
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
|
|||||||
@ -109,6 +109,17 @@ public class DeviceXinghanController extends BaseController {
|
|||||||
return R.ok();
|
return R.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SOS档位 批量
|
||||||
|
* SOS档位,2,1,0, 分别表示红蓝模式/爆闪模式/关闭
|
||||||
|
*/
|
||||||
|
@PostMapping("/SOSGradeSettingsBatch")
|
||||||
|
public R<Void> SOSGradeSettingsBatch(@RequestBody DeviceXinghanInstructDto params) {
|
||||||
|
// params 转 JSONObject
|
||||||
|
deviceXinghanBizService.sendCommandBatch(params,"ins_SOSGrade","SOS档位");
|
||||||
|
return R.ok();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 静止报警状态
|
* 静止报警状态
|
||||||
* 静止报警状态,0-未静止报警,1-正在静止报警。
|
* 静止报警状态,0-未静止报警,1-正在静止报警。
|
||||||
|
|||||||
@ -2,6 +2,8 @@ package com.fuyuanshen.web.domain.Dto;
|
|||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class DeviceXinghanInstructDto {
|
public class DeviceXinghanInstructDto {
|
||||||
private Long deviceId;
|
private Long deviceId;
|
||||||
@ -12,4 +14,5 @@ public class DeviceXinghanInstructDto {
|
|||||||
*/
|
*/
|
||||||
private String instructValue;
|
private String instructValue;
|
||||||
private Boolean isBluetooth = false;
|
private Boolean isBluetooth = false;
|
||||||
|
private List<Long> deviceIds;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -28,6 +28,7 @@ import com.fuyuanshen.common.json.utils.JsonUtils;
|
|||||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||||
import com.fuyuanshen.common.satoken.utils.AppLoginHelper;
|
import com.fuyuanshen.common.satoken.utils.AppLoginHelper;
|
||||||
import com.fuyuanshen.equipment.domain.Device;
|
import com.fuyuanshen.equipment.domain.Device;
|
||||||
|
import com.fuyuanshen.equipment.domain.DeviceLog;
|
||||||
import com.fuyuanshen.equipment.domain.DeviceType;
|
import com.fuyuanshen.equipment.domain.DeviceType;
|
||||||
import com.fuyuanshen.equipment.domain.bo.DeviceAlarmBo;
|
import com.fuyuanshen.equipment.domain.bo.DeviceAlarmBo;
|
||||||
import com.fuyuanshen.equipment.domain.dto.AppDeviceSendMsgBo;
|
import com.fuyuanshen.equipment.domain.dto.AppDeviceSendMsgBo;
|
||||||
@ -531,6 +532,75 @@ public class DeviceXinghanBizService {
|
|||||||
createAlarm(device.getId(),deviceImei,payloadKey,value);
|
createAlarm(device.getId(),deviceImei,payloadKey,value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量发送设备控制指令
|
||||||
|
*
|
||||||
|
* @param dto 设备ID列表
|
||||||
|
* @param payloadKey 指令负载数据的键名
|
||||||
|
* @param deviceAction 设备操作类型描述
|
||||||
|
*/
|
||||||
|
@Transactional(rollbackFor = Exception.class) // 1. 事务注解
|
||||||
|
public void sendCommandBatch(DeviceXinghanInstructDto dto, String payloadKey, String deviceAction) {
|
||||||
|
List<String> errorMessages = Collections.synchronizedList(new ArrayList<>());
|
||||||
|
int value;
|
||||||
|
try {
|
||||||
|
value = Integer.parseInt(dto.getInstructValue());
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw new IllegalArgumentException("指令值格式不正确,必须为整数。", e);
|
||||||
|
}
|
||||||
|
Map<String, List<Integer>> payload = Map.of(payloadKey, List.of(value));
|
||||||
|
|
||||||
|
// 一次性查询所有设备信息
|
||||||
|
List<Device> devices = deviceMapper.selectList(
|
||||||
|
new QueryWrapper<Device>().lambda().in(Device::getId, dto.getDeviceIds())
|
||||||
|
);
|
||||||
|
// 日志信息
|
||||||
|
String contentText = resolveGradeDesc(payloadKey, value);
|
||||||
|
|
||||||
|
List<DeviceLog> logs = new ArrayList<>();
|
||||||
|
try {
|
||||||
|
for (Device device : devices) {
|
||||||
|
String deviceImei = device.getDeviceImei();
|
||||||
|
String deviceName = device.getDeviceName();
|
||||||
|
|
||||||
|
// 2. 提前进行设备状态检查,逻辑更清晰
|
||||||
|
if (isDeviceOffline(deviceImei)) {
|
||||||
|
throw new ServiceException("设备已断开连接:" + deviceName);
|
||||||
|
}
|
||||||
|
|
||||||
|
String topic = MqttConstants.GLOBAL_PUB_KEY + deviceImei;
|
||||||
|
String json = JsonUtils.toJsonString(payload);
|
||||||
|
|
||||||
|
mqttGateway.sendMsgToMqtt(topic, 1, json);
|
||||||
|
log.info("发送指令成功 => topic:{}, payload:{}", topic, json);
|
||||||
|
|
||||||
|
// 创建设备日志实体
|
||||||
|
DeviceLog deviceLog = new DeviceLog();
|
||||||
|
deviceLog.setDeviceId(device.getId());
|
||||||
|
deviceLog.setDeviceAction(deviceAction);
|
||||||
|
deviceLog.setContent(contentText);
|
||||||
|
deviceLog.setCreateBy(AppLoginHelper.getUserId());
|
||||||
|
deviceLog.setDeviceName(deviceName);
|
||||||
|
deviceLog.setCreateTime(new Date());
|
||||||
|
logs.add(deviceLog);
|
||||||
|
|
||||||
|
createAlarm(device.getId(), deviceImei, payloadKey, value);
|
||||||
|
}
|
||||||
|
deviceLogMapper.insertBatch(logs);
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
// 捕获并重新抛出自定义异常,避免内层异常被外层泛化捕获
|
||||||
|
log.error("批量发送指令失败: {}", e.getMessage(), e);
|
||||||
|
throw e;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("批量发送指令发生未知错误", e);
|
||||||
|
throw new ServiceException("批量发送指令失败");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 检查设备是否离线
|
||||||
|
*/
|
||||||
|
|
||||||
// private boolean isDeviceOffline(String imei) {
|
// private boolean isDeviceOffline(String imei) {
|
||||||
// // 原方法名语义相反,这里取反,使含义更清晰
|
// // 原方法名语义相反,这里取反,使含义更清晰
|
||||||
// return getDeviceStatus(imei);
|
// return getDeviceStatus(imei);
|
||||||
|
|||||||
@ -54,7 +54,7 @@ public class AppSmsAuthStrategy implements IAuthStrategy {
|
|||||||
String phonenumber = loginBody.getPhonenumber();
|
String phonenumber = loginBody.getPhonenumber();
|
||||||
String smsCode = loginBody.getSmsCode();
|
String smsCode = loginBody.getSmsCode();
|
||||||
AppLoginUser loginUser = TenantHelper.dynamic(tenantId, () -> {
|
AppLoginUser loginUser = TenantHelper.dynamic(tenantId, () -> {
|
||||||
// loginService.checkLogin(LoginType.SMS, tenantId, phonenumber, () -> !validateSmsCode(tenantId, phonenumber, smsCode));
|
loginService.checkLogin(LoginType.SMS, tenantId, phonenumber, () -> !validateSmsCode(tenantId, phonenumber, smsCode));
|
||||||
AppUserVo user = loadUserByPhonenumber(phonenumber);
|
AppUserVo user = loadUserByPhonenumber(phonenumber);
|
||||||
if (ObjectUtil.isNull(user)) {
|
if (ObjectUtil.isNull(user)) {
|
||||||
//新增Appuser
|
//新增Appuser
|
||||||
|
|||||||
@ -33,4 +33,7 @@ public class DeviceLocationVo {
|
|||||||
@Schema(description = "进入的电子围栏信息")
|
@Schema(description = "进入的电子围栏信息")
|
||||||
private DeviceGeoFence fenceInfo;
|
private DeviceGeoFence fenceInfo;
|
||||||
|
|
||||||
|
@Schema(description = "设备是否正在告警")
|
||||||
|
private Boolean isAlarming;
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -774,6 +774,8 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceMapper, Device> impleme
|
|||||||
|
|
||||||
// 注入电子围栏服务
|
// 注入电子围栏服务
|
||||||
IDeviceGeoFenceService geoFenceService = SpringUtils.getBean(IDeviceGeoFenceService.class);
|
IDeviceGeoFenceService geoFenceService = SpringUtils.getBean(IDeviceGeoFenceService.class);
|
||||||
|
// 注入设备告警Mapper
|
||||||
|
DeviceAlarmMapper deviceAlarmMapper = SpringUtils.getBean(DeviceAlarmMapper.class);
|
||||||
|
|
||||||
for (Device device : devices) {
|
for (Device device : devices) {
|
||||||
DeviceLocationVo vo = new DeviceLocationVo();
|
DeviceLocationVo vo = new DeviceLocationVo();
|
||||||
@ -808,6 +810,23 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceMapper, Device> impleme
|
|||||||
vo.setInFence(false);
|
vo.setInFence(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 检查设备是否正在告警(只要当前设备处于报警状态,且不是电子围栏告警)
|
||||||
|
if (StringUtils.isNotBlank(device.getDeviceImei())) {
|
||||||
|
DeviceAlarmVo latestAlarm = deviceAlarmMapper.selectLatestByDeviceImei(device.getDeviceImei());
|
||||||
|
// 判断是否正在告警:未处理的告警(treatmentState=1)且不是电子围栏告警(deviceAction!=3)
|
||||||
|
if (latestAlarm != null &&
|
||||||
|
latestAlarm.getTreatmentState() != null &&
|
||||||
|
latestAlarm.getTreatmentState() == 1 &&
|
||||||
|
latestAlarm.getDeviceAction() != null &&
|
||||||
|
latestAlarm.getDeviceAction() != 3) {
|
||||||
|
vo.setIsAlarming(true);
|
||||||
|
} else {
|
||||||
|
vo.setIsAlarming(false);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
vo.setIsAlarming(false);
|
||||||
|
}
|
||||||
|
|
||||||
result.add(vo);
|
result.add(vo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user