From f1a19f95f5a83057ae267a7a39f3f38dd483670f Mon Sep 17 00:00:00 2001 From: DragonWenLong <552045633@qq.com> Date: Thu, 21 Aug 2025 17:44:06 +0800 Subject: [PATCH] =?UTF-8?q?feat(mqtt):=20=E6=96=B0=E5=A2=9E=E6=98=9F?= =?UTF-8?q?=E6=B1=89=E8=AE=BE=E5=A4=87=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86?= =?UTF-8?q?=E8=A7=84=E5=88=99=E5=92=8C=E8=A7=A3=E6=9E=90=E9=80=BB=E8=BE=91?= =?UTF-8?q?-=20=E6=B7=BB=E5=8A=A0=20MqttXinghanCommandType=20=E6=9E=9A?= =?UTF-8?q?=E4=B8=BE=E7=B1=BB=EF=BC=8C=E7=94=A8=E4=BA=8E=E8=AF=86=E5=88=AB?= =?UTF-8?q?=E5=92=8C=E5=A4=84=E7=90=86=E6=98=9F=E6=B1=89=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E7=9A=84=E5=91=BD=E4=BB=A4=E7=B1=BB=E5=9E=8B=20-=20=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20MqttXinghanJson=20=E7=B1=BB=EF=BC=8C=E7=94=A8?= =?UTF-8?q?=E4=BA=8E=E8=A7=A3=E6=9E=90=E6=98=9F=E6=B1=89=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E7=9A=84=20JSON=20=E6=95=B0=E6=8D=AE=20-=20=E5=9C=A8=20Receive?= =?UTF-8?q?rMessageHandler=20=E4=B8=AD=E9=9B=86=E6=88=90=E6=98=9F=E6=B1=89?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E6=95=B0=E6=8D=AE=E7=9A=84=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=80=BB=E8=BE=91=20-=20=E6=B7=BB=E5=8A=A0=20XingHanCommandTyp?= =?UTF-8?q?eConstants=20=E5=B8=B8=E9=87=8F=E7=B1=BB=EF=BC=8C=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=E6=98=9F=E6=B1=89=E8=AE=BE=E5=A4=87=E7=9A=84=E5=91=BD?= =?UTF-8?q?=E4=BB=A4=E7=B1=BB=E5=9E=8B=E5=B8=B8=E9=87=8F=20-=20=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=20XinghanDeviceDataRule=20=E7=B1=BB=EF=BC=8C=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=98=9F=E6=B1=89=E8=AE=BE=E5=A4=87=E4=B8=BB=E5=8A=A8?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5=E7=9A=84=E6=95=B0=E6=8D=AE=E5=91=BD=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/base/MqttXinghanCommandType.java | 56 +++++ .../global/mqtt/base/MqttXinghanJson.java | 65 ++++++ .../XingHanCommandTypeConstants.java | 16 ++ .../mqtt/receiver/ReceiverMessageHandler.java | 15 ++ .../rule/xinghan/XinghanDeviceDataRule.java | 212 ++++++++++++++++++ 5 files changed, 364 insertions(+) create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttXinghanCommandType.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttXinghanJson.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/XingHanCommandTypeConstants.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanDeviceDataRule.java diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttXinghanCommandType.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttXinghanCommandType.java new file mode 100644 index 00000000..6f6a6a81 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttXinghanCommandType.java @@ -0,0 +1,56 @@ +package com.fuyuanshen.global.mqtt.base; + +import cn.hutool.core.lang.Dict; + +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.Collections; +import org.springframework.stereotype.Component; + + +@Component +public final class MqttXinghanCommandType { + private MqttXinghanCommandType() {} + + public enum XinghanCommandTypeEnum { + GRADE_INFO(101), + PIC_TRANS(102), + TEX_TRANS(103), + BREAK_NEWS(104), + UNKNOWN(0); + + private final int value; + XinghanCommandTypeEnum(int value) { this.value = value; } + public int getValue() { return value; } + } + + private static final Map KEY_TO_TYPE; + static { + LinkedHashMap map = new LinkedHashMap<>(); + map.put("sta_DetectGrade", XinghanCommandTypeEnum.GRADE_INFO); + map.put("sta_PowerTime", XinghanCommandTypeEnum.GRADE_INFO); + map.put("sta_longitude", XinghanCommandTypeEnum.GRADE_INFO); + map.put("sta_latitude", XinghanCommandTypeEnum.GRADE_INFO); + map.put("sta_PicTrans", XinghanCommandTypeEnum.PIC_TRANS); + map.put("sta_TexTrans", XinghanCommandTypeEnum.TEX_TRANS); + map.put("sta_BreakNews", XinghanCommandTypeEnum.BREAK_NEWS); + KEY_TO_TYPE = Collections.unmodifiableMap(map); + } + + public static int computeVirtualCommandType(Dict payloadDict) { + if (payloadDict == null) { + return XinghanCommandTypeEnum.UNKNOWN.getValue(); + } + try { + for (String key : KEY_TO_TYPE.keySet()) { + if (payloadDict.containsKey(key)) { + return KEY_TO_TYPE.get(key).getValue(); + } + } + } catch (Exception ex) { + return XinghanCommandTypeEnum.UNKNOWN.getValue(); + } + + return XinghanCommandTypeEnum.UNKNOWN.getValue(); + } +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttXinghanJson.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttXinghanJson.java new file mode 100644 index 00000000..fde03ac9 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttXinghanJson.java @@ -0,0 +1,65 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; +import com.fasterxml.jackson.annotation.JsonProperty; + +@Data +public class MqttXinghanJson { + + /** + * 第一键值对,静电预警档位:3,2,1,0,分别表示高档/中档/低挡/关闭. + */ + @JsonProperty("sta_DetectGrade") + private Integer staDetectGrade; + /** + * 第二键值对,照明档位,2,1,0,分别表示弱光/强光/关闭 + */ + @JsonProperty("sta_LightGrade") + private Integer staLightGrade; + /** + * 第三键值对,SOS档位,2,1,0, 分别表示红蓝模式/爆闪模式/关闭 + */ + @JsonProperty("sta_SOSGrade") + public int staSOSGrade; + /** + * 第四键值对,剩余照明时间,0-5999,单位分钟。 + */ + @JsonProperty("sta_PowerTime") + public int staPowerTime; + /** + * 第五键值对,剩余电量百分比,0-100 + */ + @JsonProperty("sta_PowerPercent") + public int staPowerPercent; + /** + * 第六键值对, 近电预警级别, 0-无预警,1-弱预警,2-中预警,3-强预警,4-非常强预警。 + */ + @JsonProperty("sta_DetectResult") + public int staDetectResult; + /** + * 第七键值对, 静止报警状态,0-未静止报警,1-正在静止报警。 + */ + @JsonProperty("staShakeBit") + public int sta_ShakeBit; + /** + * 第八键值对, 4G信号强度,0-32,数值越大,信号越强。 + */ + @JsonProperty("sta_4gSinal") + public int sta4gSinal; + /** + * 第九键值对,IMIE卡号 + */ + @JsonProperty("sta_imei") + public int staimei; + /** + * 第十键值对,经度 + */ + @JsonProperty("sta_longitude") + public String stalongitude; + /** + * 第十一键值对,纬度 + */ + @JsonProperty("sta_latitude") + public String stalatitude; + +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/XingHanCommandTypeConstants.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/XingHanCommandTypeConstants.java new file mode 100644 index 00000000..a7f4ba28 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/constants/XingHanCommandTypeConstants.java @@ -0,0 +1,16 @@ +package com.fuyuanshen.global.mqtt.constants; + +public class XingHanCommandTypeConstants { + /** + * 星汉设备主动上报数据 (XingHan Device Data) + */ + public static final String XingHan_DEVICE_DATA = "Light_101"; + /** + * 星汉开机LOGO (XingHan Boot Logo) + */ + public static final String XingHan_BOOT_LOGO = "Light_102"; + /** + * 星汉设备发送消息 (XingHan send msg) + */ + public static final String XingHan_ESEND_MSG = "Light_103"; +} diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java index 811acd54..4259cd18 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java @@ -8,6 +8,7 @@ import com.fuyuanshen.common.json.utils.JsonUtils; import com.fuyuanshen.common.redis.utils.RedisUtils; import com.fuyuanshen.global.mqtt.base.MqttRuleContext; import com.fuyuanshen.global.mqtt.base.MqttRuleEngine; +import com.fuyuanshen.global.mqtt.base.MqttXinghanCommandType; import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -69,5 +70,19 @@ public class ReceiverMessageHandler implements MessageHandler { log.warn("未找到匹配的规则来处理命令类型: {}", val1); } } + + /* ===== 追加:根据报文内容识别格式并统一解析 ===== */ + int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict); + if (intType > 0) { + MqttRuleContext newCtx = new MqttRuleContext(); + newCtx.setCommandType((byte) intType); + newCtx.setDeviceImei(deviceImei); + newCtx.setPayloadDict(payloadDict); + + boolean ok = ruleEngine.executeRule(newCtx); + if (!ok) { + log.warn("新规则引擎未命中, imei={}", deviceImei); + } + } } } diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanDeviceDataRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanDeviceDataRule.java new file mode 100644 index 00000000..e12c0665 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/xinghan/XinghanDeviceDataRule.java @@ -0,0 +1,212 @@ +package com.fuyuanshen.global.mqtt.rule.xinghan; + +import com.alibaba.fastjson2.JSONObject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fuyuanshen.common.core.constant.GlobalConstants; +import com.fuyuanshen.common.core.utils.StringUtils; +import com.fuyuanshen.common.json.utils.JsonUtils; +import com.fuyuanshen.common.redis.utils.RedisUtils; +import com.fuyuanshen.equipment.utils.map.GetAddressFromLatUtil; +import com.fuyuanshen.equipment.utils.map.LngLonUtil; +import com.fuyuanshen.global.mqtt.base.MqttMessageRule; +import com.fuyuanshen.global.mqtt.base.MqttRuleContext; +import com.fuyuanshen.global.mqtt.base.MqttXinghanJson; +import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; +import com.fuyuanshen.global.mqtt.constants.LightingCommandTypeConstants; +import com.fuyuanshen.global.mqtt.constants.XingHanCommandTypeConstants; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY; +import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.*; +import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX; + +/** + * 主动上报设备数据命令处理 + * 第一键值对,静电预警档位:3,2,1,0,分别表示高档/中档/低挡/关闭. + * 第二键值对,照明档位,2,1,0,分别表示弱光/强光/关闭 + * 第三键值对,SOS档位,2,1,0, 分别表示红蓝模式/爆闪模式/关闭 + * 第四键值对,剩余照明时间,0-5999,单位分钟。 + * 第五键值对, 剩余电量百分比,0-100。 + * 第六键值对, 近电预警级别, 0-无预警,1-弱预警,2-中预警,3-强预警,4-非常强预警。 + * 第七键值对, 静止报警状态,0-未静止报警,1-正在静止报警。 + * 第八键值对, 4G信号强度,0-32,数值越大,信号越强。 + * 第九键值对,IMIE卡号 + * 第十键值对,经度 + * 第十一键值对,纬度 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class XinghanDeviceDataRule implements MqttMessageRule { + + @Override + public String getCommandType() { + return XingHanCommandTypeConstants.XingHan_DEVICE_DATA; + } + + @Autowired + private ObjectMapper objectMapper; + + @Override + public void execute(MqttRuleContext context) { + try { + // Latitude, longitude + //主灯档位,激光灯档位,电量百分比,充电状态,电池剩余续航时间 + MqttXinghanJson deviceStatus = objectMapper.convertValue(context.getPayloadDict(), MqttXinghanJson.class); + + // 发送设备状态和位置信息到Redis + asyncSendDeviceDataToRedisWithFuture(context.getDeviceImei(),deviceStatus); + } catch (Exception e) { + log.error("处理上报数据命令时出错", e); + } + } + + /** + * 发送设备状态信息和位置信息到Redis + * + * @param deviceImei 设备IMEI + * @param deviceStatus 机器主动上报的状态信息 + */ + public void asyncSendDeviceDataToRedisWithFuture(String deviceImei, MqttXinghanJson deviceStatus) { + CompletableFuture.runAsync(() -> { + try { + + // 将设备状态信息存储到Redis中 + String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + deviceImei + DEVICE_STATUS_KEY_PREFIX; + String deviceInfoJson = JsonUtils.toJsonString(deviceStatus); + // 存储到Redis + RedisUtils.setCacheObject(deviceRedisKey, deviceInfoJson); + + log.info("设备状态信息已异步发送到Redis: device={}, deviceInfoJson={}", + deviceImei, deviceInfoJson); + + //设备坐标缓存KEY + String functionAccess = FUNCTION_ACCESS_KEY + deviceImei; + // 异步发送经纬度到Redis + asyncSendLocationToRedisWithFuture(deviceImei, deviceStatus.getStalatitude(), deviceStatus.getStalongitude()); + + } catch (Exception e) { + log.error("异步发送设备信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e); + } + }); + } + + /** + * 异步发送位置信息到Redis(使用CompletableFuture) + * + * @param deviceImei 设备IMEI + * @param latitude 纬度 + * @param longitude 经度 + */ + public void asyncSendLocationToRedisWithFuture(String deviceImei, String latitude, String longitude) { + CompletableFuture.runAsync(() -> { + try { + if(StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)){ + return; + } + String[] latArr = latitude.split("\\."); + String[] lonArr = longitude.split("\\."); + // 将位置信息存储到Redis中 + String redisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DEVICE_LOCATION_KEY_PREFIX; + String redisObj = RedisUtils.getCacheObject(redisKey); + JSONObject jsonOBj = JSONObject.parseObject(redisObj); + if(jsonOBj != null){ + String str1 = latArr[0] +"."+ latArr[1].substring(0,4); + String str2 = lonArr[0] +"."+ lonArr[1].substring(0,4); + + String cacheLatitude = jsonOBj.getString("wgs84_latitude"); + String cacheLongitude = jsonOBj.getString("wgs84_longitude"); + String[] latArr1 = cacheLatitude.split("\\."); + String[] lonArr1 = cacheLongitude.split("\\."); + + String cacheStr1 = latArr1[0] +"."+ latArr1[1].substring(0,4); + String cacheStr2 = lonArr1[0] +"."+ lonArr1[1].substring(0,4); + if(str1.equals(cacheStr1) && str2.equals(cacheStr2)){ + log.info("位置信息未发生变化: device={}, lat={}, lon={}", deviceImei, latitude, longitude); + return; + } + } + + // 构造位置信息对象 + Map locationInfo = new LinkedHashMap<>(); + double[] doubles = LngLonUtil.gps84_To_Gcj02(Double.parseDouble(latitude), Double.parseDouble(longitude)); + locationInfo.put("deviceImei", deviceImei); + locationInfo.put("latitude", doubles[0]); + locationInfo.put("longitude", doubles[1]); + locationInfo.put("wgs84_latitude", latitude); + locationInfo.put("wgs84_longitude", longitude); + String address = GetAddressFromLatUtil.getAdd(String.valueOf(doubles[1]), String.valueOf(doubles[0])); + locationInfo.put("address", address); + locationInfo.put("timestamp", System.currentTimeMillis()); + + + + String locationJson = JsonUtils.toJsonString(locationInfo); + + // 存储到Redis + RedisUtils.setCacheObject(redisKey, locationJson); + + // 存储到一个列表中,保留历史位置信息 +// String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei; +// RedisUtils.addCacheList(locationHistoryKey, locationJson); +// RedisUtils.expire(locationHistoryKey, Duration.ofDays(90)); + storeDeviceTrajectoryWithSortedSet(deviceImei, locationJson); + log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude); + } catch (Exception e) { + log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e); + } + }); + } + + /** + * 存储设备30天历史轨迹到Redis (使用Sorted Set) + */ + public void storeDeviceTrajectoryWithSortedSet(String deviceImei, String locationJson) { + try { + String trajectoryKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + deviceImei + DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX; +// String trajectoryKey = "device:trajectory:zset:" + deviceImei; +// String locationJson = JsonUtils.toJsonString(locationInfo); + long timestamp = System.currentTimeMillis(); + + // 添加到Sorted Set,使用时间戳作为score + RedisUtils.zAdd(trajectoryKey, locationJson, timestamp); + +// // 设置30天过期时间 +// RedisUtils.expire(trajectoryKey, Duration.ofDays(30)); + + // 清理30天前的数据(冗余保护) + long thirtyDaysAgo = System.currentTimeMillis() - (7L * 24 * 60 * 60 * 1000); + RedisUtils.zRemoveRangeByScore(trajectoryKey, 0, thirtyDaysAgo); + } catch (Exception e) { + log.error("存储设备轨迹到Redis(ZSet)失败: device={}, error={}", deviceImei, e.getMessage(), e); + } + } + + private Map buildLocationDataMap(String latitude, String longitude) { + String[] latArr = latitude.split("\\."); + String[] lonArr = longitude.split("\\."); + + ArrayList intData = new ArrayList<>(); + intData.add(11); + intData.add(Integer.parseInt(latArr[0])); + String str1 = latArr[1]; + intData.add(Integer.parseInt(str1.substring(0,4))); + String str2 = lonArr[1]; + intData.add(Integer.parseInt(lonArr[0])); + intData.add(Integer.parseInt(str2.substring(0,4))); + + Map map = new HashMap<>(); + map.put("instruct", intData); + return map; + } + +}