提交代码2

This commit is contained in:
2026-03-02 11:17:46 +08:00
parent 20ac6b0baa
commit 34841c8704
5 changed files with 170 additions and 5 deletions

View File

@ -0,0 +1,68 @@
package com.fuyuanshen.global.mqtt.config;
import cn.hutool.core.lang.UUID;
import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
@Slf4j
public class RegisEquipMqttInboundConfiguration {
@Autowired
private MqttPropertiesConfig mqttPropertiesConfig;
@Autowired
private MqttPahoClientFactory mqttPahoClientFactory;
@Autowired
private NewReceiverMessageHandler receiverMessageHandler3;
//消息通道
@Bean
public MessageChannel messageInboundChannel3(){
return new DirectChannel();
}
/**
* 配置入站适配器
* 作用: 设置订阅主题,以及指定消息的通道 等相关属性
* */
@Bean
public MessageProducer messageProducer3(){
// 生成一个不重复的随机数
String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID();
String subTopic = mqttPropertiesConfig.getSubTopic2();
log.info("订阅主题:{}", subTopic);
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
mqttPropertiesConfig.getUrl(),
clientId,
mqttPahoClientFactory,
subTopic.split(",")
);
mqttPahoMessageDrivenChannelAdapter.setQos(1);
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel3());
return mqttPahoMessageDrivenChannelAdapter;
}
/** 指定处理消息来自哪个通道 */
@Bean
@ServiceActivator(inputChannel = "messageInboundChannel3")
public MessageHandler messageHandler3(){
return receiverMessageHandler3;
}
// @Bean
// @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确
// public MessageHandler deviceAlarmMessageHandler() {
// return new DeviceAlrmMessageHandler();
// }
}

View File

@ -0,0 +1,46 @@
package com.fuyuanshen.global.mqtt.config;
import cn.hutool.core.lang.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
@Slf4j
public class RegisEquipMqttOutboundConfiguration {
@Autowired
private MqttPropertiesConfig mqttPropertiesConfig;
@Autowired
private MqttPahoClientFactory mqttPahoClientFactory;
// 消息通道
@Bean
public MessageChannel mqttOutboundChannel3(){
return new DirectChannel();
}
/** 配置出站消息处理器 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel3") // 指定处理器针对哪个通道的消息进行处理
public MessageHandler mqttOutboundMessageHandler3(){
String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID();
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
mqttPropertiesConfig.getUrl(),
clientId,
mqttPahoClientFactory
);
mqttPahoMessageHandler.setDefaultQos(1);
mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2());
mqttPahoMessageHandler.setAsync(true);
return mqttPahoMessageHandler;
}
}

View File

@ -0,0 +1,51 @@
package com.fuyuanshen.global.mqtt.receiver;
import cn.hutool.core.lang.Dict;
import com.fuyuanshen.common.core.constant.GlobalConstants;
import com.fuyuanshen.common.core.utils.StringUtils;
import com.fuyuanshen.common.json.utils.JsonUtils;
import com.fuyuanshen.common.redis.utils.RedisUtils;
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine;
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Objects;
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
@Service
@Slf4j
public class RegisEquipReceiverMessageHandler implements MessageHandler {
@Autowired
private NewMqttRuleEngine newRuleEngine;
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
log.info("MQTT3 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}",
payload, receivedTopic, receivedQos, timestamp);
Dict payloadDict = JsonUtils.parseMap(payload.toString());
if (receivedTopic == null || payloadDict == null) {
return;
}
}
}

View File

@ -86,7 +86,7 @@ public class DeviceBizService {
List<AppDeviceVo> records = result.getRecords();
if (records != null && !records.isEmpty()) {
records.forEach(item -> {
if (item.getCommunicationMode() != null && item.getCommunicationMode() == 0) {
if (item.getCommunicationMode() != null && (item.getCommunicationMode() == 0 || item.getCommunicationMode() == 2)) {
// 设备在线状态
String onlineStatus = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + item.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX);

View File

@ -283,10 +283,10 @@ mqtt:
password: #YtvpSfCNG
url: tcp://47.120.79.150:3883
subClientId: fys_subClient
subTopic: A/#
pubTopic: B/#
subTopic2: status/894078/#
pubTopic2: command/894078/#
subTopic: A/#,status/#
pubTopic: B/#,command/#
subTopic2: regis/equip/#
pubTopic2: regis/#
pubClientId: fys_pubClient
# TTS语音交互配置