1
0

mqtt协议

This commit is contained in:
2025-07-07 09:46:39 +08:00
parent 04766447e6
commit 6b2ebf2414
11 changed files with 302 additions and 0 deletions

View File

@ -293,3 +293,12 @@ file:
pic: C:\eladmin\file\ #设备图片存储路径
#ip: http://fuyuanshen.com:81/ #服务器地址
ip: https://fuyuanshen.com/ #服务器地址
# MQTT配置
mqtt:
username: admin
password: fys123456
url: tcp://47.107.152.87:1883
subClientId: fys_subClient_01
subTopic: worker/alert/#,worker/location/#
pubTopic: worker/location
pubClientId: fys_pubClient_01

View File

@ -100,6 +100,14 @@
<artifactId>fys-common-sse</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,33 @@
package com.fuyuanshen.system.mqtt.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
/**
* @Author: HarryLin
* @Date: 2025/3/20 14:40
* @Company: 北京红山信息科技研究院有限公司
* @Email: linyun@***.com.cn
**/
@Configuration
public class MqttConfiguration {
@Autowired
private MqttPropertiesConfig mqttPropertiesConfig;
/** 创建连接工厂 **/
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true); //设置新会话
options.setUserName(mqttPropertiesConfig.getUsername());
options.setPassword(mqttPropertiesConfig.getPassword().toCharArray());
options.setServerURIs(new String[]{mqttPropertiesConfig.getUrl()});
factory.setConnectionOptions(options);
return factory;
}
}

View File

@ -0,0 +1,17 @@
package com.fuyuanshen.system.mqtt.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
* @Author: HarryLin
* @Date: 2025/3/20 17:06
* @Company: 北京红山信息科技研究院有限公司
* @Email: linyun@***.com.cn
**/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);
public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload );
}

View File

@ -0,0 +1,60 @@
package com.fuyuanshen.system.mqtt.config;
import com.fuyuanshen.system.mqtt.receiver.ReceiverMessageHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @Author: HarryLin
* @Date: 2025/3/20 14:54
* @Company: 北京红山信息科技研究院有限公司
* @Email: linyun@***.com.cn
**/
@Configuration
public class MqttInboundConfiguration {
@Autowired
private MqttPropertiesConfig mqttPropertiesConfig;
@Autowired
private MqttPahoClientFactory mqttPahoClientFactory;
@Autowired
private ReceiverMessageHandler receiverMessageHandler;
//消息通道
@Bean
public MessageChannel messageInboundChannel(){
return new DirectChannel();
}
/**
* 配置入站适配器
* 作用: 设置订阅主题,以及指定消息的通道 等相关属性
* */
@Bean
public MessageProducer messageProducer(){
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
mqttPropertiesConfig.getUrl(),
mqttPropertiesConfig.getSubClientId(),
mqttPahoClientFactory,
mqttPropertiesConfig.getSubTopic().split(",")
);
mqttPahoMessageDrivenChannelAdapter.setQos(1);
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());
return mqttPahoMessageDrivenChannelAdapter;
}
/** 指定处理消息来自哪个通道 */
@Bean
@ServiceActivator(inputChannel = "messageInboundChannel")
public MessageHandler messageHandler(){
return receiverMessageHandler;
}
}

View File

@ -0,0 +1,50 @@
package com.fuyuanshen.system.mqtt.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @Author: HarryLin
* @Date: 2025/3/20 15:46
* @Company: 北京红山信息科技研究院有限公司
* @Email: linyun@***.com.cn
**/
@Configuration
@Slf4j
public class MqttOutboundConfiguration {
@Autowired
private MqttPropertiesConfig mqttPropertiesConfig;
@Autowired
private MqttPahoClientFactory mqttPahoClientFactory;
// 消息通道
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}
/** 配置出站消息处理器 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel") // 指定处理器针对哪个通道的消息进行处理
public MessageHandler mqttOutboundMessageHandler(){
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
mqttPropertiesConfig.getUrl(),
mqttPropertiesConfig.getPubClientId(),
mqttPahoClientFactory
);
mqttPahoMessageHandler.setDefaultQos(1);
mqttPahoMessageHandler.setDefaultTopic("worker/location");
mqttPahoMessageHandler.setAsync(true);
return mqttPahoMessageHandler;
}
}

View File

@ -0,0 +1,24 @@
package com.fuyuanshen.system.mqtt.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @Author: HarryLin
* @Date: 2025/3/20 14:32
* @Company: 北京红山信息科技研究院有限公司
* @Email: linyun@***.com.cn
**/
@Data
@ConfigurationProperties(prefix = "mqtt")
@Component
public class MqttPropertiesConfig {
private String username;
private String password;
private String url;
private String subClientId;
private String subTopic;
private String pubClientId;
private String pubTopic;
}

View File

@ -0,0 +1,23 @@
package com.fuyuanshen.system.mqtt.publish;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/")
@Slf4j
public class DeviceDataController {
@Autowired
private MqttClientTest mqttClientTest;
// @PostMapping("/{deviceId}/command")
public ResponseEntity<String> sendCommand() {
mqttClientTest.sendMsg();
return ResponseEntity.ok("success");
}
}

View File

@ -0,0 +1,22 @@
package com.fuyuanshen.system.mqtt.publish;
import com.fuyuanshen.system.mqtt.config.MqttGateway;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MqttClientTest {
@Autowired
private MqttGateway mqttGateway;
public void sendMsg() {
mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot");
log.info("message is send");
mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2");
log.info("message is send2");
}
}

View File

@ -0,0 +1,25 @@
package com.fuyuanshen.system.mqtt.publish;
import com.fuyuanshen.system.mqtt.config.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
/**
* @Author: HarryLin
* @Date: 2025/3/20 16:16
* @Company: 北京红山信息科技研究院有限公司
* @Email: linyun@***.com.cn
**/
@Service
public class MqttMessageSender {
@Autowired
private MqttGateway mqttGateway;
public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, String payload) {
mqttGateway.sendMsgToMqtt(topic,payload);
}
public void sendMsg(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload) {
mqttGateway.sendMsgToMqtt(topic,qos,payload);
}
}

View File

@ -0,0 +1,31 @@
package com.fuyuanshen.system.mqtt.receiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;
import java.util.Objects;
/**
* @Author: HarryLin
* @Date: 2025/3/20 15:24
* @Company: 北京红山信息科技研究院有限公司
* @Email: linyun@***.com.cn
**/
@Service
@Slf4j
public class ReceiverMessageHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException{
Object payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}"
,payload,receivedTopic,receivedQos,timestamp);
}
}