From f25afe0e9da0ba11194a3489680384e078698250 Mon Sep 17 00:00:00 2001 From: DragonWenLong <552045633@qq.com> Date: Fri, 7 Nov 2025 16:59:07 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(file):=20=E6=96=B0=E5=A2=9E=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E5=93=88=E5=B8=8C=E5=8E=BB=E9=87=8D=E4=B8=8E=E6=96=87?= =?UTF-8?q?=E6=9C=AC=E6=8F=90=E5=8F=96=E5=8A=9F=E8=83=BD-=20=E5=9C=A8?= =?UTF-8?q?=E5=A4=9A=E4=B8=AA=E6=A8=A1=E5=9D=97=E4=B8=AD=E5=BC=95=E5=85=A5?= =?UTF-8?q?=20FileHashUtil=20=E5=B9=B6=E7=94=A8=E4=BA=8E=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E5=89=8D=E7=9A=84=E5=93=88=E5=B8=8C=E8=AE=A1?= =?UTF-8?q?=E7=AE=97=20-=20=E4=BC=98=E5=8C=96=E6=96=87=E4=BB=B6=E4=B8=8A?= =?UTF-8?q?=E4=BC=A0=E9=80=BB=E8=BE=91=EF=BC=8C=E5=AE=9E=E7=8E=B0=E5=9F=BA?= =?UTF-8?q?=E4=BA=8E=E5=93=88=E5=B8=8C=E7=9A=84=E7=A7=92=E4=BC=A0=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=20-=20=E6=96=B0=E5=A2=9E=E9=9F=B3=E9=A2=91=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E4=B8=AD=E7=9A=84=E6=96=87=E6=9C=AC=E6=8F=90=E5=8F=96?= =?UTF-8?q?=E6=96=B9=E6=B3=95=EF=BC=8C=E6=94=AF=E6=8C=81=20txt=20=E5=92=8C?= =?UTF-8?q?=20docx=20=E6=A0=BC=E5=BC=8F=20-=20=E4=BD=BF=E7=94=A8=E6=B5=81?= =?UTF-8?q?=E5=BC=8F=E8=A7=A3=E6=9E=90=E6=8A=80=E6=9C=AF=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=A4=A7=E6=96=87=E4=BB=B6=E5=86=85=E5=AE=B9=EF=BC=8C=E9=81=BF?= =?UTF-8?q?=E5=85=8D=E5=86=85=E5=AD=98=E6=BA=A2=E5=87=BA=20-=E4=B8=BA=20Ap?= =?UTF-8?q?pVideoController=20=E6=B7=BB=E5=8A=A0=20/extract=20=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E7=94=A8=E4=BA=8E=E6=96=87=E6=9C=AC=E5=86=85=E5=AE=B9?= =?UTF-8?q?=E6=8F=90=E5=8F=96=20-=20=E5=AE=8C=E5=96=84=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=93=88=E5=B8=8C=E5=B7=A5=E5=85=B7=E7=B1=BB=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=E7=BA=BF=E7=A8=8B=E5=AE=89=E5=85=A8=E6=80=A7=E4=B8=8E?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E5=A4=84=E7=90=86=20-=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=20SysOssService=20=E7=9A=84=20updateHash=20=E6=96=B9=E6=B3=95?= =?UTF-8?q?=E4=BB=A5=E6=94=AF=E6=8C=81=E5=A4=8D=E7=94=A8=E9=80=BB=E8=BE=91?= =?UTF-8?q?-=20=E7=BB=9F=E4=B8=80=E6=9E=84=E5=BB=BA=20SysOssVo=20=E5=AE=9E?= =?UTF-8?q?=E4=BD=93=E6=97=B6=E7=9A=84=E5=93=88=E5=B8=8C=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=E8=AE=BE=E7=BD=AE=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/controller/AppVideoController.java | 10 +++ .../app/service/AudioProcessService.java | 80 ++++++++++++++++++- .../service/device/DeviceDebugService.java | 25 ++---- .../impl/DeviceRepairRecordsServiceImpl.java | 11 +-- .../service/impl/DeviceServiceImpl.java | 9 ++- .../equipment/utils/FileHashUtil.java | 48 ++++++++++- .../system/service/ISysOssService.java | 8 ++ .../service/impl/SysOssServiceImpl.java | 35 +++++++- 8 files changed, 187 insertions(+), 39 deletions(-) diff --git a/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppVideoController.java b/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppVideoController.java index 11bc7540..f48a1a0a 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppVideoController.java +++ b/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppVideoController.java @@ -1,5 +1,6 @@ package com.fuyuanshen.app.controller; +import cn.dev33.satoken.annotation.SaIgnore; import com.fuyuanshen.app.service.AudioProcessService; import com.fuyuanshen.app.service.VideoProcessService; import com.fuyuanshen.common.core.domain.R; @@ -51,4 +52,13 @@ public class AppVideoController extends BaseController { public R> uploadAudioTTS(@RequestParam String text) throws IOException { return R.ok(audioProcessService.generateStandardPcmData(text)); } + + /** + * 提取文本内容(只支持txt/docx) + */ + @PostMapping(value = "/extract", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + @RepeatSubmit(interval = 2, timeUnit = TimeUnit.SECONDS,message = "请勿重复提交!") + public R extract(@RequestParam("file") MultipartFile file) throws Exception { + return R.ok("Success",audioProcessService.extract(file)); + } } diff --git a/fys-admin/src/main/java/com/fuyuanshen/app/service/AudioProcessService.java b/fys-admin/src/main/java/com/fuyuanshen/app/service/AudioProcessService.java index a36e8f11..0415b191 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/app/service/AudioProcessService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/app/service/AudioProcessService.java @@ -7,11 +7,17 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; -import java.io.File; -import java.io.IOException; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamConstants; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import java.io.*; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Arrays; import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; /** * 音频处理服务 @@ -170,5 +176,75 @@ public class AudioProcessService { } } + /** + * 提取文本 + */ + public String extract(MultipartFile file) throws Exception { + String name = file.getOriginalFilename(); + if (name == null || + (!name.endsWith(".txt") && !name.endsWith(".docx"))) { + throw new IllegalArgumentException("仅支持 .txt 或 .docx"); + } + if (file.getSize() > MAX_AUDIO_SIZE) { + throw new IllegalArgumentException("文件超过5MB"); + } + + String text; + /* 全程流式,不落地磁盘,不一次性读字节数组 */ + try (InputStream in = file.getInputStream()) { + if (name.endsWith(".txt")) { + text = readTxt(in); + } else { + text = readDocx(in); + } + } + return text; + } + + /* ---------- txt:按行读,StringBuilder 复用 ---------- */ + private String readTxt(InputStream in) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + StringBuilder sb = new StringBuilder(4096); + String line; + while ((line = br.readLine()) != null) { + sb.append(line).append('\n'); + } + return sb.toString(); + } + + /* ---------- docx:ZipInputStream 只扫 document.xml ---------- */ + private String readDocx(InputStream in) throws IOException { + ZipInputStream zin = new ZipInputStream(in); + ZipEntry e; + while ((e = zin.getNextEntry()) != null) { + if ("word/document.xml".equals(e.getName())) { + return staxExtract(zin); // 流式读 XML + } + } + return ""; + } + + /* ---------- StAX 流式提取 ---------- */ + private String staxExtract(InputStream xml) throws IOException { + XMLStreamReader r = null; + StringBuilder sb = new StringBuilder(4096); + try { + //System.out.println(new String(xml.readAllBytes())); + r = XMLInputFactory.newInstance().createXMLStreamReader(xml); + while (r.hasNext()) { + if (r.next() == XMLStreamConstants.START_ELEMENT && + "t".equals(r.getLocalName())) { + String elementText = r.getElementText(); + sb.append(elementText); + } + } + } catch (XMLStreamException ex) { + throw new IOException(ex); + } finally { + if (r != null) try { r.close(); } catch (XMLStreamException ignore) {} + } + return sb.toString(); + } + } \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceDebugService.java b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceDebugService.java index aa8fa6b7..afea4858 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceDebugService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceDebugService.java @@ -39,6 +39,7 @@ public class DeviceDebugService { private final IAppBusinessFileService appBusinessFileService; private final IAppOperationVideoService appOperationVideoService; private final DeviceService deviceService; + private final FileHashUtil fileHashUtil; /** * 文件上传并添加文件信息哈希去重 @@ -62,26 +63,12 @@ public class DeviceDebugService { Map hash2OssId = new LinkedHashMap<>(files.length); for (MultipartFile file : files) { // 1. 计算文件哈希 - String hash = FileHashUtil.hash(file); + String hash = fileHashUtil.hash(file); - // 2. 先根据 hash 查库(秒传) - SysOssVo exist = sysOssService.selectByHash(hash); - Long ossId; - if (exist != null) { - // 2.1 已存在,直接复用 - ossId = exist.getOssId(); - hash2OssId.putIfAbsent(hash, ossId); - } else { - // 2.2 不存在,真正上传 - SysOssVo upload = sysOssService.upload(file); - if (upload == null) { - return false; - } - ossId = upload.getOssId(); - hash2OssId.putIfAbsent(hash, ossId); - // 2.3 把 hash 写回记录(供下次去重) - sysOssService.updateHashById(ossId, hash); - } + SysOssVo exist = sysOssService.updateHash(file, hash); + // 2.1 已存在,直接复用 + long ossId = exist.getOssId(); + hash2OssId.putIfAbsent(hash, ossId); } // 4. 组装业务中间表 List bizList = new ArrayList<>(bo.getDeviceIds().length * hash2OssId.size()); diff --git a/fys-modules/fys-equipment/src/main/java/com/fuyuanshen/equipment/service/impl/DeviceRepairRecordsServiceImpl.java b/fys-modules/fys-equipment/src/main/java/com/fuyuanshen/equipment/service/impl/DeviceRepairRecordsServiceImpl.java index 80ff0fe5..ecc7007e 100644 --- a/fys-modules/fys-equipment/src/main/java/com/fuyuanshen/equipment/service/impl/DeviceRepairRecordsServiceImpl.java +++ b/fys-modules/fys-equipment/src/main/java/com/fuyuanshen/equipment/service/impl/DeviceRepairRecordsServiceImpl.java @@ -50,6 +50,7 @@ public class DeviceRepairRecordsServiceImpl extends ServiceImpl impleme private final DeviceTypeGrantsMapper deviceTypeGrantsMapper; private final DeviceFenceAccessRecordMapper deviceFenceAccessRecordMapper; + private final FileHashUtil fileHashUtil; /** @@ -209,7 +211,8 @@ public class DeviceServiceImpl extends ServiceImpl impleme // 保存图片并获取URL if (deviceForm.getFile() != null) { - SysOssVo upload = ossService.upload(deviceForm.getFile()); + String fileHash = fileHashUtil.hash(deviceForm.getFile()); + SysOssVo upload = ossService.updateHash(deviceForm.getFile(),fileHash); // 设置图片路径 deviceForm.setDevicePic(upload.getUrl()); } @@ -283,8 +286,8 @@ public class DeviceServiceImpl extends ServiceImpl impleme // 处理上传的图片 if (deviceForm.getFile() != null) { - // 设置图片路径 - SysOssVo oss = ossService.upload(deviceForm.getFile()); + String fileHash = fileHashUtil.hash(deviceForm.getFile()); + SysOssVo oss = ossService.updateHash(deviceForm.getFile(),fileHash); // 强制将HTTP替换为HTTPS if (oss.getUrl() != null && oss.getUrl().startsWith("http://")) { oss.setUrl(oss.getUrl().replaceFirst("^http://", "https://")); diff --git a/fys-modules/fys-equipment/src/main/java/com/fuyuanshen/equipment/utils/FileHashUtil.java b/fys-modules/fys-equipment/src/main/java/com/fuyuanshen/equipment/utils/FileHashUtil.java index 77011b8d..1e6666f2 100644 --- a/fys-modules/fys-equipment/src/main/java/com/fuyuanshen/equipment/utils/FileHashUtil.java +++ b/fys-modules/fys-equipment/src/main/java/com/fuyuanshen/equipment/utils/FileHashUtil.java @@ -1,28 +1,70 @@ package com.fuyuanshen.equipment.utils; import org.apache.commons.codec.digest.DigestUtils; +import org.springframework.stereotype.Component; import org.springframework.web.multipart.MultipartFile; import java.io.IOException; import java.io.InputStream; import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.HexFormat; /** * 文件哈希工具类 */ +@Component // 如果使用 Spring 可以注入 public class FileHashUtil { + /* 算法常量 */ private static final String ALGORITHM = "SHA-256"; + /* 缓冲区大小 8 KB */ + private static final int BUFFER_SIZE = 8192; - public static String hash(MultipartFile file) throws IOException { - MessageDigest digest = DigestUtils.getDigest(ALGORITHM); + /** + * 计算上传文件的 SHA-256 十六进制哈希 + * + * @param file 上传文件;不能为 null,且必须非空 + * @return 64 位小写十六进制字符串 + * @throws IllegalArgumentException 参数不合法 + * @throws IOException 流读取失败 + * @throws IllegalStateException 算法运行时异常(不会触发) + */ + public String hash(MultipartFile file) throws IOException { + validate(file); + + /* 每个请求新建实例,保证线程安全 */ + MessageDigest digest = newDigest(); + + /* try-with-resources 自动关闭流 */ try (InputStream in = file.getInputStream()) { - byte[] buf = new byte[8192]; + byte[] buf = new byte[BUFFER_SIZE]; int len; while ((len = in.read(buf)) != -1) { digest.update(buf, 0, len); } } + + /* JDK 17+ 的 HexFormat,比 Apache Commons 更快且无需额外依赖 */ return HexFormat.of().formatHex(digest.digest()); } + + /* -------------------- 私有辅助方法 -------------------- */ + + private static void validate(MultipartFile file) { + if (file == null) { + throw new IllegalArgumentException("MultipartFile 不能为 null"); + } + if (file.isEmpty()) { + throw new IllegalArgumentException("上传文件不能为空"); + } + } + + private static MessageDigest newDigest() { + try { + return MessageDigest.getInstance(ALGORITHM); + } catch (NoSuchAlgorithmException e) { + /* SHA-256 是 JDK 必现算法,走到这里说明 JDK 实现损坏 */ + throw new IllegalStateException("算法 " + ALGORITHM + " 不可用", e); + } + } } diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/service/ISysOssService.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/service/ISysOssService.java index 32a0a3e0..8adfcda8 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/service/ISysOssService.java +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/service/ISysOssService.java @@ -60,6 +60,14 @@ public interface ISysOssService { */ int updateHashById(long ossId,String fileHash); + /** + * 更新文件 hash 值 + * + * @param file 文件对象 + * @return 匹配的 SysOssVo 列表 + */ + SysOssVo updateHash(MultipartFile file, String hash); + /** * 上传 MultipartFile 到对象存储服务,并保存文件信息到数据库 * diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/service/impl/SysOssServiceImpl.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/service/impl/SysOssServiceImpl.java index 80f1e6bf..bcadabbd 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/service/impl/SysOssServiceImpl.java +++ b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/service/impl/SysOssServiceImpl.java @@ -191,6 +191,32 @@ public class SysOssServiceImpl implements ISysOssService, OssService { storage.download(sysOss.getFileName(), response.getOutputStream(), response::setContentLengthLong); } + /** + * 上传 MultipartFile 到对象存储服务,并保存文件信息到数据库 + * + * @param file 要上传的 MultipartFile 对象 + * @return 保存到数据库的 SysOssVo 对象 + */ + @Override + public SysOssVo updateHash(MultipartFile file, String hash) { + // 2. 先根据 hash 查库(秒传) + SysOssVo exist = baseMapper.selectByHash(hash); + if (exist != null) { + return exist; + } + String originalfileName = file.getOriginalFilename(); + String suffix = StringUtils.substring(originalfileName, originalfileName.lastIndexOf("."), originalfileName.length()); + OssClient storage = OssFactory.instance(); + UploadResult uploadResult; + try { + uploadResult = storage.uploadSuffix(file.getBytes(), suffix, file.getContentType()); + } catch (IOException e) { + throw new ServiceException(e.getMessage()); + } + // 保存文件信息 + return buildResultEntity(originalfileName, suffix, storage.getConfigKey(), uploadResult,hash); + } + /** * 上传 MultipartFile 到对象存储服务,并保存文件信息到数据库 * @@ -210,7 +236,7 @@ public class SysOssServiceImpl implements ISysOssService, OssService { throw new ServiceException(e.getMessage()); } // 保存文件信息 - return buildResultEntity(originalfileName, suffix, storage.getConfigKey(), uploadResult); + return buildResultEntity(originalfileName, suffix, storage.getConfigKey(), uploadResult,null); } /** @@ -226,7 +252,7 @@ public class SysOssServiceImpl implements ISysOssService, OssService { OssClient storage = OssFactory.instance(); UploadResult uploadResult = storage.uploadSuffix(file, suffix); // 保存文件信息 - return buildResultEntity(originalfileName, suffix, storage.getConfigKey(), uploadResult); + return buildResultEntity(originalfileName, suffix, storage.getConfigKey(), uploadResult,null); } @@ -255,18 +281,19 @@ public class SysOssServiceImpl implements ISysOssService, OssService { uploadResult = storage.uploadSuffix(data, suffix, "image/jpeg"); // 假设是图片类型,可以根据实际需要修改 // 保存文件信息 - return buildResultEntity(fileName, suffix, storage.getConfigKey(), uploadResult); + return buildResultEntity(fileName, suffix, storage.getConfigKey(), uploadResult,null); } @NotNull - private SysOssVo buildResultEntity(String originalfileName, String suffix, String configKey, UploadResult uploadResult) { + private SysOssVo buildResultEntity(String originalfileName, String suffix, String configKey, UploadResult uploadResult, String hash) { SysOss oss = new SysOss(); oss.setUrl(uploadResult.getUrl()); oss.setFileSuffix(suffix); oss.setFileName(uploadResult.getFilename()); oss.setOriginalName(originalfileName); oss.setService(configKey); + oss.setFileHash(hash); // 设置哈希值 baseMapper.insert(oss); SysOssVo sysOssVo = MapstructUtils.convert(oss, SysOssVo.class); return this.matchingUrl(sysOssVo); From 33d61081726ac77dc792ec23c5118057b0ce1deb Mon Sep 17 00:00:00 2001 From: daiyongfei <974332738@qq.com> Date: Sat, 8 Nov 2025 09:38:19 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=A4=84=E7=90=86MQTT=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../global/mqtt/base/MqttMessage.java | 46 +++++ .../global/mqtt/base/MqttTopicInfo.java | 29 +++ .../global/mqtt/base/SensorData.java | 24 +++ .../mqtt/handler/IotMqttMessageHandler.java | 136 +++++++++++++ .../mqtt/handler/MqttMessageHandler.java | 71 +++++++ .../global/mqtt/service/IotMqttService.java | 96 +++++++++ .../mqtt/service/MqttMessageService.java | 37 ++++ .../mqtt/service/impl/IotMqttServiceImpl.java | 109 +++++++++++ .../service/impl/MqttMessageServiceImpl.java | 163 ++++++++++++++++ .../global/mqtt/utils/MqttTopicUtils.java | 71 +++++++ .../service/device/DeviceBJQBizService.java | 2 + .../com/fuyuanshen/app/domain/vo/Main.java | 52 +++++ nginx.conf | 184 ++++++++++++++++++ 13 files changed, 1020 insertions(+) create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java create mode 100644 fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java create mode 100644 nginx.conf diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java new file mode 100644 index 00000000..dd633b3c --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java @@ -0,0 +1,46 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +import java.util.List; + +/** + * MQTT消息基础模型 + */ +@Data +public class MqttMessage { + /** + * 请求ID,用于匹配请求和响应 + */ + private String requestId; + + /** + * 设备IMEI + */ + private String imei; + + /** + * 时间戳(毫秒) + */ + private Long timestamp; + + /** + * 功能类型 + */ + private String funcType; + + /** + * 数据内容 + */ + private Object data; + + /** + * 状态(响应时使用) + */ + private String status; + + /** + * 批量数据(设备上报时使用) + */ + private List batch; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java new file mode 100644 index 00000000..80d1dcf4 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java @@ -0,0 +1,29 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +/** + * MQTT主题信息模型 + */ +@Data +public class MqttTopicInfo { + /** + * 操作类型 (command/status/report) + */ + private String operation; + + /** + * 租户编码 + */ + private String tenantCode; + + /** + * 设备类型 + */ + private String deviceType; + + /** + * 设备IMEI + */ + private String imei; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java new file mode 100644 index 00000000..489d5f8f --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java @@ -0,0 +1,24 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +/** + * 传感器数据模型 + */ +@Data +public class SensorData { + /** + * 传感器名称 + */ + private String sensor; + + /** + * 传感器值 + */ + private Object value; + + /** + * 时间戳(毫秒) + */ + private Long timestamp; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java new file mode 100644 index 00000000..b77ac210 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java @@ -0,0 +1,136 @@ +package com.fuyuanshen.global.mqtt.handler; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import com.fuyuanshen.global.mqtt.service.IotMqttService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + + +/** + * IoT设备MQTT消息处理器 + * 用于处理设备上报的数据和响应消息 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class IotMqttMessageHandler { + + private final IotMqttService iotMqttService; + + /** + * 处理MQTT消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleMessage(String topic, String payload) { + try { + // 解析主题 + MqttTopicInfo topicInfo = parseTopic(topic); + if (topicInfo == null) { + log.warn("无法解析MQTT主题: topic={}", topic); + return; + } + + // 解析消息内容 + JSONObject message = JSON.parseObject(payload); + + // 根据主题类型处理消息 + switch (topicInfo.getOperation()) { + case "command": + // 处理下发指令(设备端不会主动发送command类型消息) + log.warn("收到非法的MQTT消息类型: operation={}", topicInfo.getOperation()); + break; + case "status": + // 处理设备对指令的响应 + iotMqttService.handleCommandResponse( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + case "report": + // 处理设备主动上报的数据 + handleDeviceReport(topicInfo, message); + break; + default: + log.warn("未知的MQTT主题操作类型: operation={}", topicInfo.getOperation()); + break; + } + } catch (Exception e) { + log.error("处理MQTT消息时发生错误: topic={}, payload={}", topic, payload, e); + } + + } + + /** + * 解析MQTT主题 + * + * @param topic 主题字符串 + * @return 主题信息对象 + */ + MqttTopicInfo parseTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + + String[] parts = topic.split("/"); + if (parts.length != 4) { + return null; + } + + MqttTopicInfo info = new MqttTopicInfo(); + info.setOperation(parts[0]); + info.setTenantCode(parts[1]); + info.setDeviceType(parts[2]); + info.setImei(parts[3]); + return info; + } + + /** + * 处理设备上报数据 + * + * @param topicInfo 主题信息 + * @param message 消息内容 + */ + private void handleDeviceReport(MqttTopicInfo topicInfo, JSONObject message) { + // 获取时间戳 + Long timestamp = message.getLong("timestamp"); + + // 处理批量数据上报 + if (message.containsKey("batch")) { + JSONObject batchData = message.getJSONObject("batch"); + iotMqttService.handleBatchReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + batchData, + timestamp); + } + // 处理单个数据上报 + else if (message.containsKey("sensor") && message.containsKey("value")) { + String sensor = message.getString("sensor"); + Object value = message.get("value"); + iotMqttService.handleSingleReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + sensor, + value, + timestamp); + } + // 处理其他格式的数据 + else { + // 将整个消息作为批量数据处理 + iotMqttService.handleBatchReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message, + timestamp); + } + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java new file mode 100644 index 00000000..1d661683 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java @@ -0,0 +1,71 @@ +package com.fuyuanshen.global.mqtt.handler; + +import com.alibaba.fastjson2.JSON; +import com.fuyuanshen.global.mqtt.base.MqttMessage; +import com.fuyuanshen.global.mqtt.service.MqttMessageService; +import com.fuyuanshen.global.mqtt.utils.MqttTopicUtils; +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * MQTT消息处理器 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class MqttMessageHandler { + + private final MqttMessageService mqttMessageService; + + + /** + * 处理MQTT消息 + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleMessage(String topic, String payload) { + try { + // 解析主题 + MqttTopicInfo topicInfo = MqttTopicUtils.parseTopic(topic); + if (topicInfo == null) { + log.warn("无法解析MQTT主题: topic={}", topic); + return; + } + + // 解析消息内容 + MqttMessage message = JSON.parseObject(payload, MqttMessage.class); + + // 根据主题类型处理消息 + switch (topicInfo.getOperation()) { + case "command": + // 处理下发指令(设备端不会主动发送command类型消息) + log.warn("收到非法的MQTT消息类型: operation={}", topicInfo.getOperation()); + break; + case "status": + // 处理设备对指令的响应 + mqttMessageService.handleCommandResponse( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + case "report": + // 处理设备主动上报的数据 + mqttMessageService.handleDeviceReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + default: + log.warn("未知的MQTT主题操作类型: operation={}", topicInfo.getOperation()); + break; + } + } catch (Exception e) { + log.error("处理MQTT消息时发生错误: topic={}, payload={}", topic, payload, e); + } + } + +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java new file mode 100644 index 00000000..a48caab9 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java @@ -0,0 +1,96 @@ +package com.fuyuanshen.global.mqtt.service; + +import com.alibaba.fastjson2.JSONObject; + +/** + * 通用IoT设备MQTT协议服务接口 + * 遵循统一的MQTT通信协议规范 + */ +public interface IotMqttService { + + /** + * 构建下发指令主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 指令主题 + */ + String buildCommandTopic(String tenantCode, String deviceType, String imei); + + /** + * 构建响应数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 响应主题 + */ + String buildStatusTopic(String tenantCode, String deviceType, String imei); + + /** + * 构建设备上报数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 上报主题 + */ + String buildReportTopic(String tenantCode, String deviceType, String imei); + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 (JSON格式) + */ + void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 发送响应消息到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) + */ + void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 发送设备上报数据的确认消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 确认消息 (JSON格式) + */ + void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 处理设备上报的单个传感器数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param sensor 传感器名称 + * @param value 传感器值 + * @param timestamp 时间戳 + */ + void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp); + + /** + * 处理设备上报的批量传感器数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param batchData 批量数据 + * @param timestamp 时间戳 + */ + void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp); + + /** + * 处理设备对指令的响应 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) + */ + void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message); +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java new file mode 100644 index 00000000..2a587223 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java @@ -0,0 +1,37 @@ +package com.fuyuanshen.global.mqtt.service; + + +import com.fuyuanshen.global.mqtt.base.MqttMessage; + +/** + * MQTT消息处理服务接口 + */ +public interface MqttMessageService { + + /** + * 处理下发指令的响应消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 + */ + void handleCommandResponse(String tenantCode, String deviceType, String imei, MqttMessage message); + + /** + * 处理设备主动上报的数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 上报消息 + */ + void handleDeviceReport(String tenantCode, String deviceType, String imei, MqttMessage message); + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 + */ + void sendCommand(String tenantCode, String deviceType, String imei, MqttMessage message); +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java new file mode 100644 index 00000000..afe38bd1 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java @@ -0,0 +1,109 @@ +package com.fuyuanshen.global.mqtt.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.service.IotMqttService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; + +import java.lang.reflect.Method; + +/** + * 通用IoT设备MQTT协议服务实现类 + * 遵循统一的MQTT通信协议规范 + */ +@Slf4j +@Service +public class IotMqttServiceImpl implements IotMqttService { + + @Autowired + private ApplicationContext applicationContext; + + // MQTT主题前缀 + private static final String COMMAND_PREFIX = "command"; + private static final String STATUS_PREFIX = "status"; + private static final String REPORT_PREFIX = "report"; + + @Override + public String buildCommandTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", COMMAND_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public String buildStatusTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", STATUS_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public String buildReportTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", REPORT_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildCommandTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送指令到设备: topic={}, payload={}", topic, payload); + } + + @Override + public void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildStatusTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送响应消息到设备: topic={}, payload={}", topic, payload); + } + + @Override + public void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildReportTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送设备上报数据确认消息: topic={}, payload={}", topic, payload); + } + + @Override + public void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp) { + log.info("处理设备上报的单个传感器数据: tenantCode={}, deviceType={}, imei={}, sensor={}, value={}, timestamp={}", + tenantCode, deviceType, imei, sensor, value, timestamp); + + // TODO: 实现具体的业务逻辑,如更新设备状态、存储传感器数据等 + } + + @Override + public void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp) { + log.info("处理设备上报的批量传感器数据: tenantCode={}, deviceType={}, imei={}, batchData={}, timestamp={}", + tenantCode, deviceType, imei, JSON.toJSONString(batchData), timestamp); + + // TODO: 实现具体的业务逻辑,如批量更新设备状态、存储传感器数据等 + } + + @Override + public void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message) { + log.info("处理设备对指令的响应: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // TODO: 实现具体的业务逻辑,如更新指令执行状态等 + } + + /** + * 通过反射方式发送MQTT消息 + * @param topic 主题 + * @param qos 服务质量等级 + * @param payload 消息内容 + */ + private void sendMqttMessage(String topic, int qos, String payload) { + try { + Object mqttGateway = applicationContext.getBean("mqttGateway"); + Method sendMethod = mqttGateway.getClass().getMethod("sendMsgToMqtt", String.class, int.class, String.class); + sendMethod.invoke(mqttGateway, topic, qos, payload); + } catch (Exception e) { + log.error("发送MQTT消息失败: topic={}, payload={}", topic, payload, e); + } + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java new file mode 100644 index 00000000..1dc13cc9 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java @@ -0,0 +1,163 @@ +package com.fuyuanshen.global.mqtt.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.fuyuanshen.equipment.domain.Device; +import com.fuyuanshen.equipment.mapper.DeviceMapper; +import com.fuyuanshen.global.mqtt.base.MqttMessage; +import com.fuyuanshen.global.mqtt.base.SensorData; +import com.fuyuanshen.global.mqtt.config.MqttGateway; +import com.fuyuanshen.global.mqtt.service.MqttMessageService; +import com.fuyuanshen.global.mqtt.utils.MqttTopicUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * MQTT消息处理服务实现类 + */ +@Slf4j +@Service +public class MqttMessageServiceImpl implements MqttMessageService { + + private final MqttGateway mqttGateway; + + private final DeviceMapper deviceMapper; + + public MqttMessageServiceImpl(MqttGateway mqttGateway, DeviceMapper deviceMapper) { + this.mqttGateway = mqttGateway; + this.deviceMapper = deviceMapper; + } + + /** + * 处理下发指令的响应消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 + */ + @Override + public void handleCommandResponse(String tenantCode, String deviceType, String imei, MqttMessage message) { + log.info("处理设备响应消息: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // 根据requestId更新指令执行状态 + // TODO: 实现具体的业务逻辑,比如更新指令执行结果等 + } + + /** + * 处理设备主动上报的数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 上报消息 + */ + @Override + public void handleDeviceReport(String tenantCode, String deviceType, String imei, MqttMessage message) { + log.info("处理设备上报数据: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // 查找设备 + Device device = deviceMapper.selectDeviceByImei(imei); + if (device == null) { + log.warn("未找到对应设备: imei={}", imei); + return; + } + + // 处理批量数据上报 + if (message.getBatch() != null && !message.getBatch().isEmpty()) { + for (int i = 0; i < message.getBatch().size(); i++) { + processSensorData(device, message.getBatch().get(i)); + } + } + // 处理单个数据上报 + else if (message.getData() != null) { + // 如果data是一个SensorData对象,则处理它 + // 这里可以根据实际的数据结构做相应处理 + } + } + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 + */ + @Override + public void sendCommand(String tenantCode, String deviceType, String imei, MqttMessage message) { + // 构建下发指令主题 + String topic = MqttTopicUtils.buildCommandTopic(tenantCode, deviceType, imei); + + // 设置时间戳 + if (message.getTimestamp() == null) { + message.setTimestamp(System.currentTimeMillis()); + } + + // 发送消息到MQTT + String payload = JSON.toJSONString(message); + mqttGateway.sendMsgToMqtt(topic, 1, payload); + + log.info("发送指令到设备: topic={}, payload={}", topic, payload); + } + + /** + * 处理传感器数据 + * @param device 设备对象 + * @param sensorData 传感器数据 + */ + private void processSensorData(Device device, SensorData sensorData) { + log.info("处理传感器数据: deviceId={}, sensor={}, value={}", + device.getId(), sensorData.getSensor(), sensorData.getValue()); + + String sensor = sensorData.getSensor(); + Object value = sensorData.getValue(); + + // 根据不同的传感器类型处理数据 + switch (sensor) { + case "mainLightMode": + // 处理主灯模式数据 + updateDeviceMainLightMode(device, value); + break; + case "mainLightBrightness": + // 处理主灯亮度数据 + updateDeviceMainLightBrightness(device, value); + break; + case "batteryPercent": + // 处理电池电量数据 + updateDeviceBatteryPercent(device, value); + break; + default: + log.warn("未知的传感器类型: sensor={}", sensor); + break; + } + } + + /** + * 更新设备主灯模式 + * @param device 设备对象 + * @param value 主灯模式值 + */ + private void updateDeviceMainLightMode(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备主灯模式: deviceId={}, value={}", device.getId(), value); + } + + /** + * 更新设备主灯亮度 + * @param device 设备对象 + * @param value 主灯亮度值 + */ + private void updateDeviceMainLightBrightness(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备主灯亮度: deviceId={}, value={}", device.getId(), value); + } + + /** + * 更新设备电池电量 + * @param device 设备对象 + * @param value 电池电量值 + */ + private void updateDeviceBatteryPercent(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备电池电量: deviceId={}, value={}", device.getId(), value); + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java new file mode 100644 index 00000000..774a29cb --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java @@ -0,0 +1,71 @@ +package com.fuyuanshen.global.mqtt.utils; + +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import lombok.experimental.UtilityClass; + +/** + * MQTT主题处理工具类 + */ +@UtilityClass +public class MqttTopicUtils { + + public static final String COMMAND_PREFIX = "command"; + public static final String STATUS_PREFIX = "status"; + public static final String REPORT_PREFIX = "report"; + + /** + * 构建下发指令主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildCommandTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", COMMAND_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 构建响应数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildStatusTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", STATUS_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 构建设备上报数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildReportTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", REPORT_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 解析MQTT主题 + * @param topic 主题字符串 + * @return 主题信息对象 + */ + public static MqttTopicInfo parseTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + + String[] parts = topic.split("/"); + if (parts.length != 4) { + return null; + } + + MqttTopicInfo info = new MqttTopicInfo(); + info.setOperation(parts[0]); + info.setTenantCode(parts[1]); + info.setDeviceType(parts[2]); + info.setImei(parts[3]); + return info; + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java index 03d2c988..031618ba 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java @@ -59,6 +59,7 @@ public class DeviceBJQBizService { private final MqttGateway mqttGateway; private final DeviceLogMapper deviceLogMapper; + public int sendMessage(AppDeviceSendMsgBo bo) { List deviceIds = bo.getDeviceIds(); if (deviceIds == null || deviceIds.isEmpty()) { @@ -573,4 +574,5 @@ public class DeviceBJQBizService { uploadDeviceLogo(dto); } } + } diff --git a/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java b/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java new file mode 100644 index 00000000..81706ddb --- /dev/null +++ b/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java @@ -0,0 +1,52 @@ +package com.fuyuanshen.app.domain.vo; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.TimeZone; + +public class Main { + public static void main(String[] args) throws IOException { + + String[] availableIDs = TimeZone.getAvailableIDs(); + for (String id : availableIDs) { + System.out.println(id); + } + + byte[] data = "hello, world!".getBytes(StandardCharsets.UTF_8); + try (CountInputStream input = new CountInputStream(new ByteArrayInputStream(data))) { + int n; + while ((n = input.read()) != -1) { + System.out.println((char)n); + } + System.out.println("Total read " + input.getBytesRead() + " bytes"); + } + } +} + +class CountInputStream extends FilterInputStream { + private int count = 0; + + CountInputStream(InputStream in) { + super(in); + } + + public int getBytesRead() { + return this.count; + } + + public int read() throws IOException { + int n = in.read(); + if (n != -1) { + this.count ++; + } + return n; + } + + public int read(byte[] b, int off, int len) throws IOException { + int n = in.read(b, off, len); + if (n != -1) { + this.count += n; + } + return n; + } +} \ No newline at end of file diff --git a/nginx.conf b/nginx.conf new file mode 100644 index 00000000..b4f68c98 --- /dev/null +++ b/nginx.conf @@ -0,0 +1,184 @@ +#user nobody; +worker_processes 1; + +#error_log logs/error.log; +#error_log logs/error.log notice; +#error_log logs/error.log info; + +#pid logs/nginx.pid; + +events { + worker_connections 1024; +} + +http { + include mime.types; + default_type application/octet-stream; + + #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + # '$status $body_bytes_sent "$http_referer" ' + # '"$http_user_agent" "$http_x_forwarded_for"'; + + #access_log logs/access.log main; + + sendfile on; + #tcp_nopush on; + + #keepalive_timeout 0; + keepalive_timeout 65; + + map $http_upgrade $connection_upgrade { + default upgrade; + '' close; + } + + upstream websocket { + server 127.0.0.1:9083; + } + + #gzip on; + + server { + listen 80; + server_name cnxhyc.com; + + #charset koi8-r; + + #access_log logs/host.access.log main; + + location / { + root html; + index index.html index.htm; + try_files $uri $uri/ /index.html; + } + + #error_page 404 /404.html; + + # redirect server error pages to the static page /50x.html + # + error_page 500 502 503 504 /50x.html; + location = /50x.html { + root html; + } + + # proxy the PHP scripts to Apache listening on 127.0.0.1:80 + # + #location ~ \.php$ { + # proxy_pass http://127.0.0.1; + #} + + # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000 + # + #location ~ \.php$ { + # root html; + # fastcgi_pass 127.0.0.1:9000; + # fastcgi_index index.php; + # fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name; + # include fastcgi_params; + #} + + # deny access to .htaccess files, if Apache's document root + # concurs with nginx's one + # + #location ~ /\.ht { + # deny all; + #} + + location /fys/ { + proxy_pass http://localhost:9000/fys/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + + location /backend/ { + proxy_set_header Host $http_host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header REMOTE-HOST $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_read_timeout 86400s; + # sse 与 websocket参数 + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_buffering off; + proxy_cache off; + proxy_pass http://localhost:8000/; + } + + # API 代理 + location /jq/ { + proxy_pass http://localhost:8000/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_connect_timeout 60s; + proxy_read_timeout 600s; + } + } + + # another virtual host using mix of IP-, name-, and port-based configuration + # + #server { + # listen 8000; + # listen somename:8080; + # server_name somename alias another.alias; + + # location / { + # root html; + # index index.html index.htm; + # } + #} + + # HTTPS server + server { + listen 443 ssl; + server_name cnxhyc.com www.cnxhyc.com; + + ssl_certificate /cert/cnxhyc.com.pem; + ssl_certificate_key /cert/cnxhyc.com.key; + + # 使用更现代的 SSL 配置 + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers TLS_AES_256_GCM_SHA384:ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4; + ssl_prefer_server_ciphers off; + + ssl_session_cache shared:SSL:10m; + ssl_session_timeout 10m; + + location /wss { + proxy_pass http://websocket; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + } + + location / { + root html; + index index.html index.htm; + } + + # API 代理 + location /jq/ { + proxy_pass http://47.107.152.87:8000/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_connect_timeout 60s; + proxy_read_timeout 600s; + } + + # 重定向 /xh 到 /xh/ + location = /xh { + return 301 /xh/; + } + + # 后台系统 + location /xh/ { + alias /usr/local/nginx/html/jingquan/; + try_files $uri $uri/ /jingquan/index.html; + } + } +} \ No newline at end of file