diff --git a/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppVideoController.java b/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppVideoController.java index f48a1a0a..87c08404 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppVideoController.java +++ b/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppVideoController.java @@ -29,10 +29,13 @@ public class AppVideoController extends BaseController { private final VideoProcessService videoProcessService; private final AudioProcessService audioProcessService; + /** + * 上传视频转码code默认1:RGB565 2:BGR565 + */ @PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) @RepeatSubmit(interval = 2, timeUnit = TimeUnit.SECONDS,message = "请勿重复提交!") - public R> uploadVideo(@RequestParam("file") MultipartFile file) { - return R.ok(videoProcessService.processVideo(file)); + public R> uploadVideo(@RequestParam("file") MultipartFile file, @RequestParam(defaultValue = "1") int code) { + return R.ok(videoProcessService.processVideo(file, code)); } /** diff --git a/fys-admin/src/main/java/com/fuyuanshen/app/service/VideoProcessService.java b/fys-admin/src/main/java/com/fuyuanshen/app/service/VideoProcessService.java index a141f52f..1fde958d 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/app/service/VideoProcessService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/app/service/VideoProcessService.java @@ -28,7 +28,7 @@ public class VideoProcessService { private final VideoProcessUtil videoProcessUtil; - public List processVideo(MultipartFile file) { + public List processVideo(MultipartFile file, int code) { // 1. 参数校验 validateVideoFile(file); @@ -39,9 +39,10 @@ public class VideoProcessService { // 3. 处理视频并提取帧数据 List hexList = videoProcessUtil.processVideoToHex( - tempFile, FRAME_RATE, DURATION, WIDTH, HEIGHT + tempFile, FRAME_RATE, DURATION, WIDTH, HEIGHT, code ); - + log.info("code: {} hexList(前100个): {}", code, + hexList.subList(0, Math.min(100, hexList.size()))); log.info("视频处理成功,生成Hex数据长度: {}", hexList.size()); return hexList; diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendAlarmMessageRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendAlarmMessageRule.java index 5412ee59..5d929943 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendAlarmMessageRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanSendAlarmMessageRule.java @@ -1,9 +1,18 @@ package com.fuyuanshen.global.mqtt.rule.xinghan; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.fuyuanshen.common.json.utils.JsonUtils; import com.fuyuanshen.common.redis.utils.RedisUtils; +import com.fuyuanshen.common.satoken.utils.LoginHelper; +import com.fuyuanshen.common.sse.dto.SseMessageDto; +import com.fuyuanshen.common.sse.utils.SseMessageUtils; +import com.fuyuanshen.equipment.domain.Device; +import com.fuyuanshen.equipment.domain.DeviceLog; +import com.fuyuanshen.equipment.mapper.DeviceLogMapper; +import com.fuyuanshen.equipment.mapper.DeviceMapper; import com.fuyuanshen.global.mqtt.base.MqttMessageRule; import com.fuyuanshen.global.mqtt.base.MqttRuleContext; import com.fuyuanshen.global.mqtt.config.MqttGateway; @@ -21,6 +30,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY; import static com.fuyuanshen.common.core.constant.GlobalConstants.GLOBAL_REDIS_KEY; @@ -40,6 +51,18 @@ public class XinghanSendAlarmMessageRule implements MqttMessageRule { private final MqttGateway mqttGateway; private final ObjectMapper objectMapper; + private final ScheduledExecutorService scheduledExecutorService; + private final DeviceLogMapper deviceLogMapper; + private final DeviceMapper deviceMapper; + /** + * 设备上行确认消息 + */ + public static final String BREAK_NEWS_CONFIRMATION = "I get it"; + + /** + * 设备上行成功标记 + */ + public static final String BREAK_NEWS_SUCCESS = "cover!"; @Override public String getCommandType() { @@ -62,9 +85,36 @@ public class XinghanSendAlarmMessageRule implements MqttMessageRule { log.warn("重复消息丢弃 {}", dedupKey); return; } - + // 1. I get it —— 表示用户确认收到消息 + if (BREAK_NEWS_CONFIRMATION.equalsIgnoreCase(respText)) { + var device = deviceMapper.selectOne(new QueryWrapper().eq("device_imei", ctx.getDeviceImei())); + // 使用MyBatis-Plus内置方法查询最新一条紧急通知 + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("device_id", device.getId()) + .eq("device_action", "发送紧急通知") // 根据您的表结构调整 + .orderByDesc("create_time") + .last("LIMIT 1"); + DeviceLog latestLog = deviceLogMapper.selectOne(queryWrapper); + log.info("设备 {} 最新紧急通知:{}", ctx.getDeviceImei(), latestLog); + if (latestLog == null) { + return; + } + // 更新数据源字段 + UpdateWrapper updateWrapper = new UpdateWrapper<>(); + updateWrapper.eq("id", latestLog.getId()) // 条件:ID匹配 + .set("data_source", "设备已收到通知"); // 要更新的字段 + deviceLogMapper.update(null, updateWrapper); + // 推送SSE消息 + scheduledExecutorService.schedule(() -> { + SseMessageDto dto = new SseMessageDto(); + dto.setMessage(String.format("%s设备已收到通知!", latestLog.getDeviceName())); + dto.setUserIds(List.of(latestLog.getCreateBy())); + SseMessageUtils.publishMessage(dto); + }, 5, TimeUnit.SECONDS); + return; + } // 1. cover! —— 成功标记 - if ("cover!".equalsIgnoreCase(respText)) { + if (BREAK_NEWS_SUCCESS.equalsIgnoreCase(respText)) { RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20)); log.info("设备 {} 发送紧急通知完成", ctx.getDeviceImei()); return; diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/util/VideoProcessUtil.java b/fys-admin/src/main/java/com/fuyuanshen/web/util/VideoProcessUtil.java index 06f96d5e..2381390e 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/util/VideoProcessUtil.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/util/VideoProcessUtil.java @@ -11,6 +11,8 @@ import javax.imageio.ImageIO; import java.awt.image.BufferedImage; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; +import java.io.OutputStream; import java.nio.file.Files; import java.util.ArrayList; import java.util.List; @@ -37,15 +39,26 @@ public class VideoProcessUtil { /** * 处理视频并转换为Hex字符串列表 */ - public List processVideoToHex(File videoFile, int frameRate, int duration, int width, int height) throws Exception { + public List processVideoToHex(File videoFile, int frameRate, int duration, int width, int height, int code) throws Exception { // 1. 提取视频帧 List frames = extractFramesFromVideo(videoFile, frameRate, duration, width, height); - // 2. 转换为RGB565格式 - byte[] binaryData = convertFramesToRGB565(frames); + if (code == 1) { + // 1. 转换为RGB565格式 + byte[] binaryData = convertFramesToRGB565(frames); - // 3. 转换为Hex字符串列表 - return bytesToHexList(binaryData); + // 2. 转换为Hex字符串列表 + return bytesToHexList(binaryData); + } else { + // 1. 转换为BGR565格式 + byte[] binaryData = convertFramesToBGR565(frames); + + // 新增:直接生成 mp4 + //bgr565ToMp4(binaryData, width, height, frameRate, "output.mp4"); + + // 2. 转换为Hex字符串列表 + return bytesToHexList(binaryData); + } } /** @@ -110,6 +123,55 @@ public class VideoProcessUtil { return result; } + /** + * 将所有帧转换为 BGR565 格式字节数组 + */ + private byte[] convertFramesToBGR565(List frames) throws Exception { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + + for (BufferedImage image : frames) { + byte[] bgr565Bytes = convertToBGR565(image); + byteArrayOutputStream.write(bgr565Bytes); + } + + byte[] result = byteArrayOutputStream.toByteArray(); + log.debug("转换BGR565数据完成,总字节数: {}", result.length); + return result; + } + + /** + * 将BufferedImage转换为真正的BGR565格式字节数组 + */ + private byte[] convertToBGR565(BufferedImage image) { + int width = image.getWidth(); + int height = image.getHeight(); + byte[] bgr565Data = new byte[width * height * 2]; + + int index = 0; + for (int y = 0; y < height; y++) { + for (int x = 0; x < width; x++) { + int rgb = image.getRGB(x, y); + + // 提取RGB分量 + int red = (rgb >> 16) & 0xFF; + int green = (rgb >> 8) & 0xFF; + int blue = rgb & 0xFF; + + int b = (blue >> 3) & 0x1F; // 5位蓝色 + int g = (green >> 2) & 0x3F; // 6位绿色 + int r = (red >> 3) & 0x1F; // 5位红色 + + // 正确的BGR565组合:红色在高位,蓝色在低位 + int bgr565 = (b << 11) | (g << 5) | r; + + bgr565Data[index++] = (byte) ((bgr565 >> 8) & 0xFF); + // 小端序存储 + bgr565Data[index++] = (byte) (bgr565 & 0xFF); + } + } + return bgr565Data; + } + /** * 将字节数组转换为Hex字符串列表 */ @@ -191,4 +253,76 @@ public class VideoProcessUtil { } } } + + /** + * 把 BGR565 字节流直接写成 MP4(H.264) + * @param bgr565 完整的 BGR565 裸帧流(每像素 2 字节) + * @param width 帧宽 + * @param height 帧高 + * @param fps 帧率 + * @param outMp4 输出 mp4 文件绝对路径 + * @throws IOException 进程启动 / IO 失败 + */ + public static void bgr565ToMp4(byte[] bgr565, + int width, + int height, + int fps, + String outMp4) throws IOException { + + int framePixels = width * height; + int frameBytes = framePixels * 2; + if (bgr565.length % frameBytes != 0) { + throw new IllegalArgumentException("字节数组长度不是整帧"); + } + + /* 1. 构造 FFmpeg 命令 */ + String[] cmd = { + "ffmpeg", + "-y", // 覆盖输出 + "-f", "rawvideo", + "-pixel_format", "bgr24", + "-video_size", width + "x" + height, + "-framerate", String.valueOf(fps), + "-i", "-", // 从 stdin 读 + "-c:v", "libx264", + "-pix_fmt", "yuv420p", + "-crf", "23", // 画质可自己调 + outMp4 + }; + + /* 2. 启动进程 */ + ProcessBuilder pb = new ProcessBuilder(cmd); + pb.redirectError(ProcessBuilder.Redirect.INHERIT); // 把 FFmpeg 日志打到控制台 + Process p = pb.start(); + try (OutputStream ffmpegIn = p.getOutputStream()) { + + /* 3. 逐帧转换并写入管道 */ + byte[] bgr24 = new byte[framePixels * 3]; + for (int off = 0; off < bgr565.length; off += frameBytes) { + for (int i = 0, j = 0; i < frameBytes; i += 2, j += 3) { + int u = ((bgr565[off + i + 1] & 0xFF) << 8) + | (bgr565[off + i] & 0xFF); + int b = (u & 0x1F) << 3; + int g = ((u >> 5) & 0x3F) << 2; + int r = ((u >> 11) & 0x1F) << 3; + bgr24[j] = (byte) b; + bgr24[j + 1] = (byte) g; + bgr24[j + 2] = (byte) r; + } + ffmpegIn.write(bgr24); + } + ffmpegIn.flush(); + } + + /* 4. 等待编码结束 */ + try { + int exit = p.waitFor(); + if (exit != 0) { + throw new IOException("FFmpeg 异常退出,code=" + exit); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("等待 FFmpeg 被中断", e); + } + } } \ No newline at end of file