Compare commits
5 Commits
f4d5f950ce
...
6413888a1c
| Author | SHA1 | Date | |
|---|---|---|---|
| 6413888a1c | |||
| a74b697c0f | |||
| ce8f6c9a77 | |||
| c8dff1a082 | |||
| 0ad100a7a5 |
@ -336,8 +336,8 @@ public class AppDeviceBizService {
|
||||
String locationInfo = RedisUtils.getCacheObject(locationKey);
|
||||
if(StringUtils.isNotBlank(locationInfo)){
|
||||
JSONObject jsonObject = JSONObject.parseObject(locationInfo);
|
||||
vo.setLongitude((String)jsonObject.get("longitude"));
|
||||
vo.setLatitude((String)jsonObject.get("latitude"));
|
||||
vo.setLongitude(jsonObject.get("longitude").toString());
|
||||
vo.setLatitude(jsonObject.get("latitude").toString());
|
||||
vo.setAddress((String)jsonObject.get("address"));
|
||||
}
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@ import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
|
||||
@Service
|
||||
@ -46,7 +47,7 @@ public class ReceiverMessageHandler implements MessageHandler {
|
||||
if(StringUtils.isNotBlank(deviceImei)){
|
||||
//在线状态
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX + deviceImei;
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1");
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(60*15));
|
||||
}
|
||||
|
||||
String state = payloadDict.getStr("state");
|
||||
|
||||
@ -94,4 +94,6 @@ public class ActiveReportingDeviceDataRule implements MqttMessageRule {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
package com.fuyuanshen.global.mqtt.rule;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
||||
import com.fuyuanshen.common.core.utils.StringUtils;
|
||||
import com.fuyuanshen.common.json.utils.JsonUtils;
|
||||
@ -108,28 +110,53 @@ public class LocationDataRule implements MqttMessageRule {
|
||||
if(StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)){
|
||||
return;
|
||||
}
|
||||
String[] latArr = latitude.split("\\.");
|
||||
String[] lonArr = longitude.split("\\.");
|
||||
// 将位置信息存储到Redis中
|
||||
String redisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_KEY_PREFIX + deviceImei;
|
||||
String redisObj = RedisUtils.getCacheObject(redisKey);
|
||||
JSONObject jsonOBj = JSONObject.parseObject(redisObj);
|
||||
if(jsonOBj != null){
|
||||
String str1 = latArr[0] +"."+ latArr[1].substring(0,4);
|
||||
String str2 = lonArr[0] +"."+ lonArr[1].substring(0,4);
|
||||
|
||||
String cacheLatitude = jsonOBj.getString("wgs84_latitude");
|
||||
String cacheLongitude = jsonOBj.getString("wgs84_longitude");
|
||||
String[] latArr1 = cacheLatitude.split("\\.");
|
||||
String[] lonArr1 = cacheLongitude.split("\\.");
|
||||
|
||||
String cacheStr1 = latArr1[0] +"."+ latArr1[1].substring(0,4);
|
||||
String cacheStr2 = lonArr1[0] +"."+ lonArr1[1].substring(0,4);
|
||||
if(str1.equals(cacheStr1) && str2.equals(cacheStr2)){
|
||||
log.info("位置信息未发生变化: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 构造位置信息对象
|
||||
Map<String, Object> locationInfo = new LinkedHashMap<>();
|
||||
double[] doubles = LngLonUtil.gps84_To_Gcj02(Double.parseDouble(latitude), Double.parseDouble(longitude));
|
||||
locationInfo.put("deviceImei", deviceImei);
|
||||
locationInfo.put("latitude", doubles[0]);
|
||||
locationInfo.put("longitude", doubles[1]);
|
||||
locationInfo.put("wgs84_latitude", latitude);
|
||||
locationInfo.put("wgs84_longitude", longitude);
|
||||
String address = GetAddressFromLatUtil.getAdd(String.valueOf(doubles[1]), String.valueOf(doubles[0]));
|
||||
locationInfo.put("address", address);
|
||||
locationInfo.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
|
||||
// 将位置信息存储到Redis中
|
||||
String redisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_KEY_PREFIX + deviceImei;
|
||||
|
||||
String locationJson = JsonUtils.toJsonString(locationInfo);
|
||||
|
||||
// 存储到Redis
|
||||
RedisUtils.setCacheObject(redisKey, locationJson);
|
||||
|
||||
// 存储到一个列表中,保留历史位置信息
|
||||
String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
||||
RedisUtils.addCacheList(locationHistoryKey, locationJson);
|
||||
RedisUtils.expire(locationHistoryKey, Duration.ofDays(90));
|
||||
// String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
||||
// RedisUtils.addCacheList(locationHistoryKey, locationJson);
|
||||
// RedisUtils.expire(locationHistoryKey, Duration.ofDays(90));
|
||||
storeDeviceTrajectoryWithSortedSet(deviceImei, locationJson);
|
||||
log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||
} catch (Exception e) {
|
||||
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
@ -137,6 +164,30 @@ public class LocationDataRule implements MqttMessageRule {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储设备30天历史轨迹到Redis (使用Sorted Set)
|
||||
*/
|
||||
public void storeDeviceTrajectoryWithSortedSet(String deviceImei, String locationJson) {
|
||||
try {
|
||||
String trajectoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
||||
// String trajectoryKey = "device:trajectory:zset:" + deviceImei;
|
||||
// String locationJson = JsonUtils.toJsonString(locationInfo);
|
||||
long timestamp = System.currentTimeMillis();
|
||||
|
||||
// 添加到Sorted Set,使用时间戳作为score
|
||||
RedisUtils.zAdd(trajectoryKey, locationJson, timestamp);
|
||||
|
||||
// // 设置30天过期时间
|
||||
// RedisUtils.expire(trajectoryKey, Duration.ofDays(30));
|
||||
|
||||
// 清理30天前的数据(冗余保护)
|
||||
long thirtyDaysAgo = System.currentTimeMillis() - (90L * 24 * 60 * 60 * 1000);
|
||||
RedisUtils.zRemoveRangeByScore(trajectoryKey, 0, thirtyDaysAgo);
|
||||
} catch (Exception e) {
|
||||
log.error("存储设备轨迹到Redis(ZSet)失败: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Object> buildLocationDataMap(String latitude, String longitude) {
|
||||
String[] latArr = latitude.split("\\.");
|
||||
String[] lonArr = longitude.split("\\.");
|
||||
|
||||
@ -339,7 +339,40 @@ public class RedisUtils {
|
||||
RSet<T> rSet = CLIENT.getSet(key);
|
||||
return rSet.addAll(dataSet);
|
||||
}
|
||||
/**
|
||||
* 向Sorted Set添加元素
|
||||
*
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
* @param score 分数
|
||||
* @return 添加成功返回true,否则返回false
|
||||
*/
|
||||
public static boolean zAdd(String key, Object value, double score) {
|
||||
try {
|
||||
RScoredSortedSet<Object> sortedSet = CLIENT.getScoredSortedSet(key);
|
||||
return sortedSet.add(score, value);
|
||||
} catch (Exception e) {
|
||||
// log.error("向Sorted Set添加元素失败: key={}, value={}, score={}, error={}", key, value, score, e.getMessage(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除Sorted Set中指定范围的元素(按分数)
|
||||
*
|
||||
* @param key 键
|
||||
* @param min 最小分数
|
||||
* @param max 最大分数
|
||||
* @return 移除的元素数量
|
||||
*/
|
||||
public static int zRemoveRangeByScore(String key, double min, double max) {
|
||||
try {
|
||||
RScoredSortedSet<Object> sortedSet = CLIENT.getScoredSortedSet(key);
|
||||
return sortedSet.removeRangeByScore(min, true, max, true);
|
||||
} catch (Exception e) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 追加缓存Set数据
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user