提交代码3

This commit is contained in:
2026-03-02 13:45:30 +08:00
parent 34841c8704
commit c9cad751f0
4 changed files with 237 additions and 236 deletions

View File

@ -1,68 +1,68 @@
package com.fuyuanshen.global.mqtt.config;
import cn.hutool.core.lang.UUID;
import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
@Slf4j
public class NewMqttInboundConfiguration {
@Autowired
private MqttPropertiesConfig mqttPropertiesConfig;
@Autowired
private MqttPahoClientFactory mqttPahoClientFactory;
@Autowired
private NewReceiverMessageHandler receiverMessageHandler2;
//消息通道
@Bean
public MessageChannel messageInboundChannel2(){
return new DirectChannel();
}
/**
* 配置入站适配器
* 作用: 设置订阅主题,以及指定消息的通道 等相关属性
* */
@Bean
public MessageProducer messageProducer2(){
// 生成一个不重复的随机数
String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID();
String subTopic = mqttPropertiesConfig.getSubTopic2();
log.info("订阅主题:{}", subTopic);
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
mqttPropertiesConfig.getUrl(),
clientId,
mqttPahoClientFactory,
subTopic.split(",")
);
mqttPahoMessageDrivenChannelAdapter.setQos(1);
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel2());
return mqttPahoMessageDrivenChannelAdapter;
}
/** 指定处理消息来自哪个通道 */
@Bean
@ServiceActivator(inputChannel = "messageInboundChannel2")
public MessageHandler messageHandler2(){
return receiverMessageHandler2;
}
// @Bean
// @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确
// public MessageHandler deviceAlarmMessageHandler() {
// return new DeviceAlrmMessageHandler();
// }
}
//package com.fuyuanshen.global.mqtt.config;
//
//
//import cn.hutool.core.lang.UUID;
//import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.integration.annotation.ServiceActivator;
//import org.springframework.integration.channel.DirectChannel;
//import org.springframework.integration.core.MessageProducer;
//import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
//import org.springframework.messaging.MessageChannel;
//import org.springframework.messaging.MessageHandler;
//
//
//@Configuration
//@Slf4j
//public class NewMqttInboundConfiguration {
// @Autowired
// private MqttPropertiesConfig mqttPropertiesConfig;
// @Autowired
// private MqttPahoClientFactory mqttPahoClientFactory;
// @Autowired
// private NewReceiverMessageHandler receiverMessageHandler2;
// //消息通道
// @Bean
// public MessageChannel messageInboundChannel2(){
// return new DirectChannel();
// }
//
// /**
// * 配置入站适配器
// * 作用: 设置订阅主题,以及指定消息的通道 等相关属性
// * */
// @Bean
// public MessageProducer messageProducer2(){
// // 生成一个不重复的随机数
// String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID();
// String subTopic = mqttPropertiesConfig.getSubTopic2();
// log.info("订阅主题:{}", subTopic);
// MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
// mqttPropertiesConfig.getUrl(),
// clientId,
// mqttPahoClientFactory,
// subTopic.split(",")
// );
// mqttPahoMessageDrivenChannelAdapter.setQos(1);
// mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
// mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel2());
// return mqttPahoMessageDrivenChannelAdapter;
// }
// /** 指定处理消息来自哪个通道 */
// @Bean
// @ServiceActivator(inputChannel = "messageInboundChannel2")
// public MessageHandler messageHandler2(){
// return receiverMessageHandler2;
// }
//
// // @Bean
// // @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确
// // public MessageHandler deviceAlarmMessageHandler() {
// // return new DeviceAlrmMessageHandler();
// // }
//}

View File

@ -1,46 +1,46 @@
package com.fuyuanshen.global.mqtt.config;
import cn.hutool.core.lang.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
@Slf4j
public class NewMqttOutboundConfiguration {
@Autowired
private MqttPropertiesConfig mqttPropertiesConfig;
@Autowired
private MqttPahoClientFactory mqttPahoClientFactory;
// 消息通道
@Bean
public MessageChannel mqttOutboundChannel2(){
return new DirectChannel();
}
/** 配置出站消息处理器 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel2") // 指定处理器针对哪个通道的消息进行处理
public MessageHandler mqttOutboundMessageHandler2(){
String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID();
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
mqttPropertiesConfig.getUrl(),
clientId,
mqttPahoClientFactory
);
mqttPahoMessageHandler.setDefaultQos(1);
mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2());
mqttPahoMessageHandler.setAsync(true);
return mqttPahoMessageHandler;
}
}
//package com.fuyuanshen.global.mqtt.config;
//
//import cn.hutool.core.lang.UUID;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.integration.annotation.ServiceActivator;
//import org.springframework.integration.channel.DirectChannel;
//import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
//import org.springframework.messaging.MessageChannel;
//import org.springframework.messaging.MessageHandler;
//
//@Configuration
//@Slf4j
//public class NewMqttOutboundConfiguration {
// @Autowired
// private MqttPropertiesConfig mqttPropertiesConfig;
// @Autowired
// private MqttPahoClientFactory mqttPahoClientFactory;
//
// // 消息通道
// @Bean
// public MessageChannel mqttOutboundChannel2(){
// return new DirectChannel();
// }
//
//
// /** 配置出站消息处理器 */
// @Bean
// @ServiceActivator(inputChannel = "mqttOutboundChannel2") // 指定处理器针对哪个通道的消息进行处理
// public MessageHandler mqttOutboundMessageHandler2(){
// String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID();
// MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
// mqttPropertiesConfig.getUrl(),
// clientId,
// mqttPahoClientFactory
// );
// mqttPahoMessageHandler.setDefaultQos(1);
// mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2());
// mqttPahoMessageHandler.setAsync(true);
// return mqttPahoMessageHandler;
// }
//
//}

View File

@ -2,7 +2,8 @@ package com.fuyuanshen.global.mqtt.config;
import cn.hutool.core.lang.UUID;
import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler;
//import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler;
import com.fuyuanshen.global.mqtt.receiver.RegisEquipReceiverMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@ -25,7 +26,7 @@ public class RegisEquipMqttInboundConfiguration {
@Autowired
private MqttPahoClientFactory mqttPahoClientFactory;
@Autowired
private NewReceiverMessageHandler receiverMessageHandler3;
private RegisEquipReceiverMessageHandler receiverMessageHandler3;
//消息通道
@Bean
public MessageChannel messageInboundChannel3(){

View File

@ -1,125 +1,125 @@
package com.fuyuanshen.global.mqtt.receiver;
import cn.hutool.core.lang.Dict;
import com.baomidou.lock.LockTemplate;
import com.fuyuanshen.common.core.constant.GlobalConstants;
import com.fuyuanshen.common.core.utils.StringUtils;
import com.fuyuanshen.common.json.utils.JsonUtils;
import com.fuyuanshen.common.redis.utils.RedisUtils;
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine;
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
@Service
@Slf4j
public class NewReceiverMessageHandler implements MessageHandler {
@Autowired
private NewMqttRuleEngine newRuleEngine;
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
log.info("MQTT2 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}",
payload, receivedTopic, receivedQos, timestamp);
Dict payloadDict = JsonUtils.parseMap(payload.toString());
if (receivedTopic == null || payloadDict == null) {
return;
}
String imei = payloadDict.getStr("imei");
String funcType = payloadDict.getStr("funcType");
// 执行业务逻辑
if(StringUtils.isNotBlank(imei)){
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
//在线状态
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
}
String[] topicArr = receivedTopic.split("/");
NewMqttRuleContext context = new NewMqttRuleContext();
context.setCommandType(topicArr[2]+"_"+funcType);
context.setDeviceImei(imei);
context.setPayloadDict(payloadDict);
boolean ruleExecuted = newRuleEngine.executeRule(context);
if (!ruleExecuted) {
log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
}
// final LockInfo lockInfo = lockTemplate.lock(GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei, 100L, 3000L, RedissonLockExecutor.class);
// if (null == lockInfo) {
// log.info("MQTT3业务处理中,请稍后再试:funcType=>{},imei=>{}",funcType,imei);
//package com.fuyuanshen.global.mqtt.receiver;
//
//import cn.hutool.core.lang.Dict;
//import com.baomidou.lock.LockTemplate;
//import com.fuyuanshen.common.core.constant.GlobalConstants;
//import com.fuyuanshen.common.core.utils.StringUtils;
//import com.fuyuanshen.common.json.utils.JsonUtils;
//import com.fuyuanshen.common.redis.utils.RedisUtils;
//import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
//import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine;
//import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
//import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
//import lombok.extern.slf4j.Slf4j;
//import org.redisson.api.RLock;
//import org.redisson.api.RedissonClient;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.messaging.Message;
//import org.springframework.messaging.MessageHandler;
//import org.springframework.messaging.MessageHeaders;
//import org.springframework.messaging.MessagingException;
//import org.springframework.stereotype.Service;
//
//import java.time.Duration;
//import java.util.Objects;
//import java.util.concurrent.TimeUnit;
//
//import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
//
//@Service
//@Slf4j
//public class NewReceiverMessageHandler implements MessageHandler {
//
// @Autowired
// private NewMqttRuleEngine newRuleEngine;
//
// @Override
// public void handleMessage(Message<?> message) throws MessagingException {
//
// Object payload = message.getPayload();
// MessageHeaders headers = message.getHeaders();
// String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
// String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
// String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
//
// log.info("MQTT2 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}",
// payload, receivedTopic, receivedQos, timestamp);
//
// Dict payloadDict = JsonUtils.parseMap(payload.toString());
// if (receivedTopic == null || payloadDict == null) {
// return;
// }
//// 获取锁成功,处理业务
// try {
// if(StringUtils.isNotBlank(imei)){
// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
// //在线状态
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
// }
//
// String[] topicArr = receivedTopic.split("/");
//
// NewMqttRuleContext context = new NewMqttRuleContext();
// context.setCommandType(topicArr[2]+"_"+funcType);
// context.setDeviceImei(imei);
// context.setPayloadDict(payloadDict);
//
// boolean ruleExecuted = newRuleEngine.executeRule(context);
//
// if (!ruleExecuted) {
// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
// }
// } finally {
// //释放锁
// lockTemplate.releaseLock(lockInfo);
// String imei = payloadDict.getStr("imei");
// String funcType = payloadDict.getStr("funcType");
// // 执行业务逻辑
// if(StringUtils.isNotBlank(imei)){
// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
// //在线状态
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
// }
/* ===== 追加:根据报文内容识别格式并统一解析 ===== */
// int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict);
// if (intType > 0) {
// MqttRuleContext newCtx = new MqttRuleContext();
// String commandType = "Light_"+intType;
// newCtx.setCommandType(commandType);
// newCtx.setDeviceImei(imei);
// newCtx.setPayloadDict(payloadDict);
//
// boolean ok = ruleEngine.executeRule(newCtx);
// if (!ok) {
// log.warn("新规则引擎未命中, imei={}", imei);
// }
// String[] topicArr = receivedTopic.split("/");
//
// NewMqttRuleContext context = new NewMqttRuleContext();
// context.setCommandType(topicArr[2]+"_"+funcType);
// context.setDeviceImei(imei);
// context.setPayloadDict(payloadDict);
//
// boolean ruleExecuted = newRuleEngine.executeRule(context);
//
// if (!ruleExecuted) {
// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
// }
}
}
//// final LockInfo lockInfo = lockTemplate.lock(GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei, 100L, 3000L, RedissonLockExecutor.class);
//// if (null == lockInfo) {
//// log.info("MQTT3业务处理中,请稍后再试:funcType=>{},imei=>{}",funcType,imei);
//// return;
//// }
////// 获取锁成功,处理业务
//// try {
//// if(StringUtils.isNotBlank(imei)){
//// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
//// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
//// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
//// //在线状态
//// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
//// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
//// }
////
//// String[] topicArr = receivedTopic.split("/");
////
//// NewMqttRuleContext context = new NewMqttRuleContext();
//// context.setCommandType(topicArr[2]+"_"+funcType);
//// context.setDeviceImei(imei);
//// context.setPayloadDict(payloadDict);
////
//// boolean ruleExecuted = newRuleEngine.executeRule(context);
////
//// if (!ruleExecuted) {
//// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
//// }
//// } finally {
//// //释放锁
//// lockTemplate.releaseLock(lockInfo);
//// }
//
//
// /* ===== 追加:根据报文内容识别格式并统一解析 ===== */
//// int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict);
//// if (intType > 0) {
//// MqttRuleContext newCtx = new MqttRuleContext();
//// String commandType = "Light_"+intType;
//// newCtx.setCommandType(commandType);
//// newCtx.setDeviceImei(imei);
//// newCtx.setPayloadDict(payloadDict);
////
//// boolean ok = ruleEngine.executeRule(newCtx);
//// if (!ok) {
//// log.warn("新规则引擎未命中, imei={}", imei);
//// }
//// }
// }
//}