forked from dyf/fys-Multi-tenant
设备历史轨迹
This commit is contained in:
@ -17,6 +17,7 @@ import org.springframework.messaging.MessageHeaders;
|
|||||||
import org.springframework.messaging.MessagingException;
|
import org.springframework.messaging.MessagingException;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@ -46,7 +47,7 @@ public class ReceiverMessageHandler implements MessageHandler {
|
|||||||
if(StringUtils.isNotBlank(deviceImei)){
|
if(StringUtils.isNotBlank(deviceImei)){
|
||||||
//在线状态
|
//在线状态
|
||||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX + deviceImei;
|
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX + deviceImei;
|
||||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1");
|
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(60*15));
|
||||||
}
|
}
|
||||||
|
|
||||||
String state = payloadDict.getStr("state");
|
String state = payloadDict.getStr("state");
|
||||||
|
@ -94,4 +94,6 @@ public class ActiveReportingDeviceDataRule implements MqttMessageRule {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -153,15 +153,40 @@ public class LocationDataRule implements MqttMessageRule {
|
|||||||
RedisUtils.setCacheObject(redisKey, locationJson);
|
RedisUtils.setCacheObject(redisKey, locationJson);
|
||||||
|
|
||||||
// 存储到一个列表中,保留历史位置信息
|
// 存储到一个列表中,保留历史位置信息
|
||||||
String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
// String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
||||||
RedisUtils.addCacheList(locationHistoryKey, locationJson);
|
// RedisUtils.addCacheList(locationHistoryKey, locationJson);
|
||||||
RedisUtils.expire(locationHistoryKey, Duration.ofDays(90));
|
// RedisUtils.expire(locationHistoryKey, Duration.ofDays(90));
|
||||||
|
storeDeviceTrajectoryWithSortedSet(deviceImei, locationJson);
|
||||||
log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 存储设备30天历史轨迹到Redis (使用Sorted Set)
|
||||||
|
*/
|
||||||
|
public void storeDeviceTrajectoryWithSortedSet(String deviceImei, String locationJson) {
|
||||||
|
try {
|
||||||
|
String trajectoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
||||||
|
// String trajectoryKey = "device:trajectory:zset:" + deviceImei;
|
||||||
|
// String locationJson = JsonUtils.toJsonString(locationInfo);
|
||||||
|
long timestamp = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// 添加到Sorted Set,使用时间戳作为score
|
||||||
|
RedisUtils.zAdd(trajectoryKey, locationJson, timestamp);
|
||||||
|
|
||||||
|
// // 设置30天过期时间
|
||||||
|
// RedisUtils.expire(trajectoryKey, Duration.ofDays(30));
|
||||||
|
|
||||||
|
// 清理30天前的数据(冗余保护)
|
||||||
|
long thirtyDaysAgo = System.currentTimeMillis() - (90L * 24 * 60 * 60 * 1000);
|
||||||
|
RedisUtils.zRemoveRangeByScore(trajectoryKey, 0, thirtyDaysAgo);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("存储设备轨迹到Redis(ZSet)失败: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private Map<String, Object> buildLocationDataMap(String latitude, String longitude) {
|
private Map<String, Object> buildLocationDataMap(String latitude, String longitude) {
|
||||||
String[] latArr = latitude.split("\\.");
|
String[] latArr = latitude.split("\\.");
|
||||||
|
@ -339,7 +339,40 @@ public class RedisUtils {
|
|||||||
RSet<T> rSet = CLIENT.getSet(key);
|
RSet<T> rSet = CLIENT.getSet(key);
|
||||||
return rSet.addAll(dataSet);
|
return rSet.addAll(dataSet);
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* 向Sorted Set添加元素
|
||||||
|
*
|
||||||
|
* @param key 键
|
||||||
|
* @param value 值
|
||||||
|
* @param score 分数
|
||||||
|
* @return 添加成功返回true,否则返回false
|
||||||
|
*/
|
||||||
|
public static boolean zAdd(String key, Object value, double score) {
|
||||||
|
try {
|
||||||
|
RScoredSortedSet<Object> sortedSet = CLIENT.getScoredSortedSet(key);
|
||||||
|
return sortedSet.add(score, value);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// log.error("向Sorted Set添加元素失败: key={}, value={}, score={}, error={}", key, value, score, e.getMessage(), e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 移除Sorted Set中指定范围的元素(按分数)
|
||||||
|
*
|
||||||
|
* @param key 键
|
||||||
|
* @param min 最小分数
|
||||||
|
* @param max 最大分数
|
||||||
|
* @return 移除的元素数量
|
||||||
|
*/
|
||||||
|
public static int zRemoveRangeByScore(String key, double min, double max) {
|
||||||
|
try {
|
||||||
|
RScoredSortedSet<Object> sortedSet = CLIENT.getScoredSortedSet(key);
|
||||||
|
return sortedSet.removeRangeByScore(min, true, max, true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* 追加缓存Set数据
|
* 追加缓存Set数据
|
||||||
*
|
*
|
||||||
|
Reference in New Issue
Block a user