diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java index 5baddf63..3cd36af5 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/receiver/ReceiverMessageHandler.java @@ -17,6 +17,7 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Service; +import java.time.Duration; import java.util.Objects; @Service @@ -46,7 +47,7 @@ public class ReceiverMessageHandler implements MessageHandler { if(StringUtils.isNotBlank(deviceImei)){ //在线状态 String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX + deviceImei; - RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1"); + RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(60*15)); } String state = payloadDict.getStr("state"); diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/ActiveReportingDeviceDataRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/ActiveReportingDeviceDataRule.java index 6b37a476..104bd699 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/ActiveReportingDeviceDataRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/ActiveReportingDeviceDataRule.java @@ -94,4 +94,6 @@ public class ActiveReportingDeviceDataRule implements MqttMessageRule { }); } + + } diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/LocationDataRule.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/LocationDataRule.java index 518eaf56..8cf2ef55 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/LocationDataRule.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/rule/LocationDataRule.java @@ -153,15 +153,40 @@ public class LocationDataRule implements MqttMessageRule { RedisUtils.setCacheObject(redisKey, locationJson); // 存储到一个列表中,保留历史位置信息 - String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei; - RedisUtils.addCacheList(locationHistoryKey, locationJson); - RedisUtils.expire(locationHistoryKey, Duration.ofDays(90)); +// String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei; +// RedisUtils.addCacheList(locationHistoryKey, locationJson); +// RedisUtils.expire(locationHistoryKey, Duration.ofDays(90)); + storeDeviceTrajectoryWithSortedSet(deviceImei, locationJson); log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude); } catch (Exception e) { log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e); } }); } + + /** + * 存储设备30天历史轨迹到Redis (使用Sorted Set) + */ + public void storeDeviceTrajectoryWithSortedSet(String deviceImei, String locationJson) { + try { + String trajectoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei; +// String trajectoryKey = "device:trajectory:zset:" + deviceImei; +// String locationJson = JsonUtils.toJsonString(locationInfo); + long timestamp = System.currentTimeMillis(); + + // 添加到Sorted Set,使用时间戳作为score + RedisUtils.zAdd(trajectoryKey, locationJson, timestamp); + +// // 设置30天过期时间 +// RedisUtils.expire(trajectoryKey, Duration.ofDays(30)); + + // 清理30天前的数据(冗余保护) + long thirtyDaysAgo = System.currentTimeMillis() - (90L * 24 * 60 * 60 * 1000); + RedisUtils.zRemoveRangeByScore(trajectoryKey, 0, thirtyDaysAgo); + } catch (Exception e) { + log.error("存储设备轨迹到Redis(ZSet)失败: device={}, error={}", deviceImei, e.getMessage(), e); + } + } private Map buildLocationDataMap(String latitude, String longitude) { String[] latArr = latitude.split("\\."); diff --git a/fys-common/fys-common-redis/src/main/java/com/fuyuanshen/common/redis/utils/RedisUtils.java b/fys-common/fys-common-redis/src/main/java/com/fuyuanshen/common/redis/utils/RedisUtils.java index bb7ba81a..17292c0b 100644 --- a/fys-common/fys-common-redis/src/main/java/com/fuyuanshen/common/redis/utils/RedisUtils.java +++ b/fys-common/fys-common-redis/src/main/java/com/fuyuanshen/common/redis/utils/RedisUtils.java @@ -339,7 +339,40 @@ public class RedisUtils { RSet rSet = CLIENT.getSet(key); return rSet.addAll(dataSet); } + /** + * 向Sorted Set添加元素 + * + * @param key 键 + * @param value 值 + * @param score 分数 + * @return 添加成功返回true,否则返回false + */ + public static boolean zAdd(String key, Object value, double score) { + try { + RScoredSortedSet sortedSet = CLIENT.getScoredSortedSet(key); + return sortedSet.add(score, value); + } catch (Exception e) { +// log.error("向Sorted Set添加元素失败: key={}, value={}, score={}, error={}", key, value, score, e.getMessage(), e); + return false; + } + } + /** + * 移除Sorted Set中指定范围的元素(按分数) + * + * @param key 键 + * @param min 最小分数 + * @param max 最大分数 + * @return 移除的元素数量 + */ + public static int zRemoveRangeByScore(String key, double min, double max) { + try { + RScoredSortedSet sortedSet = CLIENT.getScoredSortedSet(key); + return sortedSet.removeRangeByScore(min, true, max, true); + } catch (Exception e) { + return 0; + } + } /** * 追加缓存Set数据 *