2 Commits

Author SHA1 Message Date
aa69b552aa Merge remote-tracking branch 'liwenlong-fys/jingquan' into jingquan 2025-11-20 16:25:04 +08:00
3dd0d4cc90 feat(video): 支持BGR565格式视频处理及MQTT设备确认消息更新
- 新增BGR565格式转换逻辑,支持RGB565与BGR565两种颜色格式- 视频上传接口增加code参数,默认值为1(RGB565)
- 在VideoProcessUtil中实现convertFramesToBGR565方法
- 添加bgr565ToMp4工具方法用于将BGR565数据编码为MP4文件
- MQTT规则新增对“设备已收到通知”的处理逻辑
- 设备确认消息后更新数据库日志状态并推送SSE消息
- 引入ScheduledExecutorService延时推送SSE消息- 增加设备日志和设备Mapper依赖以支持数据操作
2025-11-20 16:24:45 +08:00

View File

@ -1,9 +1,18 @@
package com.fuyuanshen.global.mqtt.rule.xinghan; package com.fuyuanshen.global.mqtt.rule.xinghan;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fuyuanshen.common.json.utils.JsonUtils; import com.fuyuanshen.common.json.utils.JsonUtils;
import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.common.redis.utils.RedisUtils;
import com.fuyuanshen.common.satoken.utils.LoginHelper;
import com.fuyuanshen.common.sse.dto.SseMessageDto;
import com.fuyuanshen.common.sse.utils.SseMessageUtils;
import com.fuyuanshen.equipment.domain.Device;
import com.fuyuanshen.equipment.domain.DeviceLog;
import com.fuyuanshen.equipment.mapper.DeviceLogMapper;
import com.fuyuanshen.equipment.mapper.DeviceMapper;
import com.fuyuanshen.global.mqtt.base.MqttMessageRule; import com.fuyuanshen.global.mqtt.base.MqttMessageRule;
import com.fuyuanshen.global.mqtt.base.MqttRuleContext; import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
import com.fuyuanshen.global.mqtt.config.MqttGateway; import com.fuyuanshen.global.mqtt.config.MqttGateway;
@ -21,6 +30,8 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY; import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY;
import static com.fuyuanshen.common.core.constant.GlobalConstants.GLOBAL_REDIS_KEY; import static com.fuyuanshen.common.core.constant.GlobalConstants.GLOBAL_REDIS_KEY;
@ -40,6 +51,18 @@ public class XinghanSendAlarmMessageRule implements MqttMessageRule {
private final MqttGateway mqttGateway; private final MqttGateway mqttGateway;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final ScheduledExecutorService scheduledExecutorService;
private final DeviceLogMapper deviceLogMapper;
private final DeviceMapper deviceMapper;
/**
* 设备上行确认消息
*/
public static final String BREAK_NEWS_CONFIRMATION = "I get it";
/**
* 设备上行成功标记
*/
public static final String BREAK_NEWS_SUCCESS = "cover!";
@Override @Override
public String getCommandType() { public String getCommandType() {
@ -62,9 +85,36 @@ public class XinghanSendAlarmMessageRule implements MqttMessageRule {
log.warn("重复消息丢弃 {}", dedupKey); log.warn("重复消息丢弃 {}", dedupKey);
return; return;
} }
// 1. I get it —— 表示用户确认收到消息
if (BREAK_NEWS_CONFIRMATION.equalsIgnoreCase(respText)) {
var device = deviceMapper.selectOne(new QueryWrapper<Device>().eq("device_imei", ctx.getDeviceImei()));
// 使用MyBatis-Plus内置方法查询最新一条紧急通知
QueryWrapper<DeviceLog> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("device_id", device.getId())
.eq("device_action", "发送紧急通知") // 根据您的表结构调整
.orderByDesc("create_time")
.last("LIMIT 1");
DeviceLog latestLog = deviceLogMapper.selectOne(queryWrapper);
log.info("设备 {} 最新紧急通知:{}", ctx.getDeviceImei(), latestLog);
if (latestLog == null) {
return;
}
// 更新数据源字段
UpdateWrapper<DeviceLog> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("id", latestLog.getId()) // 条件ID匹配
.set("data_source", "设备已收到通知"); // 要更新的字段
deviceLogMapper.update(null, updateWrapper);
// 推送SSE消息
scheduledExecutorService.schedule(() -> {
SseMessageDto dto = new SseMessageDto();
dto.setMessage(String.format("%s设备已收到通知", latestLog.getDeviceName()));
dto.setUserIds(List.of(latestLog.getCreateBy()));
SseMessageUtils.publishMessage(dto);
}, 5, TimeUnit.SECONDS);
return;
}
// 1. cover! —— 成功标记 // 1. cover! —— 成功标记
if ("cover!".equalsIgnoreCase(respText)) { if (BREAK_NEWS_SUCCESS.equalsIgnoreCase(respText)) {
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
log.info("设备 {} 发送紧急通知完成", ctx.getDeviceImei()); log.info("设备 {} 发送紧急通知完成", ctx.getDeviceImei());
return; return;