package com.fuyuanshen.web.config; import cn.hutool.core.lang.UUID; import com.fuyuanshen.global.mqtt.config.MqttPropertiesConfig; import com.fuyuanshen.web.handler.mqtt.DeviceReceiverMessageHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; /** * @author: 默苍璃 * @date: 2025-08-0110:46 */ @Configuration public class CustomMqttInboundConfiguration { @Autowired private MqttPropertiesConfig mqttPropertiesConfig; @Autowired private MqttPahoClientFactory mqttPahoClientFactory; @Autowired private DeviceReceiverMessageHandler deviceReceiverMessageHandler; @Bean public MessageChannel customMqttChannel(){ return new DirectChannel(); } @Bean public MessageProducer customMessageProducer(){ String clientId = "custom_client_" + UUID.fastUUID(); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttPropertiesConfig.getUrl(), clientId, mqttPahoClientFactory, "A/#", "B/#" // 直接指定这两个主题 ); adapter.setQos(1); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setOutputChannel(customMqttChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "customMqttChannel") public MessageHandler customMessageHandler(){ return deviceReceiverMessageHandler; } }