forked from dyf/fys-Multi-tenant
设备mqtt收发数据
This commit is contained in:
@ -0,0 +1,143 @@
|
||||
package com.fuyuanshen.global.mqtt.rule;
|
||||
|
||||
import com.fuyuanshen.common.json.utils.JsonUtils;
|
||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.equipment.utils.c.map.GetAddressFromLatUtil;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttMessageRule;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
||||
import com.fuyuanshen.global.mqtt.config.MqttGateway;
|
||||
import com.fuyuanshen.global.mqtt.constants.DeviceCommandTypeConstants;
|
||||
import com.fuyuanshen.global.mqtt.constants.MqttConstants;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* 定位数据命令处理
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class LocationDataRule implements MqttMessageRule {
|
||||
|
||||
private final MqttGateway mqttGateway;
|
||||
|
||||
|
||||
@Override
|
||||
public Integer getCommandType() {
|
||||
return DeviceCommandTypeConstants.LOCATION_DATA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(MqttRuleContext context) {
|
||||
try {
|
||||
Object[] convertArr = context.getConvertArr();
|
||||
// Latitude, longitude
|
||||
String latitude = convertArr[1].toString();
|
||||
String longitude = convertArr[2].toString();
|
||||
|
||||
// 异步发送经纬度到Redis
|
||||
asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude);
|
||||
|
||||
Map<String, Object> map = buildLocationDataMap(latitude, longitude);
|
||||
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||
log.info("发送定位数据到设备=>topic:{},payload:{}",
|
||||
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||
JsonUtils.toJsonString(map));
|
||||
} catch (Exception e) {
|
||||
log.error("处理定位数据命令时出错", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步发送位置信息到Redis
|
||||
*
|
||||
* @param deviceImei 设备IMEI
|
||||
* @param latitude 纬度
|
||||
* @param longitude 经度
|
||||
* @param timestamp 时间戳
|
||||
*/
|
||||
// @Async
|
||||
// public void asyncSendLocationToRedis(String deviceImei, String latitude, String longitude, long timestamp) {
|
||||
// try {
|
||||
// // 构造位置信息对象
|
||||
// Map<String, Object> locationInfo = new HashMap<>();
|
||||
// locationInfo.put("deviceImei", deviceImei);
|
||||
// locationInfo.put("latitude", latitude);
|
||||
// locationInfo.put("longitude", longitude);
|
||||
// locationInfo.put("timestamp", timestamp);
|
||||
//
|
||||
// // 将位置信息存储到Redis中,使用设备IMEI作为key的一部分
|
||||
// String redisKey = "device:location:" + deviceImei;
|
||||
// String locationJson = JsonUtils.toJsonString(locationInfo);
|
||||
//
|
||||
// // 存储到Redis,设置过期时间(例如24小时)
|
||||
// RedisUtils.setCacheObject(redisKey, locationJson, Duration.ofSeconds(24 * 60 * 60));
|
||||
//
|
||||
// // 也可以存储到一个列表中,保留历史位置信息
|
||||
// String locationHistoryKey = "device:location:history:" + deviceImei;
|
||||
// RedisUtils.lPush(locationHistoryKey, locationJson, 24 * 60 * 60);
|
||||
//
|
||||
// log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||
// } catch (Exception e) {
|
||||
// log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
// }
|
||||
// }
|
||||
|
||||
/**
|
||||
* 异步发送位置信息到Redis(使用CompletableFuture)
|
||||
*
|
||||
* @param deviceImei 设备IMEI
|
||||
* @param latitude 纬度
|
||||
* @param longitude 经度
|
||||
*/
|
||||
public void asyncSendLocationToRedisWithFuture(String deviceImei, String latitude, String longitude) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
// 构造位置信息对象
|
||||
Map<String, Object> locationInfo = new LinkedHashMap<>();
|
||||
locationInfo.put("deviceImei", deviceImei);
|
||||
locationInfo.put("latitude", latitude);
|
||||
locationInfo.put("longitude", longitude);
|
||||
String address = GetAddressFromLatUtil.getAdd(longitude, latitude);
|
||||
locationInfo.put("address", address);
|
||||
locationInfo.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
|
||||
// 将位置信息存储到Redis中
|
||||
String redisKey = "device:location:" + deviceImei;
|
||||
String locationJson = JsonUtils.toJsonString(locationInfo);
|
||||
|
||||
// 存储到Redis
|
||||
RedisUtils.setCacheObject(redisKey, locationJson, Duration.ofSeconds(24 * 60 * 60));
|
||||
|
||||
log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||
} catch (Exception e) {
|
||||
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private Map<String, Object> buildLocationDataMap(String latitude, String longitude) {
|
||||
String[] latArr = latitude.split("\\.");
|
||||
String[] lonArr = longitude.split("\\.");
|
||||
|
||||
ArrayList<Integer> intData = new ArrayList<>();
|
||||
intData.add(DeviceCommandTypeConstants.LOCATION_DATA);
|
||||
intData.add(Integer.parseInt(latArr[0]));
|
||||
intData.add(Integer.parseInt(latArr[1]));
|
||||
intData.add(Integer.parseInt(lonArr[0]));
|
||||
intData.add(Integer.parseInt(lonArr[1]));
|
||||
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("instruct", intData);
|
||||
return map;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,76 @@
|
||||
package com.fuyuanshen.global.mqtt.rule;
|
||||
|
||||
import com.fuyuanshen.common.core.utils.ImageToCArrayConverter;
|
||||
import com.fuyuanshen.common.core.utils.StringUtils;
|
||||
import com.fuyuanshen.common.json.utils.JsonUtils;
|
||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttMessageRule;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
||||
import com.fuyuanshen.global.mqtt.config.MqttGateway;
|
||||
import com.fuyuanshen.global.mqtt.constants.DeviceCommandTypeConstants;
|
||||
import com.fuyuanshen.global.mqtt.constants.MqttConstants;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.fuyuanshen.common.core.utils.ImageToCArrayConverter.convertHexToDecimal;
|
||||
|
||||
/**
|
||||
* 人员信息命令处理
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class PersonnelInfoRule implements MqttMessageRule {
|
||||
|
||||
private final MqttGateway mqttGateway;
|
||||
|
||||
@Override
|
||||
public Integer getCommandType() {
|
||||
return DeviceCommandTypeConstants.PERSONNEL_INFO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(MqttRuleContext context) {
|
||||
try {
|
||||
Byte val2 = (Byte) context.getConvertArr()[1];
|
||||
if (val2 == 100) {
|
||||
return;
|
||||
}
|
||||
|
||||
String data = RedisUtils.getCacheObject("894078:app_logo_data:" + context.getDeviceImei());
|
||||
if (StringUtils.isEmpty(data)) {
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
||||
byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
||||
System.out.println("第" + val2 + "块数据大小: " + specificChunk.length + " 字节");
|
||||
System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk));
|
||||
|
||||
ArrayList<Integer> intData = new ArrayList<>();
|
||||
intData.add(3);
|
||||
intData.add((int) val2);
|
||||
ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
||||
intData.add(0);
|
||||
intData.add(0);
|
||||
intData.add(0);
|
||||
intData.add(0);
|
||||
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("instruct", intData);
|
||||
|
||||
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||
log.info("发送人员信息点阵数据到设备消息=>topic:{},payload:{}",
|
||||
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||
JsonUtils.toJsonString(map));
|
||||
} catch (Exception e) {
|
||||
log.error("处理人员信息命令时出错", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user