From ac353b1078bc5158a3120d37a49a07eb7b2ad900 Mon Sep 17 00:00:00 2001 From: chenyouting <514333061@qq.com> Date: Wed, 30 Jul 2025 08:50:44 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=BE=E5=A4=87mqtt=E6=94=B6=E5=8F=91?= =?UTF-8?q?=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/controller/AppDeviceController.java | 41 +++++ .../app/domain/dto/AppLightModeDto.java | 13 ++ .../app/service/AppDeviceBizService.java | 100 +++++++++++- .../global/mqtt/base/MqttMessageRule.java | 26 ++++ .../global/mqtt/base/MqttRuleContext.java | 32 ++++ .../global/mqtt/base/MqttRuleEngine.java | 38 +++++ .../mqtt/config/MqttConfiguration.java | 2 +- .../global}/mqtt/config/MqttGateway.java | 2 +- .../mqtt/config/MqttInboundConfiguration.java | 4 +- .../config/MqttOutboundConfiguration.java | 2 +- .../mqtt/config/MqttPropertiesConfig.java | 2 +- .../constants/DeviceCommandTypeConstants.java | 78 ++++++++++ .../global}/mqtt/constants/MqttConstants.java | 2 +- .../mqtt/publish/DeviceDataController.java | 2 +- .../global}/mqtt/publish/MqttClientTest.java | 4 +- .../mqtt/publish/MqttMessageSender.java | 4 +- .../mqtt/receiver/ReceiverMessageHandler.java | 63 ++++++++ .../global/mqtt/rule/LocationDataRule.java | 143 ++++++++++++++++++ .../global/mqtt/rule/PersonnelInfoRule.java | 76 ++++++++++ .../core/utils/ImageToCArrayConverter.java | 60 ++++++++ .../mqtt/receiver/ReceiverMessageHandler.java | 90 ----------- 21 files changed, 679 insertions(+), 105 deletions(-) create mode 100644 fys-admin/src/main/java/com/fuyuanshen/app/domain/dto/AppLightModeDto.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessageRule.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleContext.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleEngine.java rename {fys-modules/fys-system/src/main/java/com/fuyuanshen/system => fys-admin/src/main/java/com/fuyuanshen/global}/mqtt/config/MqttConfiguration.java (96%) rename {fys-modules/fys-system/src/main/java/com/fuyuanshen/system => fys-admin/src/main/java/com/fuyuanshen/global}/mqtt/config/MqttGateway.java (94%) rename {fys-modules/fys-system/src/main/java/com/fuyuanshen/system => fys-admin/src/main/java/com/fuyuanshen/global}/mqtt/config/MqttInboundConfiguration.java (95%) rename {fys-modules/fys-system/src/main/java/com/fuyuanshen/system => fys-admin/src/main/java/com/fuyuanshen/global}/mqtt/config/MqttOutboundConfiguration.java (97%) rename {fys-modules/fys-system/src/main/java/com/fuyuanshen/system => fys-admin/src/main/java/com/fuyuanshen/global}/mqtt/config/MqttPropertiesConfig.java (91%) create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/DeviceCommandTypeConstants.java rename {fys-modules/fys-system/src/main/java/com/fuyuanshen/system => fys-admin/src/main/java/com/fuyuanshen/global}/mqtt/constants/MqttConstants.java (81%) rename {fys-modules/fys-system/src/main/java/com/fuyuanshen/system => fys-admin/src/main/java/com/fuyuanshen/global}/mqtt/publish/DeviceDataController.java (93%) rename {fys-modules/fys-system/src/main/java/com/fuyuanshen/system => fys-admin/src/main/java/com/fuyuanshen/global}/mqtt/publish/MqttClientTest.java (84%) rename {fys-modules/fys-system/src/main/java/com/fuyuanshen/system => fys-admin/src/main/java/com/fuyuanshen/global}/mqtt/publish/MqttMessageSender.java (89%) create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/LocationDataRule.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/PersonnelInfoRule.java delete mode 100644 fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/receiver/ReceiverMessageHandler.java diff --git a/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppDeviceController.java b/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppDeviceController.java index 0b4e3def..4644d5b2 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppDeviceController.java +++ b/fys-admin/src/main/java/com/fuyuanshen/app/controller/AppDeviceController.java @@ -1,5 +1,6 @@ package com.fuyuanshen.app.controller; +import com.alibaba.fastjson2.JSONObject; import com.fuyuanshen.app.domain.bo.AppPersonnelInfoBo; import com.fuyuanshen.app.domain.dto.APPReNameDTO; import com.fuyuanshen.app.domain.dto.AppDeviceLogoUploadDto; @@ -122,4 +123,44 @@ public class AppDeviceController extends BaseController { return R.ok(); } + + /** + * 灯光模式 + * 0(关灯),1(强光模式),2(弱光模式), 3(爆闪模式), 4(泛光模式) + */ + @PostMapping("/lightModeSettings") + public R lightModeSettings(@RequestBody JSONObject params) { + appDeviceService.lightModeSettings(params); + return R.ok(); + } + + /** + * 灯光亮度设置 + * + */ + @PostMapping("/lightBrightnessSettings") + public R lightBrightnessSettings(@RequestBody JSONObject params) { + appDeviceService.lightBrightnessSettings(params); + return R.ok(); + } + + /** + * 激光模式设置 + * + */ + @PostMapping("/laserModeSettings") + public R laserModeSettings(@RequestBody JSONObject params) { + appDeviceService.laserModeSettings(params); + return R.ok(); + } + + /** + * 地图逆解析 + * + */ + @PostMapping("/mapReverseGeocoding") + public R mapReverseGeocoding(@RequestBody JSONObject params) { + String mapJson = appDeviceService.mapReverseGeocoding(params); + return R.ok(mapJson); + } } diff --git a/fys-admin/src/main/java/com/fuyuanshen/app/domain/dto/AppLightModeDto.java b/fys-admin/src/main/java/com/fuyuanshen/app/domain/dto/AppLightModeDto.java new file mode 100644 index 00000000..732649ce --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/app/domain/dto/AppLightModeDto.java @@ -0,0 +1,13 @@ +package com.fuyuanshen.app.domain.dto; + +import lombok.Data; + +@Data +public class AppLightModeDto { + + private Long deviceId; + + //0(关灯),1(强光模式),2(弱光模式), 3(爆闪模式), 4(泛光模式) + private Integer mode; + +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/app/service/AppDeviceBizService.java b/fys-admin/src/main/java/com/fuyuanshen/app/service/AppDeviceBizService.java index 66b93eed..a063aef6 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/app/service/AppDeviceBizService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/app/service/AppDeviceBizService.java @@ -1,6 +1,7 @@ package com.fuyuanshen.app.service; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -21,6 +22,7 @@ import com.fuyuanshen.common.core.exception.ServiceException; import com.fuyuanshen.common.core.utils.ImageToCArrayConverter; import com.fuyuanshen.common.core.utils.MapstructUtils; import com.fuyuanshen.common.core.utils.ObjectUtils; +import com.fuyuanshen.common.core.utils.StringUtils; import com.fuyuanshen.common.mybatis.core.page.PageQuery; import com.fuyuanshen.common.mybatis.core.page.TableDataInfo; import com.fuyuanshen.common.redis.utils.RedisUtils; @@ -40,8 +42,8 @@ import static com.fuyuanshen.common.core.utils.Bitmap80x12Generator.*; import static com.fuyuanshen.common.core.utils.ImageToCArrayConverter.convertHexToDecimal; import com.fuyuanshen.equipment.utils.c.ReliableTextToBitmap; -import com.fuyuanshen.system.mqtt.config.MqttGateway; -import com.fuyuanshen.system.mqtt.constants.MqttConstants; +import com.fuyuanshen.global.mqtt.config.MqttGateway; +import com.fuyuanshen.global.mqtt.constants.MqttConstants; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -352,7 +354,7 @@ public class AppDeviceBizService { System.out.println("原始数据大小: " + largeData.length + " 字节"); int[] ints = convertHexToDecimal(largeData); - RedisUtils.setCacheObject("app_logo_data:"+device.getDeviceImei(), Arrays.toString(ints), Duration.ofSeconds(24*60*60L)); + RedisUtils.setCacheObject("app_logo_data:"+device.getDeviceImei(), Arrays.toString(ints), Duration.ofSeconds(30*60L)); String data = RedisUtils.getCacheObject("app_logo_data:"+device.getDeviceImei()); @@ -377,4 +379,96 @@ public class AppDeviceBizService { e.printStackTrace(); } } + + /** + * 灯光模式 + * 0(关灯),1(强光模式),2(弱光模式), 3(爆闪模式), 4(泛光模式) + */ + public void lightModeSettings(JSONObject params) { + try { + Long deviceId = params.getLong("deviceId"); + Device device = deviceMapper.selectById(deviceId); + if(device == null){ + throw new ServiceException("设备不存在"); + } + Integer instructValue = params.getInteger("instructValue"); + ArrayList intData = new ArrayList<>(); + intData.add(1); + intData.add(instructValue); + intData.add(0); + intData.add(0); + intData.add(0); + Map map = new HashMap<>(); + map.put("instruct", intData); + mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY+device.getDeviceImei(), 1 , JSON.toJSONString(map)); + log.info("发送点阵数据到设备消息=>topic:{},payload:{}", MqttConstants.GLOBAL_PUB_KEY+device.getDeviceImei(),JSON.toJSONString(map)); + } catch (Exception e){ + e.printStackTrace(); + } + } + + //灯光亮度设置 + public void lightBrightnessSettings(JSONObject params) { + try { + Long deviceId = params.getLong("deviceId"); + Device device = deviceMapper.selectById(deviceId); + if(device == null){ + throw new ServiceException("设备不存在"); + } + String instructValue = params.getString("instructValue"); + ArrayList intData = new ArrayList<>(); + intData.add(5); + String[] values = instructValue.split("\\."); + String value1 = values[0]; + String value2 = values[1]; + if(StringUtils.isNoneBlank(value1)){ + intData.add(Integer.parseInt(value1)); + } + if(StringUtils.isNoneBlank(value2)){ + intData.add(Integer.parseInt(value2)); + } + intData.add(0); + intData.add(0); + intData.add(0); + Map map = new HashMap<>(); + map.put("instruct", intData); + mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY+device.getDeviceImei(), 1 , JSON.toJSONString(map)); + log.info("发送点阵数据到设备消息=>topic:{},payload:{}", MqttConstants.GLOBAL_PUB_KEY+device.getDeviceImei(),JSON.toJSONString(map)); + } catch (Exception e){ + e.printStackTrace(); + } + } + + //激光模式设置 + public void laserModeSettings(JSONObject params) { + try { + Long deviceId = params.getLong("deviceId"); + Device device = deviceMapper.selectById(deviceId); + if(device == null){ + throw new ServiceException("设备不存在"); + } + Integer instructValue = params.getInteger("instructValue"); + ArrayList intData = new ArrayList<>(); + intData.add(4); + intData.add(instructValue); + intData.add(0); + intData.add(0); + intData.add(0); + Map map = new HashMap<>(); + map.put("instruct", intData); + mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY+device.getDeviceImei(), 1 , JSON.toJSONString(map)); + log.info("发送点阵数据到设备消息=>topic:{},payload:{}", MqttConstants.GLOBAL_PUB_KEY+device.getDeviceImei(),JSON.toJSONString(map)); + } catch (Exception e){ + e.printStackTrace(); + } + } + + public String mapReverseGeocoding(JSONObject params) { + Long deviceId = params.getLong("deviceId"); + Device device = deviceMapper.selectById(deviceId); + if(device == null){ + throw new ServiceException("设备不存在"); + } + return RedisUtils.getCacheObject("device:location:" + device.getDeviceImei()); + } } diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessageRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessageRule.java new file mode 100644 index 00000000..ab412515 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessageRule.java @@ -0,0 +1,26 @@ +package com.fuyuanshen.global.mqtt.base; + +/** + * MQTT消息处理接口 + */ +public interface MqttMessageRule { + + /** + * 获取命令类型 + * @return 命令类型 + */ + Integer getCommandType(); + /** + * 执行处理 + * @param context 处理上下文 + */ + void execute(MqttRuleContext context); + + /** + * 获取优先级,数值越小优先级越高 + * @return 优先级 + */ + default int getPriority() { + return 0; + } +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleContext.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleContext.java new file mode 100644 index 00000000..cccbe156 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleContext.java @@ -0,0 +1,32 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +import java.util.Map; + +/** + * MQTT消息处理上下文 + */ +@Data +public class MqttRuleContext { + /** + * 命令类型 + */ + private byte commandType; + /** + * 转换后的参数数组 + */ + private Object[] convertArr; + /** + * 设备IMEI + */ + private String deviceImei; + /** + * 数据来源Redis + */ + private String dataFromRedis; + /** + * MQTT消息负载字典 + */ + private Map payloadDict; +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleEngine.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleEngine.java new file mode 100644 index 00000000..9e55a8df --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttRuleEngine.java @@ -0,0 +1,38 @@ +package com.fuyuanshen.global.mqtt.base; + +import org.springframework.stereotype.Component; + +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * MQTT消息引擎 + */ +@Component +public class MqttRuleEngine { + + + private final LinkedHashMap rulesMap = new LinkedHashMap<>(); + public MqttRuleEngine(List rules) { + // 按优先级排序 + rules.sort(Comparator.comparing(MqttMessageRule::getPriority)); + rules.forEach(rule -> rulesMap.put(rule.getCommandType(), rule)); + } + + /** + * 执行匹配 + * @param context 处理上下文 + * @return + */ + public boolean executeRule(MqttRuleContext context) { + int commandType = context.getCommandType(); + MqttMessageRule mqttMessageRule = rulesMap.get(commandType); + if (mqttMessageRule != null) { + mqttMessageRule.execute(context); + return true; + } + + return false; + } +} diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttConfiguration.java similarity index 96% rename from fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttConfiguration.java rename to fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttConfiguration.java index b05cb3ef..745dd638 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttConfiguration.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttConfiguration.java @@ -1,4 +1,4 @@ -package com.fuyuanshen.system.mqtt.config; +package com.fuyuanshen.global.mqtt.config; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttGateway.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttGateway.java similarity index 94% rename from fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttGateway.java rename to fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttGateway.java index 0f24318b..a1b8eb24 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttGateway.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttGateway.java @@ -1,4 +1,4 @@ -package com.fuyuanshen.system.mqtt.config; +package com.fuyuanshen.global.mqtt.config; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttInboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttInboundConfiguration.java similarity index 95% rename from fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttInboundConfiguration.java rename to fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttInboundConfiguration.java index de67566b..102fe71e 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttInboundConfiguration.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttInboundConfiguration.java @@ -1,8 +1,8 @@ -package com.fuyuanshen.system.mqtt.config; +package com.fuyuanshen.global.mqtt.config; import cn.hutool.core.lang.UUID; -import com.fuyuanshen.system.mqtt.receiver.ReceiverMessageHandler; +import com.fuyuanshen.global.mqtt.receiver.ReceiverMessageHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttOutboundConfiguration.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttOutboundConfiguration.java similarity index 97% rename from fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttOutboundConfiguration.java rename to fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttOutboundConfiguration.java index cb1bcb00..01cddded 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttOutboundConfiguration.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttOutboundConfiguration.java @@ -1,4 +1,4 @@ -package com.fuyuanshen.system.mqtt.config; +package com.fuyuanshen.global.mqtt.config; import cn.hutool.core.lang.UUID; import lombok.extern.slf4j.Slf4j; diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttPropertiesConfig.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttPropertiesConfig.java similarity index 91% rename from fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttPropertiesConfig.java rename to fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttPropertiesConfig.java index 9745925d..6c5d7db9 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/config/MqttPropertiesConfig.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/config/MqttPropertiesConfig.java @@ -1,4 +1,4 @@ -package com.fuyuanshen.system.mqtt.config; +package com.fuyuanshen.global.mqtt.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/DeviceCommandTypeConstants.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/DeviceCommandTypeConstants.java new file mode 100644 index 00000000..0d93921d --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/DeviceCommandTypeConstants.java @@ -0,0 +1,78 @@ +package com.fuyuanshen.global.mqtt.constants; + +/** + * 设备命令类型常量 + * Device Command Type Constants + */ +public class DeviceCommandTypeConstants { + + /** + * 灯光模式 (Light Mode) + */ + public static final int LIGHT_MODE = 1; + + /** + * 人员信息 (Personnel Information) + */ + public static final int PERSONNEL_INFO = 2; + + /** + * 开机LOGO (Boot Logo) + */ + public static final int BOOT_LOGO = 3; + + /** + * 激光灯 (Laser Light) + */ + public static final int LASER_LIGHT = 4; + + /** + * 主灯亮度 (Main Light Brightness) + */ + public static final int MAIN_LIGHT_BRIGHTNESS = 5; + + /** + * 定位数据 (Location Data) + */ + public static final int LOCATION_DATA = 11; + + /** + * 获取命令类型描述 + * + * @param commandType 命令类型 + * @return 命令类型描述 + */ + public static String getCommandTypeDescription(int commandType) { + switch (commandType) { + case LIGHT_MODE: + return "灯光模式 (Light Mode)"; + case PERSONNEL_INFO: + return "人员信息 (Personnel Information)"; + case BOOT_LOGO: + return "开机LOGO (Boot Logo)"; + case LASER_LIGHT: + return "激光灯 (Laser Light)"; + case MAIN_LIGHT_BRIGHTNESS: + return "主灯亮度 (Main Light Brightness)"; + case LOCATION_DATA: + return "定位数据 (Location Data)"; + default: + return "未知命令类型 (Unknown Command Type)"; + } + } + + /** + * 检查是否为有效命令类型 + * + * @param commandType 命令类型 + * @return 是否有效 + */ + public static boolean isValidCommandType(int commandType) { + return commandType == LIGHT_MODE || + commandType == PERSONNEL_INFO || + commandType == BOOT_LOGO || + commandType == LASER_LIGHT || + commandType == MAIN_LIGHT_BRIGHTNESS || + commandType == LOCATION_DATA; + } +} diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/constants/MqttConstants.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/MqttConstants.java similarity index 81% rename from fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/constants/MqttConstants.java rename to fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/MqttConstants.java index 49398a18..7a80a1e8 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/constants/MqttConstants.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/MqttConstants.java @@ -1,4 +1,4 @@ -package com.fuyuanshen.system.mqtt.constants; +package com.fuyuanshen.global.mqtt.constants; public interface MqttConstants { diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/DeviceDataController.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/DeviceDataController.java similarity index 93% rename from fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/DeviceDataController.java rename to fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/DeviceDataController.java index e8794020..8dacb880 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/DeviceDataController.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/DeviceDataController.java @@ -1,4 +1,4 @@ -package com.fuyuanshen.system.mqtt.publish; +package com.fuyuanshen.global.mqtt.publish; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttClientTest.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttClientTest.java similarity index 84% rename from fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttClientTest.java rename to fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttClientTest.java index 2332809e..776b4997 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttClientTest.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttClientTest.java @@ -1,7 +1,7 @@ -package com.fuyuanshen.system.mqtt.publish; +package com.fuyuanshen.global.mqtt.publish; -import com.fuyuanshen.system.mqtt.config.MqttGateway; +import com.fuyuanshen.global.mqtt.config.MqttGateway; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttMessageSender.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttMessageSender.java similarity index 89% rename from fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttMessageSender.java rename to fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttMessageSender.java index 05bf77e8..dcb429cd 100644 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/publish/MqttMessageSender.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/publish/MqttMessageSender.java @@ -1,6 +1,6 @@ -package com.fuyuanshen.system.mqtt.publish; +package com.fuyuanshen.global.mqtt.publish; -import com.fuyuanshen.system.mqtt.config.MqttGateway; +import com.fuyuanshen.global.mqtt.config.MqttGateway; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java new file mode 100644 index 00000000..f7619a6d --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java @@ -0,0 +1,63 @@ +package com.fuyuanshen.global.mqtt.receiver; + +import cn.hutool.core.lang.Dict; +import com.fuyuanshen.common.core.utils.ImageToCArrayConverter; +import com.fuyuanshen.common.json.utils.JsonUtils; +import com.fuyuanshen.global.mqtt.base.MqttRuleContext; +import com.fuyuanshen.global.mqtt.base.MqttRuleEngine; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Service; + +import java.util.Objects; + +@Service +@Slf4j +public class ReceiverMessageHandler implements MessageHandler { + + @Autowired + private MqttRuleEngine ruleEngine; + + @Override + public void handleMessage(Message message) throws MessagingException { + Object payload = message.getPayload(); + MessageHeaders headers = message.getHeaders(); + String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); + String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); + + log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}", + payload, receivedTopic, receivedQos, timestamp); + + Dict payloadDict = JsonUtils.parseMap(payload.toString()); + if (receivedTopic == null || payloadDict == null) { + return; + } + + String state = payloadDict.getStr("state"); + Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state); + + if (convertArr.length > 0) { + Byte val1 = (Byte) convertArr[0]; + String[] subStr = receivedTopic.split("/"); + System.out.println("收到设备id: " + subStr[1]); + String deviceImei = subStr[1]; + + MqttRuleContext context = new MqttRuleContext(); + context.setCommandType(val1); + context.setConvertArr(convertArr); + context.setDeviceImei(deviceImei); + context.setPayloadDict(payloadDict); + + boolean ruleExecuted = ruleEngine.executeRule(context); + + if (!ruleExecuted) { + log.warn("未找到匹配的规则来处理命令类型: {}", val1); + } + } + } +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/LocationDataRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/LocationDataRule.java new file mode 100644 index 00000000..1345ce62 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/LocationDataRule.java @@ -0,0 +1,143 @@ +package com.fuyuanshen.global.mqtt.rule; + +import com.fuyuanshen.common.json.utils.JsonUtils; +import com.fuyuanshen.common.redis.utils.RedisUtils; +import com.fuyuanshen.equipment.utils.c.map.GetAddressFromLatUtil; +import com.fuyuanshen.global.mqtt.base.MqttMessageRule; +import com.fuyuanshen.global.mqtt.base.MqttRuleContext; +import com.fuyuanshen.global.mqtt.config.MqttGateway; +import com.fuyuanshen.global.mqtt.constants.DeviceCommandTypeConstants; +import com.fuyuanshen.global.mqtt.constants.MqttConstants; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * 定位数据命令处理 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class LocationDataRule implements MqttMessageRule { + + private final MqttGateway mqttGateway; + + + @Override + public Integer getCommandType() { + return DeviceCommandTypeConstants.LOCATION_DATA; + } + + @Override + public void execute(MqttRuleContext context) { + try { + Object[] convertArr = context.getConvertArr(); + // Latitude, longitude + String latitude = convertArr[1].toString(); + String longitude = convertArr[2].toString(); + + // 异步发送经纬度到Redis + asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude); + + Map map = buildLocationDataMap(latitude, longitude); + mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map)); + log.info("发送定位数据到设备=>topic:{},payload:{}", + MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), + JsonUtils.toJsonString(map)); + } catch (Exception e) { + log.error("处理定位数据命令时出错", e); + } + } + + /** + * 异步发送位置信息到Redis + * + * @param deviceImei 设备IMEI + * @param latitude 纬度 + * @param longitude 经度 + * @param timestamp 时间戳 + */ +// @Async +// public void asyncSendLocationToRedis(String deviceImei, String latitude, String longitude, long timestamp) { +// try { +// // 构造位置信息对象 +// Map locationInfo = new HashMap<>(); +// locationInfo.put("deviceImei", deviceImei); +// locationInfo.put("latitude", latitude); +// locationInfo.put("longitude", longitude); +// locationInfo.put("timestamp", timestamp); +// +// // 将位置信息存储到Redis中,使用设备IMEI作为key的一部分 +// String redisKey = "device:location:" + deviceImei; +// String locationJson = JsonUtils.toJsonString(locationInfo); +// +// // 存储到Redis,设置过期时间(例如24小时) +// RedisUtils.setCacheObject(redisKey, locationJson, Duration.ofSeconds(24 * 60 * 60)); +// +// // 也可以存储到一个列表中,保留历史位置信息 +// String locationHistoryKey = "device:location:history:" + deviceImei; +// RedisUtils.lPush(locationHistoryKey, locationJson, 24 * 60 * 60); +// +// log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude); +// } catch (Exception e) { +// log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e); +// } +// } + + /** + * 异步发送位置信息到Redis(使用CompletableFuture) + * + * @param deviceImei 设备IMEI + * @param latitude 纬度 + * @param longitude 经度 + */ + public void asyncSendLocationToRedisWithFuture(String deviceImei, String latitude, String longitude) { + CompletableFuture.runAsync(() -> { + try { + // 构造位置信息对象 + Map locationInfo = new LinkedHashMap<>(); + locationInfo.put("deviceImei", deviceImei); + locationInfo.put("latitude", latitude); + locationInfo.put("longitude", longitude); + String address = GetAddressFromLatUtil.getAdd(longitude, latitude); + locationInfo.put("address", address); + locationInfo.put("timestamp", System.currentTimeMillis()); + + + // 将位置信息存储到Redis中 + String redisKey = "device:location:" + deviceImei; + String locationJson = JsonUtils.toJsonString(locationInfo); + + // 存储到Redis + RedisUtils.setCacheObject(redisKey, locationJson, Duration.ofSeconds(24 * 60 * 60)); + + log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude); + } catch (Exception e) { + log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e); + } + }); + } + + private Map buildLocationDataMap(String latitude, String longitude) { + String[] latArr = latitude.split("\\."); + String[] lonArr = longitude.split("\\."); + + ArrayList intData = new ArrayList<>(); + intData.add(DeviceCommandTypeConstants.LOCATION_DATA); + intData.add(Integer.parseInt(latArr[0])); + intData.add(Integer.parseInt(latArr[1])); + intData.add(Integer.parseInt(lonArr[0])); + intData.add(Integer.parseInt(lonArr[1])); + + Map map = new HashMap<>(); + map.put("instruct", intData); + return map; + } +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/PersonnelInfoRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/PersonnelInfoRule.java new file mode 100644 index 00000000..14675610 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/PersonnelInfoRule.java @@ -0,0 +1,76 @@ +package com.fuyuanshen.global.mqtt.rule; + +import com.fuyuanshen.common.core.utils.ImageToCArrayConverter; +import com.fuyuanshen.common.core.utils.StringUtils; +import com.fuyuanshen.common.json.utils.JsonUtils; +import com.fuyuanshen.common.redis.utils.RedisUtils; +import com.fuyuanshen.global.mqtt.base.MqttMessageRule; +import com.fuyuanshen.global.mqtt.base.MqttRuleContext; +import com.fuyuanshen.global.mqtt.config.MqttGateway; +import com.fuyuanshen.global.mqtt.constants.DeviceCommandTypeConstants; +import com.fuyuanshen.global.mqtt.constants.MqttConstants; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static com.fuyuanshen.common.core.utils.ImageToCArrayConverter.convertHexToDecimal; + +/** + * 人员信息命令处理 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class PersonnelInfoRule implements MqttMessageRule { + + private final MqttGateway mqttGateway; + + @Override + public Integer getCommandType() { + return DeviceCommandTypeConstants.PERSONNEL_INFO; + } + + @Override + public void execute(MqttRuleContext context) { + try { + Byte val2 = (Byte) context.getConvertArr()[1]; + if (val2 == 100) { + return; + } + + String data = RedisUtils.getCacheObject("894078:app_logo_data:" + context.getDeviceImei()); + if (StringUtils.isEmpty(data)) { + return; + } + + byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data); + byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512); + System.out.println("第" + val2 + "块数据大小: " + specificChunk.length + " 字节"); + System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk)); + + ArrayList intData = new ArrayList<>(); + intData.add(3); + intData.add((int) val2); + ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData); + intData.add(0); + intData.add(0); + intData.add(0); + intData.add(0); + + Map map = new HashMap<>(); + map.put("instruct", intData); + + mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map)); + log.info("发送人员信息点阵数据到设备消息=>topic:{},payload:{}", + MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), + JsonUtils.toJsonString(map)); + } catch (Exception e) { + log.error("处理人员信息命令时出错", e); + } + } +} diff --git a/fys-common/fys-common-core/src/main/java/com/fuyuanshen/common/core/utils/ImageToCArrayConverter.java b/fys-common/fys-common-core/src/main/java/com/fuyuanshen/common/core/utils/ImageToCArrayConverter.java index 0ad17823..45fea748 100644 --- a/fys-common/fys-common-core/src/main/java/com/fuyuanshen/common/core/utils/ImageToCArrayConverter.java +++ b/fys-common/fys-common-core/src/main/java/com/fuyuanshen/common/core/utils/ImageToCArrayConverter.java @@ -310,4 +310,64 @@ public class ImageToCArrayConverter { return new byte[0]; } } + + /** + * 将字节字符串转换为混合类型的Object数组 + * + * @param byteString 字节字符串 + * @return Object数组,包含不同类型的对象 + */ + public static Object[] convertByteStringToMixedObjectArray(String byteString) { + if (byteString == null || byteString.isEmpty()) { + return new Object[0]; + } + + try { + // 移除方括号(如果存在) + String content = byteString.trim(); + if (content.startsWith("[")) { + content = content.substring(1); + } + if (content.endsWith("]")) { + content = content.substring(0, content.length() - 1); + } + + // 按逗号分割 + String[] byteValues = content.split(","); + Object[] result = new Object[byteValues.length]; + + // 转换每个值为适当类型的对象 + for (int i = 0; i < byteValues.length; i++) { + String value = byteValues[i].trim(); + + // 处理可能的进制前缀 + if (value.startsWith("0x") || value.startsWith("0X")) { + // 十六进制 + int intValue = Integer.parseInt(value.substring(2), 16); + result[i] = intValue; + } else { + // 尝试解析为整数 + try { + int intValue = Integer.parseInt(value); + // 根据值的范围选择合适的类型 + if (intValue >= Byte.MIN_VALUE && intValue <= Byte.MAX_VALUE) { + result[i] = (byte) intValue; + } else if (intValue >= Short.MIN_VALUE && intValue <= Short.MAX_VALUE) { + result[i] = (short) intValue; + } else { + result[i] = intValue; + } + } catch (NumberFormatException e) { + // 如果不是数字,保持为字符串 + result[i] = value; + } + } + } + + return result; + } catch (Exception e) { + System.err.println("解析字节字符串时出错: " + e.getMessage()); + return new Object[0]; + } + } } \ No newline at end of file diff --git a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/receiver/ReceiverMessageHandler.java b/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/receiver/ReceiverMessageHandler.java deleted file mode 100644 index 549f0697..00000000 --- a/fys-modules/fys-system/src/main/java/com/fuyuanshen/system/mqtt/receiver/ReceiverMessageHandler.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.fuyuanshen.system.mqtt.receiver; - -import cn.hutool.core.lang.Dict; -import com.fuyuanshen.common.core.utils.ImageToCArrayConverter; -import com.fuyuanshen.common.core.utils.StringUtils; -import com.fuyuanshen.common.json.utils.JsonUtils; -import com.fuyuanshen.common.redis.utils.RedisUtils; -import com.fuyuanshen.common.satoken.utils.AppLoginHelper; -import com.fuyuanshen.common.satoken.utils.LoginHelper; -import com.fuyuanshen.system.mqtt.config.MqttGateway; -import com.fuyuanshen.system.mqtt.constants.MqttConstants; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.MessagingException; -import org.springframework.stereotype.Service; - -import java.util.*; - -import static com.fuyuanshen.common.core.utils.ImageToCArrayConverter.convertHexToDecimal; - -@Service -@Slf4j -public class ReceiverMessageHandler implements MessageHandler { - - @Autowired - private MqttGateway mqttGateway; - - @Override - public void handleMessage(Message message) throws MessagingException{ - String tenantId = AppLoginHelper.getTenantId() != null ? AppLoginHelper.getTenantId() : LoginHelper.getTenantId(); - Object payload = message.getPayload(); - MessageHeaders headers = message.getHeaders(); - String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); - String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString(); - String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString(); - log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}" - ,payload,receivedTopic,receivedQos,timestamp); - Dict payloadDict = JsonUtils.parseMap(payload.toString()); - if(receivedTopic ==null){ - return; - } - if(payloadDict == null){ - return; - } - String state = payloadDict.getStr("state"); - - byte[] convertArr = ImageToCArrayConverter.convertStringToByteArray( state); - if(convertArr.length>0){ - byte val1 = convertArr[0]; - byte val2 = convertArr[1]; - String[] subStr = receivedTopic.split("/"); - System.out.println("收到设备id: " + subStr[1]); - String deviceImei = subStr[1]; - if(val1==3){ - - if(val2==100){ - return; - } - - String data = RedisUtils.getCacheObject("894078:app_logo_data:"+deviceImei); - if(StringUtils.isEmpty(data)){ - return; - } - byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data); - byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2-1), 512); - System.out.println("第"+val2+"块数据大小: " + specificChunk.length + " 字节"); - System.out.println("第"+val2+"块数据: " + Arrays.toString(specificChunk)); - - ArrayList intData = new ArrayList<>(); - intData.add(3); - intData.add((int) val2); - ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk),intData); - intData.add(0); - intData.add(0); - intData.add(0); - intData.add(0); - Map map = new HashMap<>(); - map.put("instruct", intData); - mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY+deviceImei, 1 , JsonUtils.toJsonString( map)); - log.info("发送点阵数据到设备消息=>topic:{},payload:{}", MqttConstants.GLOBAL_PUB_KEY+deviceImei,JsonUtils.toJsonString(map)); - } - System.out.println("val1:"+ val1); - System.out.println("val2:"+ val2); - - } - } -} \ No newline at end of file