forked from dyf/fys-Multi-tenant
6075 mqtt
This commit is contained in:
@ -0,0 +1,277 @@
|
||||
package com.fuyuanshen.global.mqtt.receiver;
|
||||
|
||||
import cn.hutool.core.lang.Dict;
|
||||
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
||||
import com.fuyuanshen.common.core.utils.ImageToCArrayConverter;
|
||||
import com.fuyuanshen.common.json.utils.JsonUtils;
|
||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttRuleEngine;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttXinghanCommandType;
|
||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
|
||||
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
||||
|
||||
/**
|
||||
* @author: 默苍璃
|
||||
* @date: 2025-11-05 17:41
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class DeviceMessageHandler implements MessageHandler {
|
||||
|
||||
@Autowired
|
||||
private MqttRuleEngine ruleEngine;
|
||||
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
Object payload = message.getPayload();
|
||||
MessageHeaders headers = message.getHeaders();
|
||||
String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
|
||||
String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
|
||||
String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
|
||||
|
||||
log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}",
|
||||
payload, receivedTopic, receivedQos, timestamp);
|
||||
|
||||
Dict payloadDict = JsonUtils.parseMap(payload.toString());
|
||||
if (receivedTopic == null || payloadDict == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析设备IMEI
|
||||
String[] subStr = receivedTopic.split("/");
|
||||
String deviceImei = subStr[1];
|
||||
|
||||
// 处理设备在线状态
|
||||
handleDeviceOnlineStatus(deviceImei);
|
||||
|
||||
// 处理不同类型的设备信息
|
||||
processDeviceInformation(payloadDict, deviceImei);
|
||||
|
||||
// 执行规则引擎处理
|
||||
executeRuleEngine(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 处理设备在线状态
|
||||
*/
|
||||
private void handleDeviceOnlineStatus(String deviceImei) {
|
||||
if (StringUtils.isNotBlank(deviceImei)) {
|
||||
// 添加去重队列
|
||||
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||
RedisUtils.offerDeduplicated(queueKey, dedupKey, deviceImei, Duration.ofSeconds(900));
|
||||
|
||||
// 设置设备在线状态
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY +
|
||||
DEVICE_KEY_PREFIX + deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX;
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 处理不同类型的设备信息
|
||||
*/
|
||||
private void processDeviceInformation(Dict payloadDict, String deviceImei) {
|
||||
// 开机画面
|
||||
if (payloadDict.containsKey("bootScreen")) {
|
||||
handleBootScreen(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 人员信息
|
||||
if (payloadDict.containsKey("personInfo")) {
|
||||
handlePersonInfo(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 设备信息
|
||||
if (payloadDict.containsKey("deviceInfo")) {
|
||||
handleDeviceInfo(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 经纬度
|
||||
if (payloadDict.containsKey("latitude") && payloadDict.containsKey("longitude")) {
|
||||
handleLocation(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 电子地图
|
||||
if (payloadDict.containsKey("mapData")) {
|
||||
handleMapData(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 电池电量
|
||||
if (payloadDict.containsKey("batteryLevel")) {
|
||||
handleBatteryLevel(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 开启/关闭状态
|
||||
if (payloadDict.containsKey("powerState")) {
|
||||
handlePowerState(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 海拔高度
|
||||
if (payloadDict.containsKey("altitude")) {
|
||||
handleAltitude(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 相对高度
|
||||
if (payloadDict.containsKey("relativeHeight")) {
|
||||
handleRelativeHeight(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 群呼/单呼
|
||||
if (payloadDict.containsKey("callType")) {
|
||||
handleCallType(payloadDict, deviceImei);
|
||||
}
|
||||
|
||||
// 文字信息
|
||||
if (payloadDict.containsKey("textMessage")) {
|
||||
handleTextMessage(payloadDict, deviceImei);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理开机画面
|
||||
*/
|
||||
private void handleBootScreen(Dict payloadDict, String deviceImei) {
|
||||
log.info("处理设备{}的开机画面信息", deviceImei);
|
||||
// 实现具体的开机画面处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理人员信息
|
||||
*/
|
||||
private void handlePersonInfo(Dict payloadDict, String deviceImei) {
|
||||
log.info("处理设备{}的人员信息", deviceImei);
|
||||
// 实现具体的人员信息处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备信息
|
||||
*/
|
||||
private void handleDeviceInfo(Dict payloadDict, String deviceImei) {
|
||||
log.info("处理设备{}的设备信息", deviceImei);
|
||||
// 实现具体的设备信息处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理位置信息
|
||||
*/
|
||||
private void handleLocation(Dict payloadDict, String deviceImei) {
|
||||
log.info("处理设备{}的位置信息: 纬度={}, 经度={}",
|
||||
deviceImei, payloadDict.getStr("latitude"), payloadDict.getStr("longitude"));
|
||||
// 实现具体的位置信息处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理电子地图数据
|
||||
*/
|
||||
private void handleMapData(Dict payloadDict, String deviceImei) {
|
||||
log.info("处理设备{}的电子地图数据", deviceImei);
|
||||
// 实现具体的电子地图数据处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理电池电量
|
||||
*/
|
||||
private void handleBatteryLevel(Dict payloadDict, String deviceImei) {
|
||||
log.info("处理设备{}的电池电量: {}", deviceImei, payloadDict.getStr("batteryLevel"));
|
||||
// 实现具体的电池电量处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理开关状态
|
||||
*/
|
||||
private void handlePowerState(Dict payloadDict, String deviceImei) {
|
||||
String powerState = payloadDict.getStr("powerState");
|
||||
log.info("处理设备{}的开关状态: {}", deviceImei, powerState);
|
||||
// 实现具体的开关状态处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理海拔高度
|
||||
*/
|
||||
private void handleAltitude(Dict payloadDict, String deviceImei) {
|
||||
log.info("处理设备{}的海拔高度: {}", deviceImei, payloadDict.getStr("altitude"));
|
||||
// 实现具体的海拔高度处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理相对高度
|
||||
*/
|
||||
private void handleRelativeHeight(Dict payloadDict, String deviceImei) {
|
||||
log.info("处理设备{}的相对高度: {}", deviceImei, payloadDict.getStr("relativeHeight"));
|
||||
// 实现具体的相对高度处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理呼叫类型
|
||||
*/
|
||||
private void handleCallType(Dict payloadDict, String deviceImei) {
|
||||
String callType = payloadDict.getStr("callType");
|
||||
log.info("处理设备{}的呼叫类型: {}", deviceImei, callType);
|
||||
// 实现具体的呼叫类型处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理文字信息
|
||||
*/
|
||||
private void handleTextMessage(Dict payloadDict, String deviceImei) {
|
||||
String textMessage = payloadDict.getStr("textMessage");
|
||||
log.info("处理设备{}的文字信息: {}", deviceImei, textMessage);
|
||||
// 实现具体的文字信息处理逻辑
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行规则引擎处理
|
||||
*/
|
||||
private void executeRuleEngine(Dict payloadDict, String deviceImei) {
|
||||
String state = payloadDict.getStr("state");
|
||||
Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state);
|
||||
|
||||
if (convertArr.length > 0) {
|
||||
Byte val1 = (Byte) convertArr[0];
|
||||
MqttRuleContext context = new MqttRuleContext();
|
||||
context.setCommandType(val1);
|
||||
context.setConvertArr(convertArr);
|
||||
context.setDeviceImei(deviceImei);
|
||||
context.setPayloadDict(payloadDict);
|
||||
|
||||
boolean ruleExecuted = ruleEngine.executeRule(context);
|
||||
|
||||
if (!ruleExecuted) {
|
||||
log.warn("未找到匹配的规则来处理命令类型: {}", val1);
|
||||
}
|
||||
}
|
||||
|
||||
/* ===== 追加:根据报文内容识别格式并统一解析 ===== */
|
||||
int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict);
|
||||
if (intType > 0) {
|
||||
MqttRuleContext newCtx = new MqttRuleContext();
|
||||
newCtx.setCommandType((byte) intType);
|
||||
newCtx.setDeviceImei(deviceImei);
|
||||
newCtx.setPayloadDict(payloadDict);
|
||||
|
||||
boolean ok = ruleEngine.executeRule(newCtx);
|
||||
if (!ok) {
|
||||
log.warn("新规则引擎未命中, imei={}", deviceImei);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -90,4 +90,5 @@ public class ReceiverMessageHandler implements MessageHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user