forked from dyf/fys-Multi-tenant
Compare commits
2 Commits
359cabbd2c
...
aa69b552aa
| Author | SHA1 | Date | |
|---|---|---|---|
| aa69b552aa | |||
| 3dd0d4cc90 |
@ -1,9 +1,18 @@
|
||||
package com.fuyuanshen.global.mqtt.rule.xinghan;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fuyuanshen.common.json.utils.JsonUtils;
|
||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.common.satoken.utils.LoginHelper;
|
||||
import com.fuyuanshen.common.sse.dto.SseMessageDto;
|
||||
import com.fuyuanshen.common.sse.utils.SseMessageUtils;
|
||||
import com.fuyuanshen.equipment.domain.Device;
|
||||
import com.fuyuanshen.equipment.domain.DeviceLog;
|
||||
import com.fuyuanshen.equipment.mapper.DeviceLogMapper;
|
||||
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttMessageRule;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
||||
import com.fuyuanshen.global.mqtt.config.MqttGateway;
|
||||
@ -21,6 +30,8 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY;
|
||||
import static com.fuyuanshen.common.core.constant.GlobalConstants.GLOBAL_REDIS_KEY;
|
||||
@ -40,6 +51,18 @@ public class XinghanSendAlarmMessageRule implements MqttMessageRule {
|
||||
|
||||
private final MqttGateway mqttGateway;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ScheduledExecutorService scheduledExecutorService;
|
||||
private final DeviceLogMapper deviceLogMapper;
|
||||
private final DeviceMapper deviceMapper;
|
||||
/**
|
||||
* 设备上行确认消息
|
||||
*/
|
||||
public static final String BREAK_NEWS_CONFIRMATION = "I get it";
|
||||
|
||||
/**
|
||||
* 设备上行成功标记
|
||||
*/
|
||||
public static final String BREAK_NEWS_SUCCESS = "cover!";
|
||||
|
||||
@Override
|
||||
public String getCommandType() {
|
||||
@ -62,9 +85,36 @@ public class XinghanSendAlarmMessageRule implements MqttMessageRule {
|
||||
log.warn("重复消息丢弃 {}", dedupKey);
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. I get it —— 表示用户确认收到消息
|
||||
if (BREAK_NEWS_CONFIRMATION.equalsIgnoreCase(respText)) {
|
||||
var device = deviceMapper.selectOne(new QueryWrapper<Device>().eq("device_imei", ctx.getDeviceImei()));
|
||||
// 使用MyBatis-Plus内置方法查询最新一条紧急通知
|
||||
QueryWrapper<DeviceLog> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.eq("device_id", device.getId())
|
||||
.eq("device_action", "发送紧急通知") // 根据您的表结构调整
|
||||
.orderByDesc("create_time")
|
||||
.last("LIMIT 1");
|
||||
DeviceLog latestLog = deviceLogMapper.selectOne(queryWrapper);
|
||||
log.info("设备 {} 最新紧急通知:{}", ctx.getDeviceImei(), latestLog);
|
||||
if (latestLog == null) {
|
||||
return;
|
||||
}
|
||||
// 更新数据源字段
|
||||
UpdateWrapper<DeviceLog> updateWrapper = new UpdateWrapper<>();
|
||||
updateWrapper.eq("id", latestLog.getId()) // 条件:ID匹配
|
||||
.set("data_source", "设备已收到通知"); // 要更新的字段
|
||||
deviceLogMapper.update(null, updateWrapper);
|
||||
// 推送SSE消息
|
||||
scheduledExecutorService.schedule(() -> {
|
||||
SseMessageDto dto = new SseMessageDto();
|
||||
dto.setMessage(String.format("%s设备已收到通知!", latestLog.getDeviceName()));
|
||||
dto.setUserIds(List.of(latestLog.getCreateBy()));
|
||||
SseMessageUtils.publishMessage(dto);
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
return;
|
||||
}
|
||||
// 1. cover! —— 成功标记
|
||||
if ("cover!".equalsIgnoreCase(respText)) {
|
||||
if (BREAK_NEWS_SUCCESS.equalsIgnoreCase(respText)) {
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
log.info("设备 {} 发送紧急通知完成", ctx.getDeviceImei());
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user