集成MQTT,优化代码
This commit is contained in:
@ -0,0 +1,33 @@
|
|||||||
|
package com.fuyuanshen.modules.mqtt.config;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||||
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: HarryLin
|
||||||
|
* @Date: 2025/3/20 14:40
|
||||||
|
* @Company: 北京红山信息科技研究院有限公司
|
||||||
|
* @Email: linyun@***.com.cn
|
||||||
|
**/
|
||||||
|
@Configuration
|
||||||
|
public class MqttConfiguration {
|
||||||
|
@Autowired
|
||||||
|
private MqttPropertiesConfig mqttPropertiesConfig;
|
||||||
|
/** 创建连接工厂 **/
|
||||||
|
@Bean
|
||||||
|
public MqttPahoClientFactory mqttPahoClientFactory(){
|
||||||
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
||||||
|
MqttConnectOptions options = new MqttConnectOptions();
|
||||||
|
options.setCleanSession(true); //设置新会话
|
||||||
|
options.setUserName(mqttPropertiesConfig.getUsername());
|
||||||
|
options.setPassword(mqttPropertiesConfig.getPassword().toCharArray());
|
||||||
|
options.setServerURIs(new String[]{mqttPropertiesConfig.getUrl()});
|
||||||
|
factory.setConnectionOptions(options);
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
package com.fuyuanshen.modules.mqtt.config;
|
||||||
|
|
||||||
|
import org.springframework.integration.annotation.MessagingGateway;
|
||||||
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
||||||
|
import org.springframework.messaging.handler.annotation.Header;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: HarryLin
|
||||||
|
* @Date: 2025/3/20 17:06
|
||||||
|
* @Company: 北京红山信息科技研究院有限公司
|
||||||
|
* @Email: linyun@***.com.cn
|
||||||
|
**/
|
||||||
|
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
|
||||||
|
public interface MqttGateway {
|
||||||
|
public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);
|
||||||
|
public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload );
|
||||||
|
}
|
@ -0,0 +1,60 @@
|
|||||||
|
package com.fuyuanshen.modules.mqtt.config;
|
||||||
|
|
||||||
|
|
||||||
|
import com.fuyuanshen.modules.mqtt.receiver.ReceiverMessageHandler;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.integration.annotation.ServiceActivator;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.core.MessageProducer;
|
||||||
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||||
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
||||||
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
||||||
|
import org.springframework.messaging.MessageChannel;
|
||||||
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: HarryLin
|
||||||
|
* @Date: 2025/3/20 14:54
|
||||||
|
* @Company: 北京红山信息科技研究院有限公司
|
||||||
|
* @Email: linyun@***.com.cn
|
||||||
|
**/
|
||||||
|
@Configuration
|
||||||
|
public class MqttInboundConfiguration {
|
||||||
|
@Autowired
|
||||||
|
private MqttPropertiesConfig mqttPropertiesConfig;
|
||||||
|
@Autowired
|
||||||
|
private MqttPahoClientFactory mqttPahoClientFactory;
|
||||||
|
@Autowired
|
||||||
|
private ReceiverMessageHandler receiverMessageHandler;
|
||||||
|
//消息通道
|
||||||
|
@Bean
|
||||||
|
public MessageChannel messageInboundChannel(){
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 配置入站适配器
|
||||||
|
* 作用: 设置订阅主题,以及指定消息的通道 等相关属性
|
||||||
|
* */
|
||||||
|
@Bean
|
||||||
|
public MessageProducer messageProducer(){
|
||||||
|
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
|
||||||
|
mqttPropertiesConfig.getUrl(),
|
||||||
|
mqttPropertiesConfig.getSubClientId(),
|
||||||
|
mqttPahoClientFactory,
|
||||||
|
mqttPropertiesConfig.getSubTopic().split(",")
|
||||||
|
);
|
||||||
|
mqttPahoMessageDrivenChannelAdapter.setQos(1);
|
||||||
|
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
|
||||||
|
mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());
|
||||||
|
return mqttPahoMessageDrivenChannelAdapter;
|
||||||
|
}
|
||||||
|
/** 指定处理消息来自哪个通道 */
|
||||||
|
@Bean
|
||||||
|
@ServiceActivator(inputChannel = "messageInboundChannel")
|
||||||
|
public MessageHandler messageHandler(){
|
||||||
|
return receiverMessageHandler;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,50 @@
|
|||||||
|
package com.fuyuanshen.modules.mqtt.config;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.integration.annotation.ServiceActivator;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||||
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
||||||
|
import org.springframework.messaging.MessageChannel;
|
||||||
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: HarryLin
|
||||||
|
* @Date: 2025/3/20 15:46
|
||||||
|
* @Company: 北京红山信息科技研究院有限公司
|
||||||
|
* @Email: linyun@***.com.cn
|
||||||
|
**/
|
||||||
|
@Configuration
|
||||||
|
@Slf4j
|
||||||
|
public class MqttOutboundConfiguration {
|
||||||
|
@Autowired
|
||||||
|
private MqttPropertiesConfig mqttPropertiesConfig;
|
||||||
|
@Autowired
|
||||||
|
private MqttPahoClientFactory mqttPahoClientFactory;
|
||||||
|
|
||||||
|
// 消息通道
|
||||||
|
@Bean
|
||||||
|
public MessageChannel mqttOutboundChannel(){
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** 配置出站消息处理器 */
|
||||||
|
@Bean
|
||||||
|
@ServiceActivator(inputChannel = "mqttOutboundChannel") // 指定处理器针对哪个通道的消息进行处理
|
||||||
|
public MessageHandler mqttOutboundMessageHandler(){
|
||||||
|
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
|
||||||
|
mqttPropertiesConfig.getUrl(),
|
||||||
|
mqttPropertiesConfig.getPubClientId(),
|
||||||
|
mqttPahoClientFactory
|
||||||
|
);
|
||||||
|
mqttPahoMessageHandler.setDefaultQos(1);
|
||||||
|
mqttPahoMessageHandler.setDefaultTopic("worker/location");
|
||||||
|
mqttPahoMessageHandler.setAsync(true);
|
||||||
|
return mqttPahoMessageHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,24 @@
|
|||||||
|
package com.fuyuanshen.modules.mqtt.config;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: HarryLin
|
||||||
|
* @Date: 2025/3/20 14:32
|
||||||
|
* @Company: 北京红山信息科技研究院有限公司
|
||||||
|
* @Email: linyun@***.com.cn
|
||||||
|
**/
|
||||||
|
@Data
|
||||||
|
@ConfigurationProperties(prefix = "mqtt")
|
||||||
|
@Component
|
||||||
|
public class MqttPropertiesConfig {
|
||||||
|
private String username;
|
||||||
|
private String password;
|
||||||
|
private String url;
|
||||||
|
private String subClientId;
|
||||||
|
private String subTopic;
|
||||||
|
private String pubClientId;
|
||||||
|
private String pubTopic;
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package com.fuyuanshen.modules.mqtt.publish;
|
||||||
|
|
||||||
|
import com.fuyuanshen.annotation.rest.AnonymousGetMapping;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/api/device")
|
||||||
|
@Slf4j
|
||||||
|
public class DeviceDataController {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MqttClientTest mqttClientTest;
|
||||||
|
|
||||||
|
// @PostMapping("/{deviceId}/command")
|
||||||
|
@AnonymousGetMapping(value = "/test/command")
|
||||||
|
public ResponseEntity<String> sendCommand() {
|
||||||
|
|
||||||
|
mqttClientTest.sendMsg();
|
||||||
|
return ResponseEntity.ok("success");
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,22 @@
|
|||||||
|
package com.fuyuanshen.modules.mqtt.publish;
|
||||||
|
|
||||||
|
|
||||||
|
import com.fuyuanshen.modules.mqtt.config.MqttGateway;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class MqttClientTest {
|
||||||
|
@Autowired
|
||||||
|
private MqttGateway mqttGateway;
|
||||||
|
|
||||||
|
public void sendMsg() {
|
||||||
|
mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot");
|
||||||
|
log.info("message is send");
|
||||||
|
|
||||||
|
mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2");
|
||||||
|
log.info("message is send2");
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package com.fuyuanshen.modules.mqtt.publish;
|
||||||
|
|
||||||
|
import com.fuyuanshen.modules.mqtt.config.MqttGateway;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
||||||
|
import org.springframework.messaging.handler.annotation.Header;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: HarryLin
|
||||||
|
* @Date: 2025/3/20 16:16
|
||||||
|
* @Company: 北京红山信息科技研究院有限公司
|
||||||
|
* @Email: linyun@***.com.cn
|
||||||
|
**/
|
||||||
|
@Service
|
||||||
|
public class MqttMessageSender {
|
||||||
|
@Autowired
|
||||||
|
private MqttGateway mqttGateway;
|
||||||
|
public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, String payload) {
|
||||||
|
mqttGateway.sendMsgToMqtt(topic,payload);
|
||||||
|
}
|
||||||
|
public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload) {
|
||||||
|
mqttGateway.sendMsgToMqtt(topic,qos,payload);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,31 @@
|
|||||||
|
package com.fuyuanshen.modules.mqtt.receiver;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
import org.springframework.messaging.MessageHeaders;
|
||||||
|
import org.springframework.messaging.MessagingException;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author: HarryLin
|
||||||
|
* @Date: 2025/3/20 15:24
|
||||||
|
* @Company: 北京红山信息科技研究院有限公司
|
||||||
|
* @Email: linyun@***.com.cn
|
||||||
|
**/
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class ReceiverMessageHandler implements MessageHandler {
|
||||||
|
@Override
|
||||||
|
public void handleMessage(Message<?> message) throws MessagingException{
|
||||||
|
Object payload = message.getPayload();
|
||||||
|
MessageHeaders headers = message.getHeaders();
|
||||||
|
String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
|
||||||
|
String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
|
||||||
|
String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
|
||||||
|
log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}"
|
||||||
|
,payload,receivedTopic,receivedQos,timestamp);
|
||||||
|
}
|
||||||
|
}
|
@ -1,46 +0,0 @@
|
|||||||
package com.fuyuanshen.modules.security.config;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.fuyuanshen.annotation.rest.AnonymousGetMapping;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.http.ResponseEntity;
|
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Random;
|
|
||||||
|
|
||||||
@RestController
|
|
||||||
@RequestMapping("/api/device")
|
|
||||||
@Slf4j
|
|
||||||
public class DeviceDataController {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private MqttPublisher mqttPublisher;
|
|
||||||
|
|
||||||
// @PostMapping("/{deviceId}/command")
|
|
||||||
@AnonymousGetMapping(value = "/test/command")
|
|
||||||
public ResponseEntity<String> sendCommand() {
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 将命令转换为JSON格式
|
|
||||||
// 生成模拟设备数据
|
|
||||||
Map<String, Object> payload = new HashMap<>();
|
|
||||||
payload.put("deviceId", 1);
|
|
||||||
payload.put("temperature", 20 + new Random().nextInt(10));
|
|
||||||
payload.put("humidity", 40 + new Random().nextInt(20));
|
|
||||||
payload.put("status", "online");
|
|
||||||
payload.put("timestamp", System.currentTimeMillis());
|
|
||||||
|
|
||||||
// 发布设备数据到MQTT
|
|
||||||
String jsonPayload = new ObjectMapper().writeValueAsString(payload);
|
|
||||||
mqttPublisher.publish("device/data/" + 1, jsonPayload);
|
|
||||||
return ResponseEntity.ok("命令已发送");
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("发送设备命令失败: {}", e.getMessage(), e);
|
|
||||||
return ResponseEntity.status(500).body("发送命令失败");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,48 +0,0 @@
|
|||||||
package com.fuyuanshen.modules.security.config;
|
|
||||||
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
public class MqttConfig {
|
|
||||||
|
|
||||||
@Value("${mqtt.host}")
|
|
||||||
private String host;
|
|
||||||
|
|
||||||
@Value("${mqtt.clientId}")
|
|
||||||
private String clientId;
|
|
||||||
|
|
||||||
@Value("${mqtt.username}")
|
|
||||||
private String username;
|
|
||||||
|
|
||||||
@Value("${mqtt.password}")
|
|
||||||
private String password;
|
|
||||||
|
|
||||||
@Value("${mqtt.defaultTopic}")
|
|
||||||
private String defaultTopic;
|
|
||||||
|
|
||||||
// 配置MQTT客户端工厂
|
|
||||||
@Bean
|
|
||||||
public MqttConnectOptions getMqttConnectOptions() {
|
|
||||||
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
|
|
||||||
mqttConnectOptions.setUserName(username);
|
|
||||||
mqttConnectOptions.setPassword(password.toCharArray());
|
|
||||||
mqttConnectOptions.setServerURIs(new String[]{host});
|
|
||||||
mqttConnectOptions.setKeepAliveInterval(60);
|
|
||||||
return mqttConnectOptions;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 配置MQTT客户端
|
|
||||||
@Bean
|
|
||||||
public MqttPahoClientFactory mqttClientFactory() {
|
|
||||||
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
|
||||||
factory.setConnectionOptions(getMqttConnectOptions());
|
|
||||||
return factory;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 其他配置...
|
|
||||||
}
|
|
@ -1,33 +0,0 @@
|
|||||||
package com.fuyuanshen.modules.security.config;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
|
||||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
||||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
|
||||||
import org.springframework.messaging.support.GenericMessage;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
@Slf4j
|
|
||||||
public class MqttPublisher {
|
|
||||||
|
|
||||||
private final MqttPahoClientFactory mqttClientFactory;
|
|
||||||
private final String clientId;
|
|
||||||
|
|
||||||
public MqttPublisher(MqttPahoClientFactory mqttClientFactory,
|
|
||||||
@Value("${mqtt.clientId}") String clientId) {
|
|
||||||
this.mqttClientFactory = mqttClientFactory;
|
|
||||||
this.clientId = clientId + "-publisher";
|
|
||||||
}
|
|
||||||
|
|
||||||
public void publish(String topic, String payload) {
|
|
||||||
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
|
|
||||||
clientId, mqttClientFactory);
|
|
||||||
messageHandler.setAsync(true);
|
|
||||||
messageHandler.setDefaultTopic(topic);
|
|
||||||
messageHandler.start();
|
|
||||||
messageHandler.setConverter(new DefaultPahoMessageConverter());
|
|
||||||
messageHandler.handleMessage(new GenericMessage<>(payload));
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,47 +0,0 @@
|
|||||||
package com.fuyuanshen.modules.security.config;
|
|
||||||
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.integration.channel.DirectChannel;
|
|
||||||
import org.springframework.integration.core.MessageProducer;
|
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
|
||||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
|
||||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
|
||||||
import org.springframework.messaging.MessageChannel;
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
public class MqttSubscriberConfig {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public MessageChannel mqttInputChannel() {
|
|
||||||
return new DirectChannel();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public MessageProducer inbound(MqttPahoClientFactory mqttClientFactory) {
|
|
||||||
|
|
||||||
MqttPahoMessageDrivenChannelAdapter adapter =
|
|
||||||
new MqttPahoMessageDrivenChannelAdapter(
|
|
||||||
"subscriberClient",
|
|
||||||
mqttClientFactory,
|
|
||||||
"test/topic/#","device/data/#"); // 订阅主题通配符
|
|
||||||
adapter.setCompletionTimeout(5000);
|
|
||||||
adapter.setConverter(new DefaultPahoMessageConverter());
|
|
||||||
adapter.setQos(1);
|
|
||||||
adapter.setOutputChannel(mqttInputChannel());
|
|
||||||
return adapter;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* @Bean
|
|
||||||
@ServiceActivator(inputChannel = "mqttInputChannel")
|
|
||||||
public MessageHandler handler() {
|
|
||||||
return message -> {
|
|
||||||
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
|
|
||||||
String payload = message.getPayload().toString();
|
|
||||||
System.out.println("收到MQTT消息 - 主题: " + topic + ", 内容: " + payload);
|
|
||||||
// 处理接收到的消息
|
|
||||||
};
|
|
||||||
}*/
|
|
||||||
}
|
|
@ -151,8 +151,10 @@ logging:
|
|||||||
com.fuyuanshen: debug
|
com.fuyuanshen: debug
|
||||||
# MQTT配置
|
# MQTT配置
|
||||||
mqtt:
|
mqtt:
|
||||||
host: tcp://47.107.152.87:1883
|
|
||||||
clientId: fsy-admin-mqtt-client
|
|
||||||
username: admin
|
username: admin
|
||||||
password: fys123456
|
password: fys123456
|
||||||
defaultTopic: test/topic
|
url: tcp://127.0.0.1:1883
|
||||||
|
subClientId: wuLang_subClient_01
|
||||||
|
subTopic: worker/alert/#,worker/location/#
|
||||||
|
pubTopic: worker/location
|
||||||
|
pubClientId: wuLang_pubClient_01
|
Reference in New Issue
Block a user