Compare commits
5 Commits
f4d5f950ce
...
6413888a1c
| Author | SHA1 | Date | |
|---|---|---|---|
| 6413888a1c | |||
| a74b697c0f | |||
| ce8f6c9a77 | |||
| c8dff1a082 | |||
| 0ad100a7a5 |
@ -336,8 +336,8 @@ public class AppDeviceBizService {
|
|||||||
String locationInfo = RedisUtils.getCacheObject(locationKey);
|
String locationInfo = RedisUtils.getCacheObject(locationKey);
|
||||||
if(StringUtils.isNotBlank(locationInfo)){
|
if(StringUtils.isNotBlank(locationInfo)){
|
||||||
JSONObject jsonObject = JSONObject.parseObject(locationInfo);
|
JSONObject jsonObject = JSONObject.parseObject(locationInfo);
|
||||||
vo.setLongitude((String)jsonObject.get("longitude"));
|
vo.setLongitude(jsonObject.get("longitude").toString());
|
||||||
vo.setLatitude((String)jsonObject.get("latitude"));
|
vo.setLatitude(jsonObject.get("latitude").toString());
|
||||||
vo.setAddress((String)jsonObject.get("address"));
|
vo.setAddress((String)jsonObject.get("address"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -17,6 +17,7 @@ import org.springframework.messaging.MessageHeaders;
|
|||||||
import org.springframework.messaging.MessagingException;
|
import org.springframework.messaging.MessagingException;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@ -46,7 +47,7 @@ public class ReceiverMessageHandler implements MessageHandler {
|
|||||||
if(StringUtils.isNotBlank(deviceImei)){
|
if(StringUtils.isNotBlank(deviceImei)){
|
||||||
//在线状态
|
//在线状态
|
||||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX + deviceImei;
|
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX + deviceImei;
|
||||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1");
|
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(60*15));
|
||||||
}
|
}
|
||||||
|
|
||||||
String state = payloadDict.getStr("state");
|
String state = payloadDict.getStr("state");
|
||||||
|
|||||||
@ -94,4 +94,6 @@ public class ActiveReportingDeviceDataRule implements MqttMessageRule {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,7 @@
|
|||||||
package com.fuyuanshen.global.mqtt.rule;
|
package com.fuyuanshen.global.mqtt.rule;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
||||||
import com.fuyuanshen.common.core.utils.StringUtils;
|
import com.fuyuanshen.common.core.utils.StringUtils;
|
||||||
import com.fuyuanshen.common.json.utils.JsonUtils;
|
import com.fuyuanshen.common.json.utils.JsonUtils;
|
||||||
@ -108,34 +110,83 @@ public class LocationDataRule implements MqttMessageRule {
|
|||||||
if(StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)){
|
if(StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
String[] latArr = latitude.split("\\.");
|
||||||
|
String[] lonArr = longitude.split("\\.");
|
||||||
|
// 将位置信息存储到Redis中
|
||||||
|
String redisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_KEY_PREFIX + deviceImei;
|
||||||
|
String redisObj = RedisUtils.getCacheObject(redisKey);
|
||||||
|
JSONObject jsonOBj = JSONObject.parseObject(redisObj);
|
||||||
|
if(jsonOBj != null){
|
||||||
|
String str1 = latArr[0] +"."+ latArr[1].substring(0,4);
|
||||||
|
String str2 = lonArr[0] +"."+ lonArr[1].substring(0,4);
|
||||||
|
|
||||||
|
String cacheLatitude = jsonOBj.getString("wgs84_latitude");
|
||||||
|
String cacheLongitude = jsonOBj.getString("wgs84_longitude");
|
||||||
|
String[] latArr1 = cacheLatitude.split("\\.");
|
||||||
|
String[] lonArr1 = cacheLongitude.split("\\.");
|
||||||
|
|
||||||
|
String cacheStr1 = latArr1[0] +"."+ latArr1[1].substring(0,4);
|
||||||
|
String cacheStr2 = lonArr1[0] +"."+ lonArr1[1].substring(0,4);
|
||||||
|
if(str1.equals(cacheStr1) && str2.equals(cacheStr2)){
|
||||||
|
log.info("位置信息未发生变化: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 构造位置信息对象
|
// 构造位置信息对象
|
||||||
Map<String, Object> locationInfo = new LinkedHashMap<>();
|
Map<String, Object> locationInfo = new LinkedHashMap<>();
|
||||||
double[] doubles = LngLonUtil.gps84_To_Gcj02(Double.parseDouble(latitude), Double.parseDouble(longitude));
|
double[] doubles = LngLonUtil.gps84_To_Gcj02(Double.parseDouble(latitude), Double.parseDouble(longitude));
|
||||||
locationInfo.put("deviceImei", deviceImei);
|
locationInfo.put("deviceImei", deviceImei);
|
||||||
locationInfo.put("latitude", doubles[0]);
|
locationInfo.put("latitude", doubles[0]);
|
||||||
locationInfo.put("longitude", doubles[1]);
|
locationInfo.put("longitude", doubles[1]);
|
||||||
|
locationInfo.put("wgs84_latitude", latitude);
|
||||||
|
locationInfo.put("wgs84_longitude", longitude);
|
||||||
String address = GetAddressFromLatUtil.getAdd(String.valueOf(doubles[1]), String.valueOf(doubles[0]));
|
String address = GetAddressFromLatUtil.getAdd(String.valueOf(doubles[1]), String.valueOf(doubles[0]));
|
||||||
locationInfo.put("address", address);
|
locationInfo.put("address", address);
|
||||||
locationInfo.put("timestamp", System.currentTimeMillis());
|
locationInfo.put("timestamp", System.currentTimeMillis());
|
||||||
|
|
||||||
|
|
||||||
// 将位置信息存储到Redis中
|
|
||||||
String redisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_KEY_PREFIX + deviceImei;
|
|
||||||
String locationJson = JsonUtils.toJsonString(locationInfo);
|
String locationJson = JsonUtils.toJsonString(locationInfo);
|
||||||
|
|
||||||
// 存储到Redis
|
// 存储到Redis
|
||||||
RedisUtils.setCacheObject(redisKey, locationJson);
|
RedisUtils.setCacheObject(redisKey, locationJson);
|
||||||
|
|
||||||
// 存储到一个列表中,保留历史位置信息
|
// 存储到一个列表中,保留历史位置信息
|
||||||
String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
// String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
||||||
RedisUtils.addCacheList(locationHistoryKey, locationJson);
|
// RedisUtils.addCacheList(locationHistoryKey, locationJson);
|
||||||
RedisUtils.expire(locationHistoryKey, Duration.ofDays(90));
|
// RedisUtils.expire(locationHistoryKey, Duration.ofDays(90));
|
||||||
|
storeDeviceTrajectoryWithSortedSet(deviceImei, locationJson);
|
||||||
log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 存储设备30天历史轨迹到Redis (使用Sorted Set)
|
||||||
|
*/
|
||||||
|
public void storeDeviceTrajectoryWithSortedSet(String deviceImei, String locationJson) {
|
||||||
|
try {
|
||||||
|
String trajectoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
||||||
|
// String trajectoryKey = "device:trajectory:zset:" + deviceImei;
|
||||||
|
// String locationJson = JsonUtils.toJsonString(locationInfo);
|
||||||
|
long timestamp = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// 添加到Sorted Set,使用时间戳作为score
|
||||||
|
RedisUtils.zAdd(trajectoryKey, locationJson, timestamp);
|
||||||
|
|
||||||
|
// // 设置30天过期时间
|
||||||
|
// RedisUtils.expire(trajectoryKey, Duration.ofDays(30));
|
||||||
|
|
||||||
|
// 清理30天前的数据(冗余保护)
|
||||||
|
long thirtyDaysAgo = System.currentTimeMillis() - (90L * 24 * 60 * 60 * 1000);
|
||||||
|
RedisUtils.zRemoveRangeByScore(trajectoryKey, 0, thirtyDaysAgo);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("存储设备轨迹到Redis(ZSet)失败: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private Map<String, Object> buildLocationDataMap(String latitude, String longitude) {
|
private Map<String, Object> buildLocationDataMap(String latitude, String longitude) {
|
||||||
String[] latArr = latitude.split("\\.");
|
String[] latArr = latitude.split("\\.");
|
||||||
|
|||||||
@ -339,7 +339,40 @@ public class RedisUtils {
|
|||||||
RSet<T> rSet = CLIENT.getSet(key);
|
RSet<T> rSet = CLIENT.getSet(key);
|
||||||
return rSet.addAll(dataSet);
|
return rSet.addAll(dataSet);
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* 向Sorted Set添加元素
|
||||||
|
*
|
||||||
|
* @param key 键
|
||||||
|
* @param value 值
|
||||||
|
* @param score 分数
|
||||||
|
* @return 添加成功返回true,否则返回false
|
||||||
|
*/
|
||||||
|
public static boolean zAdd(String key, Object value, double score) {
|
||||||
|
try {
|
||||||
|
RScoredSortedSet<Object> sortedSet = CLIENT.getScoredSortedSet(key);
|
||||||
|
return sortedSet.add(score, value);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// log.error("向Sorted Set添加元素失败: key={}, value={}, score={}, error={}", key, value, score, e.getMessage(), e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 移除Sorted Set中指定范围的元素(按分数)
|
||||||
|
*
|
||||||
|
* @param key 键
|
||||||
|
* @param min 最小分数
|
||||||
|
* @param max 最大分数
|
||||||
|
* @return 移除的元素数量
|
||||||
|
*/
|
||||||
|
public static int zRemoveRangeByScore(String key, double min, double max) {
|
||||||
|
try {
|
||||||
|
RScoredSortedSet<Object> sortedSet = CLIENT.getScoredSortedSet(key);
|
||||||
|
return sortedSet.removeRangeByScore(min, true, max, true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* 追加缓存Set数据
|
* 追加缓存Set数据
|
||||||
*
|
*
|
||||||
|
|||||||
Reference in New Issue
Block a user