From 33d61081726ac77dc792ec23c5118057b0ce1deb Mon Sep 17 00:00:00 2001 From: daiyongfei <974332738@qq.com> Date: Sat, 8 Nov 2025 09:38:19 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=A4=84=E7=90=86MQTT=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../global/mqtt/base/MqttMessage.java | 46 +++++ .../global/mqtt/base/MqttTopicInfo.java | 29 +++ .../global/mqtt/base/SensorData.java | 24 +++ .../mqtt/handler/IotMqttMessageHandler.java | 136 +++++++++++++ .../mqtt/handler/MqttMessageHandler.java | 71 +++++++ .../global/mqtt/service/IotMqttService.java | 96 +++++++++ .../mqtt/service/MqttMessageService.java | 37 ++++ .../mqtt/service/impl/IotMqttServiceImpl.java | 109 +++++++++++ .../service/impl/MqttMessageServiceImpl.java | 163 ++++++++++++++++ .../global/mqtt/utils/MqttTopicUtils.java | 71 +++++++ .../service/device/DeviceBJQBizService.java | 2 + .../com/fuyuanshen/app/domain/vo/Main.java | 52 +++++ nginx.conf | 184 ++++++++++++++++++ 13 files changed, 1020 insertions(+) create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java create mode 100644 fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java create mode 100644 nginx.conf diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java new file mode 100644 index 0000000..dd633b3 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java @@ -0,0 +1,46 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +import java.util.List; + +/** + * MQTT消息基础模型 + */ +@Data +public class MqttMessage { + /** + * 请求ID,用于匹配请求和响应 + */ + private String requestId; + + /** + * 设备IMEI + */ + private String imei; + + /** + * 时间戳(毫秒) + */ + private Long timestamp; + + /** + * 功能类型 + */ + private String funcType; + + /** + * 数据内容 + */ + private Object data; + + /** + * 状态(响应时使用) + */ + private String status; + + /** + * 批量数据(设备上报时使用) + */ + private List batch; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java new file mode 100644 index 0000000..80d1dcf --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java @@ -0,0 +1,29 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +/** + * MQTT主题信息模型 + */ +@Data +public class MqttTopicInfo { + /** + * 操作类型 (command/status/report) + */ + private String operation; + + /** + * 租户编码 + */ + private String tenantCode; + + /** + * 设备类型 + */ + private String deviceType; + + /** + * 设备IMEI + */ + private String imei; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java new file mode 100644 index 0000000..489d5f8 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java @@ -0,0 +1,24 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +/** + * 传感器数据模型 + */ +@Data +public class SensorData { + /** + * 传感器名称 + */ + private String sensor; + + /** + * 传感器值 + */ + private Object value; + + /** + * 时间戳(毫秒) + */ + private Long timestamp; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java new file mode 100644 index 0000000..b77ac21 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java @@ -0,0 +1,136 @@ +package com.fuyuanshen.global.mqtt.handler; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import com.fuyuanshen.global.mqtt.service.IotMqttService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + + +/** + * IoT设备MQTT消息处理器 + * 用于处理设备上报的数据和响应消息 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class IotMqttMessageHandler { + + private final IotMqttService iotMqttService; + + /** + * 处理MQTT消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleMessage(String topic, String payload) { + try { + // 解析主题 + MqttTopicInfo topicInfo = parseTopic(topic); + if (topicInfo == null) { + log.warn("无法解析MQTT主题: topic={}", topic); + return; + } + + // 解析消息内容 + JSONObject message = JSON.parseObject(payload); + + // 根据主题类型处理消息 + switch (topicInfo.getOperation()) { + case "command": + // 处理下发指令(设备端不会主动发送command类型消息) + log.warn("收到非法的MQTT消息类型: operation={}", topicInfo.getOperation()); + break; + case "status": + // 处理设备对指令的响应 + iotMqttService.handleCommandResponse( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + case "report": + // 处理设备主动上报的数据 + handleDeviceReport(topicInfo, message); + break; + default: + log.warn("未知的MQTT主题操作类型: operation={}", topicInfo.getOperation()); + break; + } + } catch (Exception e) { + log.error("处理MQTT消息时发生错误: topic={}, payload={}", topic, payload, e); + } + + } + + /** + * 解析MQTT主题 + * + * @param topic 主题字符串 + * @return 主题信息对象 + */ + MqttTopicInfo parseTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + + String[] parts = topic.split("/"); + if (parts.length != 4) { + return null; + } + + MqttTopicInfo info = new MqttTopicInfo(); + info.setOperation(parts[0]); + info.setTenantCode(parts[1]); + info.setDeviceType(parts[2]); + info.setImei(parts[3]); + return info; + } + + /** + * 处理设备上报数据 + * + * @param topicInfo 主题信息 + * @param message 消息内容 + */ + private void handleDeviceReport(MqttTopicInfo topicInfo, JSONObject message) { + // 获取时间戳 + Long timestamp = message.getLong("timestamp"); + + // 处理批量数据上报 + if (message.containsKey("batch")) { + JSONObject batchData = message.getJSONObject("batch"); + iotMqttService.handleBatchReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + batchData, + timestamp); + } + // 处理单个数据上报 + else if (message.containsKey("sensor") && message.containsKey("value")) { + String sensor = message.getString("sensor"); + Object value = message.get("value"); + iotMqttService.handleSingleReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + sensor, + value, + timestamp); + } + // 处理其他格式的数据 + else { + // 将整个消息作为批量数据处理 + iotMqttService.handleBatchReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message, + timestamp); + } + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java new file mode 100644 index 0000000..1d66168 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java @@ -0,0 +1,71 @@ +package com.fuyuanshen.global.mqtt.handler; + +import com.alibaba.fastjson2.JSON; +import com.fuyuanshen.global.mqtt.base.MqttMessage; +import com.fuyuanshen.global.mqtt.service.MqttMessageService; +import com.fuyuanshen.global.mqtt.utils.MqttTopicUtils; +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * MQTT消息处理器 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class MqttMessageHandler { + + private final MqttMessageService mqttMessageService; + + + /** + * 处理MQTT消息 + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleMessage(String topic, String payload) { + try { + // 解析主题 + MqttTopicInfo topicInfo = MqttTopicUtils.parseTopic(topic); + if (topicInfo == null) { + log.warn("无法解析MQTT主题: topic={}", topic); + return; + } + + // 解析消息内容 + MqttMessage message = JSON.parseObject(payload, MqttMessage.class); + + // 根据主题类型处理消息 + switch (topicInfo.getOperation()) { + case "command": + // 处理下发指令(设备端不会主动发送command类型消息) + log.warn("收到非法的MQTT消息类型: operation={}", topicInfo.getOperation()); + break; + case "status": + // 处理设备对指令的响应 + mqttMessageService.handleCommandResponse( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + case "report": + // 处理设备主动上报的数据 + mqttMessageService.handleDeviceReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + default: + log.warn("未知的MQTT主题操作类型: operation={}", topicInfo.getOperation()); + break; + } + } catch (Exception e) { + log.error("处理MQTT消息时发生错误: topic={}, payload={}", topic, payload, e); + } + } + +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java new file mode 100644 index 0000000..a48caab --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java @@ -0,0 +1,96 @@ +package com.fuyuanshen.global.mqtt.service; + +import com.alibaba.fastjson2.JSONObject; + +/** + * 通用IoT设备MQTT协议服务接口 + * 遵循统一的MQTT通信协议规范 + */ +public interface IotMqttService { + + /** + * 构建下发指令主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 指令主题 + */ + String buildCommandTopic(String tenantCode, String deviceType, String imei); + + /** + * 构建响应数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 响应主题 + */ + String buildStatusTopic(String tenantCode, String deviceType, String imei); + + /** + * 构建设备上报数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 上报主题 + */ + String buildReportTopic(String tenantCode, String deviceType, String imei); + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 (JSON格式) + */ + void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 发送响应消息到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) + */ + void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 发送设备上报数据的确认消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 确认消息 (JSON格式) + */ + void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 处理设备上报的单个传感器数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param sensor 传感器名称 + * @param value 传感器值 + * @param timestamp 时间戳 + */ + void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp); + + /** + * 处理设备上报的批量传感器数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param batchData 批量数据 + * @param timestamp 时间戳 + */ + void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp); + + /** + * 处理设备对指令的响应 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) + */ + void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message); +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java new file mode 100644 index 0000000..2a58722 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java @@ -0,0 +1,37 @@ +package com.fuyuanshen.global.mqtt.service; + + +import com.fuyuanshen.global.mqtt.base.MqttMessage; + +/** + * MQTT消息处理服务接口 + */ +public interface MqttMessageService { + + /** + * 处理下发指令的响应消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 + */ + void handleCommandResponse(String tenantCode, String deviceType, String imei, MqttMessage message); + + /** + * 处理设备主动上报的数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 上报消息 + */ + void handleDeviceReport(String tenantCode, String deviceType, String imei, MqttMessage message); + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 + */ + void sendCommand(String tenantCode, String deviceType, String imei, MqttMessage message); +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java new file mode 100644 index 0000000..afe38bd --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java @@ -0,0 +1,109 @@ +package com.fuyuanshen.global.mqtt.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.service.IotMqttService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; + +import java.lang.reflect.Method; + +/** + * 通用IoT设备MQTT协议服务实现类 + * 遵循统一的MQTT通信协议规范 + */ +@Slf4j +@Service +public class IotMqttServiceImpl implements IotMqttService { + + @Autowired + private ApplicationContext applicationContext; + + // MQTT主题前缀 + private static final String COMMAND_PREFIX = "command"; + private static final String STATUS_PREFIX = "status"; + private static final String REPORT_PREFIX = "report"; + + @Override + public String buildCommandTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", COMMAND_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public String buildStatusTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", STATUS_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public String buildReportTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", REPORT_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildCommandTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送指令到设备: topic={}, payload={}", topic, payload); + } + + @Override + public void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildStatusTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送响应消息到设备: topic={}, payload={}", topic, payload); + } + + @Override + public void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildReportTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送设备上报数据确认消息: topic={}, payload={}", topic, payload); + } + + @Override + public void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp) { + log.info("处理设备上报的单个传感器数据: tenantCode={}, deviceType={}, imei={}, sensor={}, value={}, timestamp={}", + tenantCode, deviceType, imei, sensor, value, timestamp); + + // TODO: 实现具体的业务逻辑,如更新设备状态、存储传感器数据等 + } + + @Override + public void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp) { + log.info("处理设备上报的批量传感器数据: tenantCode={}, deviceType={}, imei={}, batchData={}, timestamp={}", + tenantCode, deviceType, imei, JSON.toJSONString(batchData), timestamp); + + // TODO: 实现具体的业务逻辑,如批量更新设备状态、存储传感器数据等 + } + + @Override + public void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message) { + log.info("处理设备对指令的响应: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // TODO: 实现具体的业务逻辑,如更新指令执行状态等 + } + + /** + * 通过反射方式发送MQTT消息 + * @param topic 主题 + * @param qos 服务质量等级 + * @param payload 消息内容 + */ + private void sendMqttMessage(String topic, int qos, String payload) { + try { + Object mqttGateway = applicationContext.getBean("mqttGateway"); + Method sendMethod = mqttGateway.getClass().getMethod("sendMsgToMqtt", String.class, int.class, String.class); + sendMethod.invoke(mqttGateway, topic, qos, payload); + } catch (Exception e) { + log.error("发送MQTT消息失败: topic={}, payload={}", topic, payload, e); + } + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java new file mode 100644 index 0000000..1dc13cc --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java @@ -0,0 +1,163 @@ +package com.fuyuanshen.global.mqtt.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.fuyuanshen.equipment.domain.Device; +import com.fuyuanshen.equipment.mapper.DeviceMapper; +import com.fuyuanshen.global.mqtt.base.MqttMessage; +import com.fuyuanshen.global.mqtt.base.SensorData; +import com.fuyuanshen.global.mqtt.config.MqttGateway; +import com.fuyuanshen.global.mqtt.service.MqttMessageService; +import com.fuyuanshen.global.mqtt.utils.MqttTopicUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * MQTT消息处理服务实现类 + */ +@Slf4j +@Service +public class MqttMessageServiceImpl implements MqttMessageService { + + private final MqttGateway mqttGateway; + + private final DeviceMapper deviceMapper; + + public MqttMessageServiceImpl(MqttGateway mqttGateway, DeviceMapper deviceMapper) { + this.mqttGateway = mqttGateway; + this.deviceMapper = deviceMapper; + } + + /** + * 处理下发指令的响应消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 + */ + @Override + public void handleCommandResponse(String tenantCode, String deviceType, String imei, MqttMessage message) { + log.info("处理设备响应消息: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // 根据requestId更新指令执行状态 + // TODO: 实现具体的业务逻辑,比如更新指令执行结果等 + } + + /** + * 处理设备主动上报的数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 上报消息 + */ + @Override + public void handleDeviceReport(String tenantCode, String deviceType, String imei, MqttMessage message) { + log.info("处理设备上报数据: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // 查找设备 + Device device = deviceMapper.selectDeviceByImei(imei); + if (device == null) { + log.warn("未找到对应设备: imei={}", imei); + return; + } + + // 处理批量数据上报 + if (message.getBatch() != null && !message.getBatch().isEmpty()) { + for (int i = 0; i < message.getBatch().size(); i++) { + processSensorData(device, message.getBatch().get(i)); + } + } + // 处理单个数据上报 + else if (message.getData() != null) { + // 如果data是一个SensorData对象,则处理它 + // 这里可以根据实际的数据结构做相应处理 + } + } + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 + */ + @Override + public void sendCommand(String tenantCode, String deviceType, String imei, MqttMessage message) { + // 构建下发指令主题 + String topic = MqttTopicUtils.buildCommandTopic(tenantCode, deviceType, imei); + + // 设置时间戳 + if (message.getTimestamp() == null) { + message.setTimestamp(System.currentTimeMillis()); + } + + // 发送消息到MQTT + String payload = JSON.toJSONString(message); + mqttGateway.sendMsgToMqtt(topic, 1, payload); + + log.info("发送指令到设备: topic={}, payload={}", topic, payload); + } + + /** + * 处理传感器数据 + * @param device 设备对象 + * @param sensorData 传感器数据 + */ + private void processSensorData(Device device, SensorData sensorData) { + log.info("处理传感器数据: deviceId={}, sensor={}, value={}", + device.getId(), sensorData.getSensor(), sensorData.getValue()); + + String sensor = sensorData.getSensor(); + Object value = sensorData.getValue(); + + // 根据不同的传感器类型处理数据 + switch (sensor) { + case "mainLightMode": + // 处理主灯模式数据 + updateDeviceMainLightMode(device, value); + break; + case "mainLightBrightness": + // 处理主灯亮度数据 + updateDeviceMainLightBrightness(device, value); + break; + case "batteryPercent": + // 处理电池电量数据 + updateDeviceBatteryPercent(device, value); + break; + default: + log.warn("未知的传感器类型: sensor={}", sensor); + break; + } + } + + /** + * 更新设备主灯模式 + * @param device 设备对象 + * @param value 主灯模式值 + */ + private void updateDeviceMainLightMode(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备主灯模式: deviceId={}, value={}", device.getId(), value); + } + + /** + * 更新设备主灯亮度 + * @param device 设备对象 + * @param value 主灯亮度值 + */ + private void updateDeviceMainLightBrightness(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备主灯亮度: deviceId={}, value={}", device.getId(), value); + } + + /** + * 更新设备电池电量 + * @param device 设备对象 + * @param value 电池电量值 + */ + private void updateDeviceBatteryPercent(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备电池电量: deviceId={}, value={}", device.getId(), value); + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java new file mode 100644 index 0000000..774a29c --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java @@ -0,0 +1,71 @@ +package com.fuyuanshen.global.mqtt.utils; + +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import lombok.experimental.UtilityClass; + +/** + * MQTT主题处理工具类 + */ +@UtilityClass +public class MqttTopicUtils { + + public static final String COMMAND_PREFIX = "command"; + public static final String STATUS_PREFIX = "status"; + public static final String REPORT_PREFIX = "report"; + + /** + * 构建下发指令主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildCommandTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", COMMAND_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 构建响应数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildStatusTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", STATUS_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 构建设备上报数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildReportTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", REPORT_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 解析MQTT主题 + * @param topic 主题字符串 + * @return 主题信息对象 + */ + public static MqttTopicInfo parseTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + + String[] parts = topic.split("/"); + if (parts.length != 4) { + return null; + } + + MqttTopicInfo info = new MqttTopicInfo(); + info.setOperation(parts[0]); + info.setTenantCode(parts[1]); + info.setDeviceType(parts[2]); + info.setImei(parts[3]); + return info; + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java index 03d2c98..031618b 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java @@ -59,6 +59,7 @@ public class DeviceBJQBizService { private final MqttGateway mqttGateway; private final DeviceLogMapper deviceLogMapper; + public int sendMessage(AppDeviceSendMsgBo bo) { List deviceIds = bo.getDeviceIds(); if (deviceIds == null || deviceIds.isEmpty()) { @@ -573,4 +574,5 @@ public class DeviceBJQBizService { uploadDeviceLogo(dto); } } + } diff --git a/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java b/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java new file mode 100644 index 0000000..81706dd --- /dev/null +++ b/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java @@ -0,0 +1,52 @@ +package com.fuyuanshen.app.domain.vo; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.TimeZone; + +public class Main { + public static void main(String[] args) throws IOException { + + String[] availableIDs = TimeZone.getAvailableIDs(); + for (String id : availableIDs) { + System.out.println(id); + } + + byte[] data = "hello, world!".getBytes(StandardCharsets.UTF_8); + try (CountInputStream input = new CountInputStream(new ByteArrayInputStream(data))) { + int n; + while ((n = input.read()) != -1) { + System.out.println((char)n); + } + System.out.println("Total read " + input.getBytesRead() + " bytes"); + } + } +} + +class CountInputStream extends FilterInputStream { + private int count = 0; + + CountInputStream(InputStream in) { + super(in); + } + + public int getBytesRead() { + return this.count; + } + + public int read() throws IOException { + int n = in.read(); + if (n != -1) { + this.count ++; + } + return n; + } + + public int read(byte[] b, int off, int len) throws IOException { + int n = in.read(b, off, len); + if (n != -1) { + this.count += n; + } + return n; + } +} \ No newline at end of file diff --git a/nginx.conf b/nginx.conf new file mode 100644 index 0000000..b4f68c9 --- /dev/null +++ b/nginx.conf @@ -0,0 +1,184 @@ +#user nobody; +worker_processes 1; + +#error_log logs/error.log; +#error_log logs/error.log notice; +#error_log logs/error.log info; + +#pid logs/nginx.pid; + +events { + worker_connections 1024; +} + +http { + include mime.types; + default_type application/octet-stream; + + #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + # '$status $body_bytes_sent "$http_referer" ' + # '"$http_user_agent" "$http_x_forwarded_for"'; + + #access_log logs/access.log main; + + sendfile on; + #tcp_nopush on; + + #keepalive_timeout 0; + keepalive_timeout 65; + + map $http_upgrade $connection_upgrade { + default upgrade; + '' close; + } + + upstream websocket { + server 127.0.0.1:9083; + } + + #gzip on; + + server { + listen 80; + server_name cnxhyc.com; + + #charset koi8-r; + + #access_log logs/host.access.log main; + + location / { + root html; + index index.html index.htm; + try_files $uri $uri/ /index.html; + } + + #error_page 404 /404.html; + + # redirect server error pages to the static page /50x.html + # + error_page 500 502 503 504 /50x.html; + location = /50x.html { + root html; + } + + # proxy the PHP scripts to Apache listening on 127.0.0.1:80 + # + #location ~ \.php$ { + # proxy_pass http://127.0.0.1; + #} + + # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000 + # + #location ~ \.php$ { + # root html; + # fastcgi_pass 127.0.0.1:9000; + # fastcgi_index index.php; + # fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name; + # include fastcgi_params; + #} + + # deny access to .htaccess files, if Apache's document root + # concurs with nginx's one + # + #location ~ /\.ht { + # deny all; + #} + + location /fys/ { + proxy_pass http://localhost:9000/fys/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + + location /backend/ { + proxy_set_header Host $http_host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header REMOTE-HOST $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_read_timeout 86400s; + # sse 与 websocket参数 + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_buffering off; + proxy_cache off; + proxy_pass http://localhost:8000/; + } + + # API 代理 + location /jq/ { + proxy_pass http://localhost:8000/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_connect_timeout 60s; + proxy_read_timeout 600s; + } + } + + # another virtual host using mix of IP-, name-, and port-based configuration + # + #server { + # listen 8000; + # listen somename:8080; + # server_name somename alias another.alias; + + # location / { + # root html; + # index index.html index.htm; + # } + #} + + # HTTPS server + server { + listen 443 ssl; + server_name cnxhyc.com www.cnxhyc.com; + + ssl_certificate /cert/cnxhyc.com.pem; + ssl_certificate_key /cert/cnxhyc.com.key; + + # 使用更现代的 SSL 配置 + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers TLS_AES_256_GCM_SHA384:ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4; + ssl_prefer_server_ciphers off; + + ssl_session_cache shared:SSL:10m; + ssl_session_timeout 10m; + + location /wss { + proxy_pass http://websocket; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + } + + location / { + root html; + index index.html index.htm; + } + + # API 代理 + location /jq/ { + proxy_pass http://47.107.152.87:8000/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_connect_timeout 60s; + proxy_read_timeout 600s; + } + + # 重定向 /xh 到 /xh/ + location = /xh { + return 301 /xh/; + } + + # 后台系统 + location /xh/ { + alias /usr/local/nginx/html/jingquan/; + try_files $uri $uri/ /jingquan/index.html; + } + } +} \ No newline at end of file From cc2b7664a8f7e20a66e6dd5a7500c99cfead74a7 Mon Sep 17 00:00:00 2001 From: daiyongfei <974332738@qq.com> Date: Sat, 8 Nov 2025 09:48:12 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=A4=84=E7=90=86MQTT=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../global/mqtt/base/MqttMessage.java | 46 +++++ .../global/mqtt/base/MqttTopicInfo.java | 29 +++ .../global/mqtt/base/SensorData.java | 24 +++ .../mqtt/handler/IotMqttMessageHandler.java | 136 +++++++++++++ .../mqtt/handler/MqttMessageHandler.java | 71 +++++++ .../global/mqtt/service/IotMqttService.java | 96 +++++++++ .../mqtt/service/MqttMessageService.java | 37 ++++ .../mqtt/service/impl/IotMqttServiceImpl.java | 109 +++++++++++ .../service/impl/MqttMessageServiceImpl.java | 163 ++++++++++++++++ .../global/mqtt/utils/MqttTopicUtils.java | 71 +++++++ .../com/fuyuanshen/app/domain/vo/Main.java | 52 +++++ nginx.conf | 184 ++++++++++++++++++ 12 files changed, 1018 insertions(+) create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java create mode 100644 fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java create mode 100644 nginx.conf diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java new file mode 100644 index 0000000..dd633b3 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java @@ -0,0 +1,46 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +import java.util.List; + +/** + * MQTT消息基础模型 + */ +@Data +public class MqttMessage { + /** + * 请求ID,用于匹配请求和响应 + */ + private String requestId; + + /** + * 设备IMEI + */ + private String imei; + + /** + * 时间戳(毫秒) + */ + private Long timestamp; + + /** + * 功能类型 + */ + private String funcType; + + /** + * 数据内容 + */ + private Object data; + + /** + * 状态(响应时使用) + */ + private String status; + + /** + * 批量数据(设备上报时使用) + */ + private List batch; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java new file mode 100644 index 0000000..80d1dcf --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java @@ -0,0 +1,29 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +/** + * MQTT主题信息模型 + */ +@Data +public class MqttTopicInfo { + /** + * 操作类型 (command/status/report) + */ + private String operation; + + /** + * 租户编码 + */ + private String tenantCode; + + /** + * 设备类型 + */ + private String deviceType; + + /** + * 设备IMEI + */ + private String imei; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java new file mode 100644 index 0000000..489d5f8 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java @@ -0,0 +1,24 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +/** + * 传感器数据模型 + */ +@Data +public class SensorData { + /** + * 传感器名称 + */ + private String sensor; + + /** + * 传感器值 + */ + private Object value; + + /** + * 时间戳(毫秒) + */ + private Long timestamp; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java new file mode 100644 index 0000000..b77ac21 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java @@ -0,0 +1,136 @@ +package com.fuyuanshen.global.mqtt.handler; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import com.fuyuanshen.global.mqtt.service.IotMqttService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + + +/** + * IoT设备MQTT消息处理器 + * 用于处理设备上报的数据和响应消息 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class IotMqttMessageHandler { + + private final IotMqttService iotMqttService; + + /** + * 处理MQTT消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleMessage(String topic, String payload) { + try { + // 解析主题 + MqttTopicInfo topicInfo = parseTopic(topic); + if (topicInfo == null) { + log.warn("无法解析MQTT主题: topic={}", topic); + return; + } + + // 解析消息内容 + JSONObject message = JSON.parseObject(payload); + + // 根据主题类型处理消息 + switch (topicInfo.getOperation()) { + case "command": + // 处理下发指令(设备端不会主动发送command类型消息) + log.warn("收到非法的MQTT消息类型: operation={}", topicInfo.getOperation()); + break; + case "status": + // 处理设备对指令的响应 + iotMqttService.handleCommandResponse( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + case "report": + // 处理设备主动上报的数据 + handleDeviceReport(topicInfo, message); + break; + default: + log.warn("未知的MQTT主题操作类型: operation={}", topicInfo.getOperation()); + break; + } + } catch (Exception e) { + log.error("处理MQTT消息时发生错误: topic={}, payload={}", topic, payload, e); + } + + } + + /** + * 解析MQTT主题 + * + * @param topic 主题字符串 + * @return 主题信息对象 + */ + MqttTopicInfo parseTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + + String[] parts = topic.split("/"); + if (parts.length != 4) { + return null; + } + + MqttTopicInfo info = new MqttTopicInfo(); + info.setOperation(parts[0]); + info.setTenantCode(parts[1]); + info.setDeviceType(parts[2]); + info.setImei(parts[3]); + return info; + } + + /** + * 处理设备上报数据 + * + * @param topicInfo 主题信息 + * @param message 消息内容 + */ + private void handleDeviceReport(MqttTopicInfo topicInfo, JSONObject message) { + // 获取时间戳 + Long timestamp = message.getLong("timestamp"); + + // 处理批量数据上报 + if (message.containsKey("batch")) { + JSONObject batchData = message.getJSONObject("batch"); + iotMqttService.handleBatchReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + batchData, + timestamp); + } + // 处理单个数据上报 + else if (message.containsKey("sensor") && message.containsKey("value")) { + String sensor = message.getString("sensor"); + Object value = message.get("value"); + iotMqttService.handleSingleReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + sensor, + value, + timestamp); + } + // 处理其他格式的数据 + else { + // 将整个消息作为批量数据处理 + iotMqttService.handleBatchReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message, + timestamp); + } + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java new file mode 100644 index 0000000..1d66168 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java @@ -0,0 +1,71 @@ +package com.fuyuanshen.global.mqtt.handler; + +import com.alibaba.fastjson2.JSON; +import com.fuyuanshen.global.mqtt.base.MqttMessage; +import com.fuyuanshen.global.mqtt.service.MqttMessageService; +import com.fuyuanshen.global.mqtt.utils.MqttTopicUtils; +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * MQTT消息处理器 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class MqttMessageHandler { + + private final MqttMessageService mqttMessageService; + + + /** + * 处理MQTT消息 + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleMessage(String topic, String payload) { + try { + // 解析主题 + MqttTopicInfo topicInfo = MqttTopicUtils.parseTopic(topic); + if (topicInfo == null) { + log.warn("无法解析MQTT主题: topic={}", topic); + return; + } + + // 解析消息内容 + MqttMessage message = JSON.parseObject(payload, MqttMessage.class); + + // 根据主题类型处理消息 + switch (topicInfo.getOperation()) { + case "command": + // 处理下发指令(设备端不会主动发送command类型消息) + log.warn("收到非法的MQTT消息类型: operation={}", topicInfo.getOperation()); + break; + case "status": + // 处理设备对指令的响应 + mqttMessageService.handleCommandResponse( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + case "report": + // 处理设备主动上报的数据 + mqttMessageService.handleDeviceReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + default: + log.warn("未知的MQTT主题操作类型: operation={}", topicInfo.getOperation()); + break; + } + } catch (Exception e) { + log.error("处理MQTT消息时发生错误: topic={}, payload={}", topic, payload, e); + } + } + +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java new file mode 100644 index 0000000..a48caab --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java @@ -0,0 +1,96 @@ +package com.fuyuanshen.global.mqtt.service; + +import com.alibaba.fastjson2.JSONObject; + +/** + * 通用IoT设备MQTT协议服务接口 + * 遵循统一的MQTT通信协议规范 + */ +public interface IotMqttService { + + /** + * 构建下发指令主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 指令主题 + */ + String buildCommandTopic(String tenantCode, String deviceType, String imei); + + /** + * 构建响应数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 响应主题 + */ + String buildStatusTopic(String tenantCode, String deviceType, String imei); + + /** + * 构建设备上报数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 上报主题 + */ + String buildReportTopic(String tenantCode, String deviceType, String imei); + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 (JSON格式) + */ + void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 发送响应消息到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) + */ + void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 发送设备上报数据的确认消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 确认消息 (JSON格式) + */ + void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 处理设备上报的单个传感器数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param sensor 传感器名称 + * @param value 传感器值 + * @param timestamp 时间戳 + */ + void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp); + + /** + * 处理设备上报的批量传感器数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param batchData 批量数据 + * @param timestamp 时间戳 + */ + void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp); + + /** + * 处理设备对指令的响应 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) + */ + void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message); +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java new file mode 100644 index 0000000..2a58722 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java @@ -0,0 +1,37 @@ +package com.fuyuanshen.global.mqtt.service; + + +import com.fuyuanshen.global.mqtt.base.MqttMessage; + +/** + * MQTT消息处理服务接口 + */ +public interface MqttMessageService { + + /** + * 处理下发指令的响应消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 + */ + void handleCommandResponse(String tenantCode, String deviceType, String imei, MqttMessage message); + + /** + * 处理设备主动上报的数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 上报消息 + */ + void handleDeviceReport(String tenantCode, String deviceType, String imei, MqttMessage message); + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 + */ + void sendCommand(String tenantCode, String deviceType, String imei, MqttMessage message); +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java new file mode 100644 index 0000000..afe38bd --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java @@ -0,0 +1,109 @@ +package com.fuyuanshen.global.mqtt.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.service.IotMqttService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; + +import java.lang.reflect.Method; + +/** + * 通用IoT设备MQTT协议服务实现类 + * 遵循统一的MQTT通信协议规范 + */ +@Slf4j +@Service +public class IotMqttServiceImpl implements IotMqttService { + + @Autowired + private ApplicationContext applicationContext; + + // MQTT主题前缀 + private static final String COMMAND_PREFIX = "command"; + private static final String STATUS_PREFIX = "status"; + private static final String REPORT_PREFIX = "report"; + + @Override + public String buildCommandTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", COMMAND_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public String buildStatusTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", STATUS_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public String buildReportTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", REPORT_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildCommandTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送指令到设备: topic={}, payload={}", topic, payload); + } + + @Override + public void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildStatusTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送响应消息到设备: topic={}, payload={}", topic, payload); + } + + @Override + public void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildReportTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送设备上报数据确认消息: topic={}, payload={}", topic, payload); + } + + @Override + public void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp) { + log.info("处理设备上报的单个传感器数据: tenantCode={}, deviceType={}, imei={}, sensor={}, value={}, timestamp={}", + tenantCode, deviceType, imei, sensor, value, timestamp); + + // TODO: 实现具体的业务逻辑,如更新设备状态、存储传感器数据等 + } + + @Override + public void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp) { + log.info("处理设备上报的批量传感器数据: tenantCode={}, deviceType={}, imei={}, batchData={}, timestamp={}", + tenantCode, deviceType, imei, JSON.toJSONString(batchData), timestamp); + + // TODO: 实现具体的业务逻辑,如批量更新设备状态、存储传感器数据等 + } + + @Override + public void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message) { + log.info("处理设备对指令的响应: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // TODO: 实现具体的业务逻辑,如更新指令执行状态等 + } + + /** + * 通过反射方式发送MQTT消息 + * @param topic 主题 + * @param qos 服务质量等级 + * @param payload 消息内容 + */ + private void sendMqttMessage(String topic, int qos, String payload) { + try { + Object mqttGateway = applicationContext.getBean("mqttGateway"); + Method sendMethod = mqttGateway.getClass().getMethod("sendMsgToMqtt", String.class, int.class, String.class); + sendMethod.invoke(mqttGateway, topic, qos, payload); + } catch (Exception e) { + log.error("发送MQTT消息失败: topic={}, payload={}", topic, payload, e); + } + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java new file mode 100644 index 0000000..1dc13cc --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java @@ -0,0 +1,163 @@ +package com.fuyuanshen.global.mqtt.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.fuyuanshen.equipment.domain.Device; +import com.fuyuanshen.equipment.mapper.DeviceMapper; +import com.fuyuanshen.global.mqtt.base.MqttMessage; +import com.fuyuanshen.global.mqtt.base.SensorData; +import com.fuyuanshen.global.mqtt.config.MqttGateway; +import com.fuyuanshen.global.mqtt.service.MqttMessageService; +import com.fuyuanshen.global.mqtt.utils.MqttTopicUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * MQTT消息处理服务实现类 + */ +@Slf4j +@Service +public class MqttMessageServiceImpl implements MqttMessageService { + + private final MqttGateway mqttGateway; + + private final DeviceMapper deviceMapper; + + public MqttMessageServiceImpl(MqttGateway mqttGateway, DeviceMapper deviceMapper) { + this.mqttGateway = mqttGateway; + this.deviceMapper = deviceMapper; + } + + /** + * 处理下发指令的响应消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 + */ + @Override + public void handleCommandResponse(String tenantCode, String deviceType, String imei, MqttMessage message) { + log.info("处理设备响应消息: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // 根据requestId更新指令执行状态 + // TODO: 实现具体的业务逻辑,比如更新指令执行结果等 + } + + /** + * 处理设备主动上报的数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 上报消息 + */ + @Override + public void handleDeviceReport(String tenantCode, String deviceType, String imei, MqttMessage message) { + log.info("处理设备上报数据: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // 查找设备 + Device device = deviceMapper.selectDeviceByImei(imei); + if (device == null) { + log.warn("未找到对应设备: imei={}", imei); + return; + } + + // 处理批量数据上报 + if (message.getBatch() != null && !message.getBatch().isEmpty()) { + for (int i = 0; i < message.getBatch().size(); i++) { + processSensorData(device, message.getBatch().get(i)); + } + } + // 处理单个数据上报 + else if (message.getData() != null) { + // 如果data是一个SensorData对象,则处理它 + // 这里可以根据实际的数据结构做相应处理 + } + } + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 + */ + @Override + public void sendCommand(String tenantCode, String deviceType, String imei, MqttMessage message) { + // 构建下发指令主题 + String topic = MqttTopicUtils.buildCommandTopic(tenantCode, deviceType, imei); + + // 设置时间戳 + if (message.getTimestamp() == null) { + message.setTimestamp(System.currentTimeMillis()); + } + + // 发送消息到MQTT + String payload = JSON.toJSONString(message); + mqttGateway.sendMsgToMqtt(topic, 1, payload); + + log.info("发送指令到设备: topic={}, payload={}", topic, payload); + } + + /** + * 处理传感器数据 + * @param device 设备对象 + * @param sensorData 传感器数据 + */ + private void processSensorData(Device device, SensorData sensorData) { + log.info("处理传感器数据: deviceId={}, sensor={}, value={}", + device.getId(), sensorData.getSensor(), sensorData.getValue()); + + String sensor = sensorData.getSensor(); + Object value = sensorData.getValue(); + + // 根据不同的传感器类型处理数据 + switch (sensor) { + case "mainLightMode": + // 处理主灯模式数据 + updateDeviceMainLightMode(device, value); + break; + case "mainLightBrightness": + // 处理主灯亮度数据 + updateDeviceMainLightBrightness(device, value); + break; + case "batteryPercent": + // 处理电池电量数据 + updateDeviceBatteryPercent(device, value); + break; + default: + log.warn("未知的传感器类型: sensor={}", sensor); + break; + } + } + + /** + * 更新设备主灯模式 + * @param device 设备对象 + * @param value 主灯模式值 + */ + private void updateDeviceMainLightMode(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备主灯模式: deviceId={}, value={}", device.getId(), value); + } + + /** + * 更新设备主灯亮度 + * @param device 设备对象 + * @param value 主灯亮度值 + */ + private void updateDeviceMainLightBrightness(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备主灯亮度: deviceId={}, value={}", device.getId(), value); + } + + /** + * 更新设备电池电量 + * @param device 设备对象 + * @param value 电池电量值 + */ + private void updateDeviceBatteryPercent(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备电池电量: deviceId={}, value={}", device.getId(), value); + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java new file mode 100644 index 0000000..774a29c --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java @@ -0,0 +1,71 @@ +package com.fuyuanshen.global.mqtt.utils; + +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import lombok.experimental.UtilityClass; + +/** + * MQTT主题处理工具类 + */ +@UtilityClass +public class MqttTopicUtils { + + public static final String COMMAND_PREFIX = "command"; + public static final String STATUS_PREFIX = "status"; + public static final String REPORT_PREFIX = "report"; + + /** + * 构建下发指令主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildCommandTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", COMMAND_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 构建响应数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildStatusTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", STATUS_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 构建设备上报数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildReportTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", REPORT_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 解析MQTT主题 + * @param topic 主题字符串 + * @return 主题信息对象 + */ + public static MqttTopicInfo parseTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + + String[] parts = topic.split("/"); + if (parts.length != 4) { + return null; + } + + MqttTopicInfo info = new MqttTopicInfo(); + info.setOperation(parts[0]); + info.setTenantCode(parts[1]); + info.setDeviceType(parts[2]); + info.setImei(parts[3]); + return info; + } +} \ No newline at end of file diff --git a/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java b/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java new file mode 100644 index 0000000..81706dd --- /dev/null +++ b/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java @@ -0,0 +1,52 @@ +package com.fuyuanshen.app.domain.vo; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.TimeZone; + +public class Main { + public static void main(String[] args) throws IOException { + + String[] availableIDs = TimeZone.getAvailableIDs(); + for (String id : availableIDs) { + System.out.println(id); + } + + byte[] data = "hello, world!".getBytes(StandardCharsets.UTF_8); + try (CountInputStream input = new CountInputStream(new ByteArrayInputStream(data))) { + int n; + while ((n = input.read()) != -1) { + System.out.println((char)n); + } + System.out.println("Total read " + input.getBytesRead() + " bytes"); + } + } +} + +class CountInputStream extends FilterInputStream { + private int count = 0; + + CountInputStream(InputStream in) { + super(in); + } + + public int getBytesRead() { + return this.count; + } + + public int read() throws IOException { + int n = in.read(); + if (n != -1) { + this.count ++; + } + return n; + } + + public int read(byte[] b, int off, int len) throws IOException { + int n = in.read(b, off, len); + if (n != -1) { + this.count += n; + } + return n; + } +} \ No newline at end of file diff --git a/nginx.conf b/nginx.conf new file mode 100644 index 0000000..b4f68c9 --- /dev/null +++ b/nginx.conf @@ -0,0 +1,184 @@ +#user nobody; +worker_processes 1; + +#error_log logs/error.log; +#error_log logs/error.log notice; +#error_log logs/error.log info; + +#pid logs/nginx.pid; + +events { + worker_connections 1024; +} + +http { + include mime.types; + default_type application/octet-stream; + + #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + # '$status $body_bytes_sent "$http_referer" ' + # '"$http_user_agent" "$http_x_forwarded_for"'; + + #access_log logs/access.log main; + + sendfile on; + #tcp_nopush on; + + #keepalive_timeout 0; + keepalive_timeout 65; + + map $http_upgrade $connection_upgrade { + default upgrade; + '' close; + } + + upstream websocket { + server 127.0.0.1:9083; + } + + #gzip on; + + server { + listen 80; + server_name cnxhyc.com; + + #charset koi8-r; + + #access_log logs/host.access.log main; + + location / { + root html; + index index.html index.htm; + try_files $uri $uri/ /index.html; + } + + #error_page 404 /404.html; + + # redirect server error pages to the static page /50x.html + # + error_page 500 502 503 504 /50x.html; + location = /50x.html { + root html; + } + + # proxy the PHP scripts to Apache listening on 127.0.0.1:80 + # + #location ~ \.php$ { + # proxy_pass http://127.0.0.1; + #} + + # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000 + # + #location ~ \.php$ { + # root html; + # fastcgi_pass 127.0.0.1:9000; + # fastcgi_index index.php; + # fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name; + # include fastcgi_params; + #} + + # deny access to .htaccess files, if Apache's document root + # concurs with nginx's one + # + #location ~ /\.ht { + # deny all; + #} + + location /fys/ { + proxy_pass http://localhost:9000/fys/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + + location /backend/ { + proxy_set_header Host $http_host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header REMOTE-HOST $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_read_timeout 86400s; + # sse 与 websocket参数 + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_buffering off; + proxy_cache off; + proxy_pass http://localhost:8000/; + } + + # API 代理 + location /jq/ { + proxy_pass http://localhost:8000/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_connect_timeout 60s; + proxy_read_timeout 600s; + } + } + + # another virtual host using mix of IP-, name-, and port-based configuration + # + #server { + # listen 8000; + # listen somename:8080; + # server_name somename alias another.alias; + + # location / { + # root html; + # index index.html index.htm; + # } + #} + + # HTTPS server + server { + listen 443 ssl; + server_name cnxhyc.com www.cnxhyc.com; + + ssl_certificate /cert/cnxhyc.com.pem; + ssl_certificate_key /cert/cnxhyc.com.key; + + # 使用更现代的 SSL 配置 + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers TLS_AES_256_GCM_SHA384:ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4; + ssl_prefer_server_ciphers off; + + ssl_session_cache shared:SSL:10m; + ssl_session_timeout 10m; + + location /wss { + proxy_pass http://websocket; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + } + + location / { + root html; + index index.html index.htm; + } + + # API 代理 + location /jq/ { + proxy_pass http://47.107.152.87:8000/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_connect_timeout 60s; + proxy_read_timeout 600s; + } + + # 重定向 /xh 到 /xh/ + location = /xh { + return 301 /xh/; + } + + # 后台系统 + location /xh/ { + alias /usr/local/nginx/html/jingquan/; + try_files $uri $uri/ /jingquan/index.html; + } + } +} \ No newline at end of file From 1be80be3098bd8da0dbbe9eb09bd8fd17f4a2664 Mon Sep 17 00:00:00 2001 From: daiyongfei <974332738@qq.com> Date: Mon, 10 Nov 2025 10:37:03 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E7=81=AF=E5=85=89=E6=A8=A1=E5=BC=8F=205?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/AppDeviceBJQController.java | 2 +- .../bjq/AppDeviceBJQ6075Controller.java | 42 +++++----- .../global/mqtt/base/MqttMessage.java | 4 +- .../mqtt/enums/DeviceFunctionType6075.java | 82 +++++++++++++++++++ .../global/mqtt/service/IotMqttService.java | 58 +++++++------ .../mqtt/service/impl/IotMqttServiceImpl.java | 54 ++++++------ .../device/DeviceBJQ6075BizService.java | 4 +- .../impl/DeviceBJQ6075BizServiceImpl.java | 63 +++++++++++++- 8 files changed, 234 insertions(+), 75 deletions(-) create mode 100644 fys-admin/src/main/java/com/fuyuanshen/global/mqtt/enums/DeviceFunctionType6075.java diff --git a/fys-admin/src/main/java/com/fuyuanshen/app/controller/device/AppDeviceBJQController.java b/fys-admin/src/main/java/com/fuyuanshen/app/controller/device/AppDeviceBJQController.java index ce85548..a02733b 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/app/controller/device/AppDeviceBJQController.java +++ b/fys-admin/src/main/java/com/fuyuanshen/app/controller/device/AppDeviceBJQController.java @@ -31,6 +31,7 @@ public class AppDeviceBJQController extends BaseController { private final DeviceBJQBizService appDeviceService; + /** * 获取设备详细信息 * @@ -86,7 +87,6 @@ public class AppDeviceBJQController extends BaseController { } - /** * 灯光模式 * 0(关灯),1(强光模式),2(弱光模式), 3(爆闪模式), 4(泛光模式) diff --git a/fys-admin/src/main/java/com/fuyuanshen/app/controller/device/bjq/AppDeviceBJQ6075Controller.java b/fys-admin/src/main/java/com/fuyuanshen/app/controller/device/bjq/AppDeviceBJQ6075Controller.java index c3e9df3..4f8839d 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/app/controller/device/bjq/AppDeviceBJQ6075Controller.java +++ b/fys-admin/src/main/java/com/fuyuanshen/app/controller/device/bjq/AppDeviceBJQ6075Controller.java @@ -11,7 +11,6 @@ import com.fuyuanshen.common.ratelimiter.annotation.FunctionAccessBatcAnnotation import com.fuyuanshen.common.web.core.BaseController; import com.fuyuanshen.equipment.domain.dto.AppDeviceSendMsgBo; import com.fuyuanshen.web.service.device.DeviceBJQ6075BizService; -import com.fuyuanshen.web.service.device.DeviceBJQBizService; import jakarta.validation.constraints.NotNull; import lombok.RequiredArgsConstructor; import org.springframework.validation.annotation.Validated; @@ -27,7 +26,6 @@ import org.springframework.web.multipart.MultipartFile; @RequestMapping("/app/bjq6075/device") public class AppDeviceBJQ6075Controller extends BaseController { - private final DeviceBJQBizService appDeviceService; private final DeviceBJQ6075BizService appDeviceService6075; @@ -44,33 +42,36 @@ public class AppDeviceBJQ6075Controller extends BaseController { /** - * 人员信息登记 + * 人员信息登记 1 */ @PostMapping(value = "/registerPersonInfo") public R registerPersonInfo(@Validated(AddGroup.class) @RequestBody AppPersonnelInfoBo bo) { - return toAjax(appDeviceService.registerPersonInfo(bo)); + return toAjax(appDeviceService6075.registerPersonInfo(bo)); } + /** - * 发送信息 + * 发送信息 2 */ @PostMapping(value = "/sendMessage") @FunctionAccessBatcAnnotation(value = "sendMessage", timeOut = 30, batchMaxTimeOut = 40) public R sendMessage(@RequestBody AppDeviceSendMsgBo bo) { - return toAjax(appDeviceService.sendMessage(bo)); + return toAjax(appDeviceService6075.sendMessage(bo)); } + /** - * 发送报警信息 + * 发送报警信息 3 */ @PostMapping(value = "/sendAlarmMessage") @FunctionAccessBatcAnnotation(value = "sendAlarmMessage", timeOut = 5, batchMaxTimeOut = 10) public R sendAlarmMessage(@RequestBody AppDeviceSendMsgBo bo) { - return toAjax(appDeviceService.sendAlarmMessage(bo)); + return toAjax(appDeviceService6075.sendAlarmMessage(bo)); } + /** - * 上传设备logo图片 + * 上传设备logo图片 4 */ @PostMapping("/uploadLogo") @FunctionAccessAnnotation("uploadLogo") @@ -80,61 +81,62 @@ public class AppDeviceBJQ6075Controller extends BaseController { if (file.getSize() > 1024 * 1024 * 2) { return R.warn("图片不能大于2M"); } - appDeviceService.uploadDeviceLogo(bo); + appDeviceService6075.uploadDeviceLogo(bo); return R.ok(); } + /** - * 灯光模式 + * 灯光模式 5 * (主光模式) * 0(关闭灯光),1(强光),2(超强光), 3(工作光), 4(节能光),5(爆闪),6(SOS) */ @PostMapping("/lightModeSettings") public R lightModeSettings(@RequestBody DeviceInstructDto params) { - appDeviceService.lightModeSettings(params); + appDeviceService6075.lightModeSettings(params); return R.ok(); } /** - * 灯光模式 + * 灯光模式 6 * (辅光模式) * 0(关闭灯光),1(泛光),2(泛光爆闪), 3(警示灯), 4(警示灯/泛光) */ @PostMapping("/auxiliaryLightModeSettings") public R auxiliaryLightModeSettings(@RequestBody DeviceInstructDto params) { - appDeviceService.lightModeSettings(params); + appDeviceService6075.lightModeSettings(params); return R.ok(); } /** - * 灯光亮度设置 + * 灯光亮度设置 7 */ @PostMapping("/lightBrightnessSettings") public R lightBrightnessSettings(@RequestBody DeviceInstructDto params) { - appDeviceService.lightBrightnessSettings(params); + appDeviceService6075.lightBrightnessSettings(params); return R.ok(); } /** - * 激光模式设置 + * 激光模式设置 8 */ @PostMapping("/laserModeSettings") public R laserModeSettings(@RequestBody DeviceInstructDto params) { - appDeviceService.laserModeSettings(params); + appDeviceService6075.laserModeSettings(params); return R.ok(); } /** - * 声光报警模式设置 + * 声光报警模式设置 9 * Sound and light alarm */ @PostMapping("/salaModeSettings") public R salaModeSettings(@RequestBody DeviceInstructDto params) { - appDeviceService.laserModeSettings(params); + appDeviceService6075.laserModeSettings(params); return R.ok(); } diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java index dd633b3..970dd16 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java @@ -9,6 +9,7 @@ import java.util.List; */ @Data public class MqttMessage { + /** * 请求ID,用于匹配请求和响应 */ @@ -27,7 +28,7 @@ public class MqttMessage { /** * 功能类型 */ - private String funcType; + private Integer funcType; /** * 数据内容 @@ -43,4 +44,5 @@ public class MqttMessage { * 批量数据(设备上报时使用) */ private List batch; + } \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/enums/DeviceFunctionType6075.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/enums/DeviceFunctionType6075.java new file mode 100644 index 0000000..1b9758f --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/enums/DeviceFunctionType6075.java @@ -0,0 +1,82 @@ +package com.fuyuanshen.global.mqtt.enums; + +/** + * 设备功能类型枚举 + * 基于AppDeviceBJQ6075Controller中的功能注释1-9设计 + */ +public enum DeviceFunctionType6075 { + + /** + * 人员信息登记 + */ + REGISTER_PERSON_INFO(1, "REGISTER_PERSON_INFO", "人员信息登记"), + + /** + * 发送信息 + */ + SEND_MESSAGE(2, "SEND_MESSAGE", "发送信息"), + + /** + * 发送报警信息 + */ + SEND_ALARM_MESSAGE(3, "SEND_ALARM_MESSAGE", "发送报警信息"), + + /** + * 上传设备logo图片 + */ + UPLOAD_LOGO(4, "UPLOAD_LOGO", "上传设备logo图片"), + + /** + * 灯光模式(主光模式) + * 0(关闭灯光),1(强光),2(超强光), 3(工作光), 4(节能光),5(爆闪),6(SOS) + */ + LIGHT_MODE(5, "LIGHT_MODE", "灯光模式"), + + /** + * 灯光模式(辅光模式) + * 0(关闭灯光),1(泛光),2(泛光爆闪), 3(警示灯), 4(警示灯/泛光) + */ + AUXILIARY_LIGHT_MODE(6, "AUXILIARY_LIGHT_MODE", "辅光模式"), + + /** + * 灯光亮度设置 + */ + LIGHT_BRIGHTNESS(7, "LIGHT_BRIGHTNESS", "灯光亮度设置"), + + /** + * 激光模式设置 + */ + LASER_MODE(8, "LASER_MODE", "激光模式设置"), + + /** + * 声光报警模式设置 + */ + SOUND_AND_LIGHT_ALARM(9, "SOUND_AND_LIGHT_ALARM", "声光报警模式设置"); + + private final int number; + private final String code; + private final String description; + + DeviceFunctionType6075(int number, String code, String description) { + this.number = number; + this.code = code; + this.description = description; + } + + public int getNumber() { + return number; + } + + public String getCode() { + return code; + } + + public String getDescription() { + return description; + } + + @Override + public String toString() { + return code; + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java index a48caab..d0e2871 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java @@ -1,6 +1,7 @@ package com.fuyuanshen.global.mqtt.service; import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.base.MqttMessage; /** * 通用IoT设备MQTT协议服务接口 @@ -10,87 +11,96 @@ public interface IotMqttService { /** * 构建下发指令主题 + * * @param tenantCode 租户编码 * @param deviceType 设备类型 - * @param imei 设备IMEI + * @param imei 设备IMEI * @return 指令主题 */ - String buildCommandTopic(String tenantCode, String deviceType, String imei); + String buildCommandTopic(String tenantCode, Long deviceType, String imei); /** * 构建响应数据主题 + * * @param tenantCode 租户编码 * @param deviceType 设备类型 - * @param imei 设备IMEI + * @param imei 设备IMEI * @return 响应主题 */ String buildStatusTopic(String tenantCode, String deviceType, String imei); /** * 构建设备上报数据主题 + * * @param tenantCode 租户编码 * @param deviceType 设备类型 - * @param imei 设备IMEI + * @param imei 设备IMEI * @return 上报主题 */ String buildReportTopic(String tenantCode, String deviceType, String imei); /** * 发送指令到设备 + * * @param tenantCode 租户编码 * @param deviceType 设备类型 - * @param imei 设备IMEI - * @param message 指令消息 (JSON格式) + * @param imei 设备IMEI + * @param message 指令消息 (JSON格式) */ - void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message); + void sendCommand(String tenantCode, Long deviceType, String imei, MqttMessage message); /** * 发送响应消息到设备 + * * @param tenantCode 租户编码 * @param deviceType 设备类型 - * @param imei 设备IMEI - * @param message 响应消息 (JSON格式) + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) */ void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message); /** * 发送设备上报数据的确认消息 + * * @param tenantCode 租户编码 * @param deviceType 设备类型 - * @param imei 设备IMEI - * @param message 确认消息 (JSON格式) + * @param imei 设备IMEI + * @param message 确认消息 (JSON格式) */ void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message); /** * 处理设备上报的单个传感器数据 + * * @param tenantCode 租户编码 * @param deviceType 设备类型 - * @param imei 设备IMEI - * @param sensor 传感器名称 - * @param value 传感器值 - * @param timestamp 时间戳 + * @param imei 设备IMEI + * @param sensor 传感器名称 + * @param value 传感器值 + * @param timestamp 时间戳 */ - void handleSingleReport(String tenantCode, String deviceType, String imei, - String sensor, Object value, Long timestamp); + void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp); /** * 处理设备上报的批量传感器数据 + * * @param tenantCode 租户编码 * @param deviceType 设备类型 - * @param imei 设备IMEI - * @param batchData 批量数据 - * @param timestamp 时间戳 + * @param imei 设备IMEI + * @param batchData 批量数据 + * @param timestamp 时间戳 */ - void handleBatchReport(String tenantCode, String deviceType, String imei, - JSONObject batchData, Long timestamp); + void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp); /** * 处理设备对指令的响应 + * * @param tenantCode 租户编码 * @param deviceType 设备类型 - * @param imei 设备IMEI - * @param message 响应消息 (JSON格式) + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) */ void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message); } \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java index afe38bd..91f176c 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java @@ -2,6 +2,7 @@ package com.fuyuanshen.global.mqtt.service.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.base.MqttMessage; import com.fuyuanshen.global.mqtt.service.IotMqttService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -17,38 +18,38 @@ import java.lang.reflect.Method; @Slf4j @Service public class IotMqttServiceImpl implements IotMqttService { - + @Autowired private ApplicationContext applicationContext; - + // MQTT主题前缀 private static final String COMMAND_PREFIX = "command"; private static final String STATUS_PREFIX = "status"; private static final String REPORT_PREFIX = "report"; - + @Override - public String buildCommandTopic(String tenantCode, String deviceType, String imei) { + public String buildCommandTopic(String tenantCode, Long deviceType, String imei) { return String.format("%s/%s/%s/%s", COMMAND_PREFIX, tenantCode, deviceType, imei); } - + @Override public String buildStatusTopic(String tenantCode, String deviceType, String imei) { return String.format("%s/%s/%s/%s", STATUS_PREFIX, tenantCode, deviceType, imei); } - + @Override public String buildReportTopic(String tenantCode, String deviceType, String imei) { return String.format("%s/%s/%s/%s", REPORT_PREFIX, tenantCode, deviceType, imei); } - + @Override - public void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message) { + public void sendCommand(String tenantCode, Long deviceType, String imei, MqttMessage message) { String topic = buildCommandTopic(tenantCode, deviceType, imei); - String payload = message.toJSONString(); + String payload = JSON.toJSONString(message); sendMqttMessage(topic, 1, payload); log.info("发送指令到设备: topic={}, payload={}", topic, payload); } - + @Override public void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message) { String topic = buildStatusTopic(tenantCode, deviceType, imei); @@ -56,7 +57,7 @@ public class IotMqttServiceImpl implements IotMqttService { sendMqttMessage(topic, 1, payload); log.info("发送响应消息到设备: topic={}, payload={}", topic, payload); } - + @Override public void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message) { String topic = buildReportTopic(tenantCode, deviceType, imei); @@ -64,37 +65,38 @@ public class IotMqttServiceImpl implements IotMqttService { sendMqttMessage(topic, 1, payload); log.info("发送设备上报数据确认消息: topic={}, payload={}", topic, payload); } - + @Override - public void handleSingleReport(String tenantCode, String deviceType, String imei, - String sensor, Object value, Long timestamp) { - log.info("处理设备上报的单个传感器数据: tenantCode={}, deviceType={}, imei={}, sensor={}, value={}, timestamp={}", + public void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp) { + log.info("处理设备上报的单个传感器数据: tenantCode={}, deviceType={}, imei={}, sensor={}, value={}, timestamp={}", tenantCode, deviceType, imei, sensor, value, timestamp); - + // TODO: 实现具体的业务逻辑,如更新设备状态、存储传感器数据等 } - + @Override - public void handleBatchReport(String tenantCode, String deviceType, String imei, - JSONObject batchData, Long timestamp) { - log.info("处理设备上报的批量传感器数据: tenantCode={}, deviceType={}, imei={}, batchData={}, timestamp={}", + public void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp) { + log.info("处理设备上报的批量传感器数据: tenantCode={}, deviceType={}, imei={}, batchData={}, timestamp={}", tenantCode, deviceType, imei, JSON.toJSONString(batchData), timestamp); - + // TODO: 实现具体的业务逻辑,如批量更新设备状态、存储传感器数据等 } - + @Override public void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message) { - log.info("处理设备对指令的响应: tenantCode={}, deviceType={}, imei={}, message={}", + log.info("处理设备对指令的响应: tenantCode={}, deviceType={}, imei={}, message={}", tenantCode, deviceType, imei, JSON.toJSONString(message)); - + // TODO: 实现具体的业务逻辑,如更新指令执行状态等 } /** * 通过反射方式发送MQTT消息 - * @param topic 主题 - * @param qos 服务质量等级 + * + * @param topic 主题 + * @param qos 服务质量等级 * @param payload 消息内容 */ private void sendMqttMessage(String topic, int qos, String payload) { diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQ6075BizService.java b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQ6075BizService.java index 2e1de5e..378f6c5 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQ6075BizService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQ6075BizService.java @@ -78,9 +78,9 @@ public interface DeviceBJQ6075BizService { /** * 灯光模式 - * 0(关灯),1(强光模式),2(弱光模式), 3(爆闪模式), 4(泛光模式) + * (主光模式) + * 0(关闭灯光),1(强光),2(超强光), 3(工作光), 4(节能光),5(爆闪),6(SOS) */ - public void lightModeSettings(DeviceInstructDto params); // 灯光亮度设置 diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/service/impl/DeviceBJQ6075BizServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/web/service/impl/DeviceBJQ6075BizServiceImpl.java index b88197c..74541f7 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/service/impl/DeviceBJQ6075BizServiceImpl.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/service/impl/DeviceBJQ6075BizServiceImpl.java @@ -27,9 +27,12 @@ import com.fuyuanshen.equipment.enums.LightModeEnum; import com.fuyuanshen.equipment.mapper.DeviceLogMapper; import com.fuyuanshen.equipment.mapper.DeviceMapper; import com.fuyuanshen.equipment.mapper.DeviceTypeMapper; +import com.fuyuanshen.global.mqtt.base.MqttMessage; import com.fuyuanshen.global.mqtt.config.MqttGateway; import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants; import com.fuyuanshen.global.mqtt.constants.MqttConstants; +import com.fuyuanshen.global.mqtt.enums.DeviceFunctionType6075; +import com.fuyuanshen.global.mqtt.service.IotMqttService; import com.fuyuanshen.web.service.device.DeviceBJQ6075BizService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -60,6 +63,8 @@ public class DeviceBJQ6075BizServiceImpl implements DeviceBJQ6075BizService { private final MqttGateway mqttGateway; private final DeviceLogMapper deviceLogMapper; + private final IotMqttService iotMqttService; + /** * 获取设备详情 @@ -396,12 +401,67 @@ public class DeviceBJQ6075BizServiceImpl implements DeviceBJQ6075BizService { } } + /** * 灯光模式 - * 0(关灯),1(强光模式),2(弱光模式), 3(爆闪模式), 4(泛光模式) + * (主光模式) + * 0(关闭灯光),1(强光),2(超强光), 3(工作光), 4(节能光),5(爆闪),6(SOS) */ @Override public void lightModeSettings(DeviceInstructDto params) { + try { + Long deviceId = params.getDeviceId(); + Device device = deviceMapper.selectById(deviceId); + if (device == null) { + throw new ServiceException("设备不存在"); + } + if (getDeviceStatus(device.getDeviceImei())) { + throw new ServiceException(device.getDeviceName() + ",设备已断开连接"); + } + + String deviceImei = device.getDeviceImei(); + Long deviceType = device.getDeviceType(); + String tenantCode = device.getTenantId(); + + // 构建发送强光模式的MqttMessage对象 + MqttMessage message = new MqttMessage(); + message.setRequestId(UUID.randomUUID().toString()); // 生成唯一的请求ID + message.setImei(device.getDeviceImei()); // 设备IMEI + message.setTimestamp(System.currentTimeMillis()); // 当前时间戳 + message.setFuncType(DeviceFunctionType6075.LIGHT_MODE.getNumber()); // 功能类型,这里假设为灯光模式 + + // 构建数据内容 - 强光模式参数 + Map lightData = new HashMap<>(); + lightData.put("mode", 1); // 1表示强光模式 + lightData.put("type", "mainLight"); // 主灯类型 + // 可以根据需要添加更多参数 + lightData.put("brightness", 100); // 亮度设置为100% + + message.setData(lightData); + + // 调用sendCommand方法发送指令 + iotMqttService.sendCommand(tenantCode, deviceType, deviceImei, message); + + Integer instructValue = Integer.parseInt(params.getInstructValue()); + ArrayList intData = new ArrayList<>(); + intData.add(1); + intData.add(instructValue); + intData.add(0); + intData.add(0); + intData.add(0); + Map map = new HashMap<>(); + map.put("instruct", intData); + mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + device.getDeviceImei(), 1, JSON.toJSONString(map)); + log.info("发送点阵数据到设备消息=>topic:{},payload:{}", MqttConstants.GLOBAL_PUB_KEY + device.getDeviceImei(), JSON.toJSONString(map)); + LightModeEnum modeEnum = LightModeEnum.getByCode(instructValue); + recordDeviceLog(device.getId(), device.getDeviceName(), "灯光模式", modeEnum != null ? modeEnum.getName() : null, AppLoginHelper.getUserId()); + } catch (Exception e) { + e.printStackTrace(); + throw new ServiceException("发送指令失败"); + } + } + + public void lightModeSettings1(DeviceInstructDto params) { try { Long deviceId = params.getDeviceId(); Device device = deviceMapper.selectById(deviceId); @@ -430,6 +490,7 @@ public class DeviceBJQ6075BizServiceImpl implements DeviceBJQ6075BizService { } } + // 灯光亮度设置 @Override public void lightBrightnessSettings(DeviceInstructDto params) {