diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java new file mode 100644 index 00000000..dd633b3c --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttMessage.java @@ -0,0 +1,46 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +import java.util.List; + +/** + * MQTT消息基础模型 + */ +@Data +public class MqttMessage { + /** + * 请求ID,用于匹配请求和响应 + */ + private String requestId; + + /** + * 设备IMEI + */ + private String imei; + + /** + * 时间戳(毫秒) + */ + private Long timestamp; + + /** + * 功能类型 + */ + private String funcType; + + /** + * 数据内容 + */ + private Object data; + + /** + * 状态(响应时使用) + */ + private String status; + + /** + * 批量数据(设备上报时使用) + */ + private List batch; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java new file mode 100644 index 00000000..80d1dcf4 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/MqttTopicInfo.java @@ -0,0 +1,29 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +/** + * MQTT主题信息模型 + */ +@Data +public class MqttTopicInfo { + /** + * 操作类型 (command/status/report) + */ + private String operation; + + /** + * 租户编码 + */ + private String tenantCode; + + /** + * 设备类型 + */ + private String deviceType; + + /** + * 设备IMEI + */ + private String imei; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java new file mode 100644 index 00000000..489d5f8f --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/base/SensorData.java @@ -0,0 +1,24 @@ +package com.fuyuanshen.global.mqtt.base; + +import lombok.Data; + +/** + * 传感器数据模型 + */ +@Data +public class SensorData { + /** + * 传感器名称 + */ + private String sensor; + + /** + * 传感器值 + */ + private Object value; + + /** + * 时间戳(毫秒) + */ + private Long timestamp; +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java new file mode 100644 index 00000000..b77ac210 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/IotMqttMessageHandler.java @@ -0,0 +1,136 @@ +package com.fuyuanshen.global.mqtt.handler; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import com.fuyuanshen.global.mqtt.service.IotMqttService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + + +/** + * IoT设备MQTT消息处理器 + * 用于处理设备上报的数据和响应消息 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class IotMqttMessageHandler { + + private final IotMqttService iotMqttService; + + /** + * 处理MQTT消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleMessage(String topic, String payload) { + try { + // 解析主题 + MqttTopicInfo topicInfo = parseTopic(topic); + if (topicInfo == null) { + log.warn("无法解析MQTT主题: topic={}", topic); + return; + } + + // 解析消息内容 + JSONObject message = JSON.parseObject(payload); + + // 根据主题类型处理消息 + switch (topicInfo.getOperation()) { + case "command": + // 处理下发指令(设备端不会主动发送command类型消息) + log.warn("收到非法的MQTT消息类型: operation={}", topicInfo.getOperation()); + break; + case "status": + // 处理设备对指令的响应 + iotMqttService.handleCommandResponse( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + case "report": + // 处理设备主动上报的数据 + handleDeviceReport(topicInfo, message); + break; + default: + log.warn("未知的MQTT主题操作类型: operation={}", topicInfo.getOperation()); + break; + } + } catch (Exception e) { + log.error("处理MQTT消息时发生错误: topic={}, payload={}", topic, payload, e); + } + + } + + /** + * 解析MQTT主题 + * + * @param topic 主题字符串 + * @return 主题信息对象 + */ + MqttTopicInfo parseTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + + String[] parts = topic.split("/"); + if (parts.length != 4) { + return null; + } + + MqttTopicInfo info = new MqttTopicInfo(); + info.setOperation(parts[0]); + info.setTenantCode(parts[1]); + info.setDeviceType(parts[2]); + info.setImei(parts[3]); + return info; + } + + /** + * 处理设备上报数据 + * + * @param topicInfo 主题信息 + * @param message 消息内容 + */ + private void handleDeviceReport(MqttTopicInfo topicInfo, JSONObject message) { + // 获取时间戳 + Long timestamp = message.getLong("timestamp"); + + // 处理批量数据上报 + if (message.containsKey("batch")) { + JSONObject batchData = message.getJSONObject("batch"); + iotMqttService.handleBatchReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + batchData, + timestamp); + } + // 处理单个数据上报 + else if (message.containsKey("sensor") && message.containsKey("value")) { + String sensor = message.getString("sensor"); + Object value = message.get("value"); + iotMqttService.handleSingleReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + sensor, + value, + timestamp); + } + // 处理其他格式的数据 + else { + // 将整个消息作为批量数据处理 + iotMqttService.handleBatchReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message, + timestamp); + } + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java new file mode 100644 index 00000000..1d661683 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/handler/MqttMessageHandler.java @@ -0,0 +1,71 @@ +package com.fuyuanshen.global.mqtt.handler; + +import com.alibaba.fastjson2.JSON; +import com.fuyuanshen.global.mqtt.base.MqttMessage; +import com.fuyuanshen.global.mqtt.service.MqttMessageService; +import com.fuyuanshen.global.mqtt.utils.MqttTopicUtils; +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * MQTT消息处理器 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class MqttMessageHandler { + + private final MqttMessageService mqttMessageService; + + + /** + * 处理MQTT消息 + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleMessage(String topic, String payload) { + try { + // 解析主题 + MqttTopicInfo topicInfo = MqttTopicUtils.parseTopic(topic); + if (topicInfo == null) { + log.warn("无法解析MQTT主题: topic={}", topic); + return; + } + + // 解析消息内容 + MqttMessage message = JSON.parseObject(payload, MqttMessage.class); + + // 根据主题类型处理消息 + switch (topicInfo.getOperation()) { + case "command": + // 处理下发指令(设备端不会主动发送command类型消息) + log.warn("收到非法的MQTT消息类型: operation={}", topicInfo.getOperation()); + break; + case "status": + // 处理设备对指令的响应 + mqttMessageService.handleCommandResponse( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + case "report": + // 处理设备主动上报的数据 + mqttMessageService.handleDeviceReport( + topicInfo.getTenantCode(), + topicInfo.getDeviceType(), + topicInfo.getImei(), + message); + break; + default: + log.warn("未知的MQTT主题操作类型: operation={}", topicInfo.getOperation()); + break; + } + } catch (Exception e) { + log.error("处理MQTT消息时发生错误: topic={}, payload={}", topic, payload, e); + } + } + +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java new file mode 100644 index 00000000..a48caab9 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/IotMqttService.java @@ -0,0 +1,96 @@ +package com.fuyuanshen.global.mqtt.service; + +import com.alibaba.fastjson2.JSONObject; + +/** + * 通用IoT设备MQTT协议服务接口 + * 遵循统一的MQTT通信协议规范 + */ +public interface IotMqttService { + + /** + * 构建下发指令主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 指令主题 + */ + String buildCommandTopic(String tenantCode, String deviceType, String imei); + + /** + * 构建响应数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 响应主题 + */ + String buildStatusTopic(String tenantCode, String deviceType, String imei); + + /** + * 构建设备上报数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 上报主题 + */ + String buildReportTopic(String tenantCode, String deviceType, String imei); + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 (JSON格式) + */ + void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 发送响应消息到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) + */ + void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 发送设备上报数据的确认消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 确认消息 (JSON格式) + */ + void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message); + + /** + * 处理设备上报的单个传感器数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param sensor 传感器名称 + * @param value 传感器值 + * @param timestamp 时间戳 + */ + void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp); + + /** + * 处理设备上报的批量传感器数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param batchData 批量数据 + * @param timestamp 时间戳 + */ + void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp); + + /** + * 处理设备对指令的响应 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 (JSON格式) + */ + void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message); +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java new file mode 100644 index 00000000..2a587223 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/MqttMessageService.java @@ -0,0 +1,37 @@ +package com.fuyuanshen.global.mqtt.service; + + +import com.fuyuanshen.global.mqtt.base.MqttMessage; + +/** + * MQTT消息处理服务接口 + */ +public interface MqttMessageService { + + /** + * 处理下发指令的响应消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 + */ + void handleCommandResponse(String tenantCode, String deviceType, String imei, MqttMessage message); + + /** + * 处理设备主动上报的数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 上报消息 + */ + void handleDeviceReport(String tenantCode, String deviceType, String imei, MqttMessage message); + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 + */ + void sendCommand(String tenantCode, String deviceType, String imei, MqttMessage message); +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java new file mode 100644 index 00000000..afe38bd1 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/IotMqttServiceImpl.java @@ -0,0 +1,109 @@ +package com.fuyuanshen.global.mqtt.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.fuyuanshen.global.mqtt.service.IotMqttService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; + +import java.lang.reflect.Method; + +/** + * 通用IoT设备MQTT协议服务实现类 + * 遵循统一的MQTT通信协议规范 + */ +@Slf4j +@Service +public class IotMqttServiceImpl implements IotMqttService { + + @Autowired + private ApplicationContext applicationContext; + + // MQTT主题前缀 + private static final String COMMAND_PREFIX = "command"; + private static final String STATUS_PREFIX = "status"; + private static final String REPORT_PREFIX = "report"; + + @Override + public String buildCommandTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", COMMAND_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public String buildStatusTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", STATUS_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public String buildReportTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", REPORT_PREFIX, tenantCode, deviceType, imei); + } + + @Override + public void sendCommand(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildCommandTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送指令到设备: topic={}, payload={}", topic, payload); + } + + @Override + public void sendStatus(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildStatusTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送响应消息到设备: topic={}, payload={}", topic, payload); + } + + @Override + public void sendReportAck(String tenantCode, String deviceType, String imei, JSONObject message) { + String topic = buildReportTopic(tenantCode, deviceType, imei); + String payload = message.toJSONString(); + sendMqttMessage(topic, 1, payload); + log.info("发送设备上报数据确认消息: topic={}, payload={}", topic, payload); + } + + @Override + public void handleSingleReport(String tenantCode, String deviceType, String imei, + String sensor, Object value, Long timestamp) { + log.info("处理设备上报的单个传感器数据: tenantCode={}, deviceType={}, imei={}, sensor={}, value={}, timestamp={}", + tenantCode, deviceType, imei, sensor, value, timestamp); + + // TODO: 实现具体的业务逻辑,如更新设备状态、存储传感器数据等 + } + + @Override + public void handleBatchReport(String tenantCode, String deviceType, String imei, + JSONObject batchData, Long timestamp) { + log.info("处理设备上报的批量传感器数据: tenantCode={}, deviceType={}, imei={}, batchData={}, timestamp={}", + tenantCode, deviceType, imei, JSON.toJSONString(batchData), timestamp); + + // TODO: 实现具体的业务逻辑,如批量更新设备状态、存储传感器数据等 + } + + @Override + public void handleCommandResponse(String tenantCode, String deviceType, String imei, JSONObject message) { + log.info("处理设备对指令的响应: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // TODO: 实现具体的业务逻辑,如更新指令执行状态等 + } + + /** + * 通过反射方式发送MQTT消息 + * @param topic 主题 + * @param qos 服务质量等级 + * @param payload 消息内容 + */ + private void sendMqttMessage(String topic, int qos, String payload) { + try { + Object mqttGateway = applicationContext.getBean("mqttGateway"); + Method sendMethod = mqttGateway.getClass().getMethod("sendMsgToMqtt", String.class, int.class, String.class); + sendMethod.invoke(mqttGateway, topic, qos, payload); + } catch (Exception e) { + log.error("发送MQTT消息失败: topic={}, payload={}", topic, payload, e); + } + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java new file mode 100644 index 00000000..1dc13cc9 --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/service/impl/MqttMessageServiceImpl.java @@ -0,0 +1,163 @@ +package com.fuyuanshen.global.mqtt.service.impl; + +import com.alibaba.fastjson2.JSON; +import com.fuyuanshen.equipment.domain.Device; +import com.fuyuanshen.equipment.mapper.DeviceMapper; +import com.fuyuanshen.global.mqtt.base.MqttMessage; +import com.fuyuanshen.global.mqtt.base.SensorData; +import com.fuyuanshen.global.mqtt.config.MqttGateway; +import com.fuyuanshen.global.mqtt.service.MqttMessageService; +import com.fuyuanshen.global.mqtt.utils.MqttTopicUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * MQTT消息处理服务实现类 + */ +@Slf4j +@Service +public class MqttMessageServiceImpl implements MqttMessageService { + + private final MqttGateway mqttGateway; + + private final DeviceMapper deviceMapper; + + public MqttMessageServiceImpl(MqttGateway mqttGateway, DeviceMapper deviceMapper) { + this.mqttGateway = mqttGateway; + this.deviceMapper = deviceMapper; + } + + /** + * 处理下发指令的响应消息 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 响应消息 + */ + @Override + public void handleCommandResponse(String tenantCode, String deviceType, String imei, MqttMessage message) { + log.info("处理设备响应消息: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // 根据requestId更新指令执行状态 + // TODO: 实现具体的业务逻辑,比如更新指令执行结果等 + } + + /** + * 处理设备主动上报的数据 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 上报消息 + */ + @Override + public void handleDeviceReport(String tenantCode, String deviceType, String imei, MqttMessage message) { + log.info("处理设备上报数据: tenantCode={}, deviceType={}, imei={}, message={}", + tenantCode, deviceType, imei, JSON.toJSONString(message)); + + // 查找设备 + Device device = deviceMapper.selectDeviceByImei(imei); + if (device == null) { + log.warn("未找到对应设备: imei={}", imei); + return; + } + + // 处理批量数据上报 + if (message.getBatch() != null && !message.getBatch().isEmpty()) { + for (int i = 0; i < message.getBatch().size(); i++) { + processSensorData(device, message.getBatch().get(i)); + } + } + // 处理单个数据上报 + else if (message.getData() != null) { + // 如果data是一个SensorData对象,则处理它 + // 这里可以根据实际的数据结构做相应处理 + } + } + + /** + * 发送指令到设备 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @param message 指令消息 + */ + @Override + public void sendCommand(String tenantCode, String deviceType, String imei, MqttMessage message) { + // 构建下发指令主题 + String topic = MqttTopicUtils.buildCommandTopic(tenantCode, deviceType, imei); + + // 设置时间戳 + if (message.getTimestamp() == null) { + message.setTimestamp(System.currentTimeMillis()); + } + + // 发送消息到MQTT + String payload = JSON.toJSONString(message); + mqttGateway.sendMsgToMqtt(topic, 1, payload); + + log.info("发送指令到设备: topic={}, payload={}", topic, payload); + } + + /** + * 处理传感器数据 + * @param device 设备对象 + * @param sensorData 传感器数据 + */ + private void processSensorData(Device device, SensorData sensorData) { + log.info("处理传感器数据: deviceId={}, sensor={}, value={}", + device.getId(), sensorData.getSensor(), sensorData.getValue()); + + String sensor = sensorData.getSensor(); + Object value = sensorData.getValue(); + + // 根据不同的传感器类型处理数据 + switch (sensor) { + case "mainLightMode": + // 处理主灯模式数据 + updateDeviceMainLightMode(device, value); + break; + case "mainLightBrightness": + // 处理主灯亮度数据 + updateDeviceMainLightBrightness(device, value); + break; + case "batteryPercent": + // 处理电池电量数据 + updateDeviceBatteryPercent(device, value); + break; + default: + log.warn("未知的传感器类型: sensor={}", sensor); + break; + } + } + + /** + * 更新设备主灯模式 + * @param device 设备对象 + * @param value 主灯模式值 + */ + private void updateDeviceMainLightMode(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备主灯模式: deviceId={}, value={}", device.getId(), value); + } + + /** + * 更新设备主灯亮度 + * @param device 设备对象 + * @param value 主灯亮度值 + */ + private void updateDeviceMainLightBrightness(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备主灯亮度: deviceId={}, value={}", device.getId(), value); + } + + /** + * 更新设备电池电量 + * @param device 设备对象 + * @param value 电池电量值 + */ + private void updateDeviceBatteryPercent(Device device, Object value) { + // TODO: 实现具体的业务逻辑 + log.info("更新设备电池电量: deviceId={}, value={}", device.getId(), value); + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java new file mode 100644 index 00000000..774a29cb --- /dev/null +++ b/fys-admin/src/main/java/com/fuyuanshen/global/mqtt/utils/MqttTopicUtils.java @@ -0,0 +1,71 @@ +package com.fuyuanshen.global.mqtt.utils; + +import com.fuyuanshen.global.mqtt.base.MqttTopicInfo; +import lombok.experimental.UtilityClass; + +/** + * MQTT主题处理工具类 + */ +@UtilityClass +public class MqttTopicUtils { + + public static final String COMMAND_PREFIX = "command"; + public static final String STATUS_PREFIX = "status"; + public static final String REPORT_PREFIX = "report"; + + /** + * 构建下发指令主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildCommandTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", COMMAND_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 构建响应数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildStatusTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", STATUS_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 构建设备上报数据主题 + * @param tenantCode 租户编码 + * @param deviceType 设备类型 + * @param imei 设备IMEI + * @return 主题字符串 + */ + public static String buildReportTopic(String tenantCode, String deviceType, String imei) { + return String.format("%s/%s/%s/%s", REPORT_PREFIX, tenantCode, deviceType, imei); + } + + /** + * 解析MQTT主题 + * @param topic 主题字符串 + * @return 主题信息对象 + */ + public static MqttTopicInfo parseTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + + String[] parts = topic.split("/"); + if (parts.length != 4) { + return null; + } + + MqttTopicInfo info = new MqttTopicInfo(); + info.setOperation(parts[0]); + info.setTenantCode(parts[1]); + info.setDeviceType(parts[2]); + info.setImei(parts[3]); + return info; + } +} \ No newline at end of file diff --git a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java index 03d2c988..031618ba 100644 --- a/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java +++ b/fys-admin/src/main/java/com/fuyuanshen/web/service/device/DeviceBJQBizService.java @@ -59,6 +59,7 @@ public class DeviceBJQBizService { private final MqttGateway mqttGateway; private final DeviceLogMapper deviceLogMapper; + public int sendMessage(AppDeviceSendMsgBo bo) { List deviceIds = bo.getDeviceIds(); if (deviceIds == null || deviceIds.isEmpty()) { @@ -573,4 +574,5 @@ public class DeviceBJQBizService { uploadDeviceLogo(dto); } } + } diff --git a/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java b/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java new file mode 100644 index 00000000..81706ddb --- /dev/null +++ b/fys-modules/fys-app/src/main/java/com/fuyuanshen/app/domain/vo/Main.java @@ -0,0 +1,52 @@ +package com.fuyuanshen.app.domain.vo; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.TimeZone; + +public class Main { + public static void main(String[] args) throws IOException { + + String[] availableIDs = TimeZone.getAvailableIDs(); + for (String id : availableIDs) { + System.out.println(id); + } + + byte[] data = "hello, world!".getBytes(StandardCharsets.UTF_8); + try (CountInputStream input = new CountInputStream(new ByteArrayInputStream(data))) { + int n; + while ((n = input.read()) != -1) { + System.out.println((char)n); + } + System.out.println("Total read " + input.getBytesRead() + " bytes"); + } + } +} + +class CountInputStream extends FilterInputStream { + private int count = 0; + + CountInputStream(InputStream in) { + super(in); + } + + public int getBytesRead() { + return this.count; + } + + public int read() throws IOException { + int n = in.read(); + if (n != -1) { + this.count ++; + } + return n; + } + + public int read(byte[] b, int off, int len) throws IOException { + int n = in.read(b, off, len); + if (n != -1) { + this.count += n; + } + return n; + } +} \ No newline at end of file diff --git a/nginx.conf b/nginx.conf new file mode 100644 index 00000000..b4f68c98 --- /dev/null +++ b/nginx.conf @@ -0,0 +1,184 @@ +#user nobody; +worker_processes 1; + +#error_log logs/error.log; +#error_log logs/error.log notice; +#error_log logs/error.log info; + +#pid logs/nginx.pid; + +events { + worker_connections 1024; +} + +http { + include mime.types; + default_type application/octet-stream; + + #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + # '$status $body_bytes_sent "$http_referer" ' + # '"$http_user_agent" "$http_x_forwarded_for"'; + + #access_log logs/access.log main; + + sendfile on; + #tcp_nopush on; + + #keepalive_timeout 0; + keepalive_timeout 65; + + map $http_upgrade $connection_upgrade { + default upgrade; + '' close; + } + + upstream websocket { + server 127.0.0.1:9083; + } + + #gzip on; + + server { + listen 80; + server_name cnxhyc.com; + + #charset koi8-r; + + #access_log logs/host.access.log main; + + location / { + root html; + index index.html index.htm; + try_files $uri $uri/ /index.html; + } + + #error_page 404 /404.html; + + # redirect server error pages to the static page /50x.html + # + error_page 500 502 503 504 /50x.html; + location = /50x.html { + root html; + } + + # proxy the PHP scripts to Apache listening on 127.0.0.1:80 + # + #location ~ \.php$ { + # proxy_pass http://127.0.0.1; + #} + + # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000 + # + #location ~ \.php$ { + # root html; + # fastcgi_pass 127.0.0.1:9000; + # fastcgi_index index.php; + # fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name; + # include fastcgi_params; + #} + + # deny access to .htaccess files, if Apache's document root + # concurs with nginx's one + # + #location ~ /\.ht { + # deny all; + #} + + location /fys/ { + proxy_pass http://localhost:9000/fys/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + + location /backend/ { + proxy_set_header Host $http_host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header REMOTE-HOST $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_read_timeout 86400s; + # sse 与 websocket参数 + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_buffering off; + proxy_cache off; + proxy_pass http://localhost:8000/; + } + + # API 代理 + location /jq/ { + proxy_pass http://localhost:8000/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_connect_timeout 60s; + proxy_read_timeout 600s; + } + } + + # another virtual host using mix of IP-, name-, and port-based configuration + # + #server { + # listen 8000; + # listen somename:8080; + # server_name somename alias another.alias; + + # location / { + # root html; + # index index.html index.htm; + # } + #} + + # HTTPS server + server { + listen 443 ssl; + server_name cnxhyc.com www.cnxhyc.com; + + ssl_certificate /cert/cnxhyc.com.pem; + ssl_certificate_key /cert/cnxhyc.com.key; + + # 使用更现代的 SSL 配置 + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers TLS_AES_256_GCM_SHA384:ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4; + ssl_prefer_server_ciphers off; + + ssl_session_cache shared:SSL:10m; + ssl_session_timeout 10m; + + location /wss { + proxy_pass http://websocket; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + } + + location / { + root html; + index index.html index.htm; + } + + # API 代理 + location /jq/ { + proxy_pass http://47.107.152.87:8000/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_connect_timeout 60s; + proxy_read_timeout 600s; + } + + # 重定向 /xh 到 /xh/ + location = /xh { + return 301 /xh/; + } + + # 后台系统 + location /xh/ { + alias /usr/local/nginx/html/jingquan/; + try_files $uri $uri/ /jingquan/index.html; + } + } +} \ No newline at end of file