web端控制中心4
This commit is contained in:
@ -10,6 +10,7 @@ import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttRuleEngine;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttXinghanCommandType;
|
||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
@ -51,6 +52,9 @@ public class ReceiverMessageHandler implements MessageHandler {
|
||||
//在线状态
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(62));
|
||||
// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||
// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||
// RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofHours(24));
|
||||
}
|
||||
|
||||
String state = payloadDict.getStr("state");
|
||||
|
@ -0,0 +1,110 @@
|
||||
package com.fuyuanshen.global.queue;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.equipment.domain.Device;
|
||||
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class MqttMessageConsumer {
|
||||
|
||||
@Autowired
|
||||
private DeviceMapper deviceMapper;
|
||||
|
||||
// 创建两个线程池:一个用于消息获取,一个用于业务处理
|
||||
private ExecutorService messageConsumerPool = Executors.newFixedThreadPool(3);
|
||||
private ExecutorService messageProcessorPool = Executors.newFixedThreadPool(10);
|
||||
|
||||
// 初始化方法,启动消息监听
|
||||
// @PostConstruct
|
||||
public void start() {
|
||||
log.info("启动MQTT消息消费者...");
|
||||
// 启动消息获取线程
|
||||
for (int i = 0; i < 3; i++) {
|
||||
messageConsumerPool.submit(this::consumeMessages);
|
||||
}
|
||||
}
|
||||
|
||||
// 销毁方法,关闭线程池
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
log.info("关闭MQTT消息消费者...");
|
||||
shutdownExecutorService(messageConsumerPool);
|
||||
shutdownExecutorService(messageProcessorPool);
|
||||
}
|
||||
|
||||
private void shutdownExecutorService(ExecutorService executorService) {
|
||||
if (executorService != null && !executorService.isShutdown()) {
|
||||
executorService.shutdown();
|
||||
try {
|
||||
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
executorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 消费者方法 - 专门负责从队列获取消息
|
||||
public void consumeMessages() {
|
||||
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||
String threadName = Thread.currentThread().getName();
|
||||
log.info("消息消费者线程 {} 开始监听队列: {}", threadName, queueKey);
|
||||
|
||||
try {
|
||||
while (!Thread.currentThread().isInterrupted() && !messageConsumerPool.isShutdown()) {
|
||||
// 阻塞式获取队列中的消息
|
||||
String message = RedisUtils.pollDeduplicated(
|
||||
queueKey,
|
||||
MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY,
|
||||
1,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
if (message != null) {
|
||||
log.info("线程 {} 从队列中获取到消息,提交到处理线程池: {}", threadName, message);
|
||||
// 将消息处理任务提交到处理线程池
|
||||
messageProcessorPool.submit(() -> processMessage(message));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("线程 {} 消费消息时发生错误", threadName, e);
|
||||
}
|
||||
|
||||
log.info("消息消费者线程 {} 停止监听队列", threadName);
|
||||
}
|
||||
|
||||
// 处理具体业务逻辑的方法
|
||||
private void processMessage(String message) {
|
||||
String threadName = Thread.currentThread().getName();
|
||||
try {
|
||||
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
|
||||
|
||||
// 实现具体的业务逻辑
|
||||
// 例如更新数据库、发送通知等
|
||||
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||
updateWrapper.eq("device_imei", message)
|
||||
.set("online_status", 1);
|
||||
deviceMapper.update(updateWrapper);
|
||||
// 模拟业务处理耗时
|
||||
Thread.sleep(200);
|
||||
|
||||
log.info("业务处理线程 {} 完成消息处理: {}", threadName, message);
|
||||
} catch (Exception e) {
|
||||
log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, message, e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,6 @@
|
||||
package com.fuyuanshen.global.queue;
|
||||
|
||||
public class MqttMessageQueueConstants {
|
||||
public static final String MQTT_MESSAGE_QUEUE_KEY = "mqtt:message:queue";
|
||||
public static final String MQTT_MESSAGE_DEDUP_KEY = "mqtt:message:dedup";
|
||||
}
|
@ -1,19 +1,20 @@
|
||||
package com.fuyuanshen.web.controller.device;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.fuyuanshen.app.domain.dto.APPReNameDTO;
|
||||
import com.fuyuanshen.app.domain.dto.AppRealTimeStatusDto;
|
||||
import com.fuyuanshen.app.domain.vo.APPDeviceTypeVo;
|
||||
import com.fuyuanshen.common.core.domain.R;
|
||||
import com.fuyuanshen.common.excel.utils.ExcelUtil;
|
||||
import com.fuyuanshen.common.mybatis.core.page.PageQuery;
|
||||
import com.fuyuanshen.common.mybatis.core.page.TableDataInfo;
|
||||
import com.fuyuanshen.common.web.core.BaseController;
|
||||
import com.fuyuanshen.equipment.domain.dto.AppDeviceBo;
|
||||
import com.fuyuanshen.equipment.domain.dto.InstructionRecordDto;
|
||||
import com.fuyuanshen.equipment.domain.query.DeviceQueryCriteria;
|
||||
import com.fuyuanshen.equipment.domain.vo.AppDeviceVo;
|
||||
import com.fuyuanshen.equipment.domain.vo.InstructionRecordVo;
|
||||
import com.fuyuanshen.equipment.domain.vo.WebDeviceVo;
|
||||
import com.fuyuanshen.equipment.domain.vo.*;
|
||||
import com.fuyuanshen.web.service.device.DeviceBizService;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
@ -101,4 +102,48 @@ public class DeviceControlCenterController extends BaseController {
|
||||
return appDeviceService.getInstructionRecord(dto,pageQuery);
|
||||
}
|
||||
|
||||
/**
|
||||
* 导出
|
||||
*/
|
||||
@GetMapping("/export")
|
||||
public void export(InstructionRecordDto dto, PageQuery pageQuery, HttpServletResponse response) {
|
||||
pageQuery.setPageNum(1);
|
||||
pageQuery.setPageSize(2000);
|
||||
TableDataInfo<InstructionRecordVo> instructionRecord = appDeviceService.getInstructionRecord(dto, pageQuery);
|
||||
if(instructionRecord.getRows() == null){
|
||||
return;
|
||||
}
|
||||
ExcelUtil.exportExcel(instructionRecord.getRows(), "设备操作日志", InstructionRecordVo.class, response);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 历史轨迹查询
|
||||
*/
|
||||
@GetMapping("/locationHistory")
|
||||
public TableDataInfo<LocationHistoryVo> getLocationHistory(InstructionRecordDto dto, PageQuery pageQuery) {
|
||||
return appDeviceService.getLocationHistory(dto,pageQuery);
|
||||
}
|
||||
|
||||
/**
|
||||
* 历史轨迹导出
|
||||
*/
|
||||
@GetMapping("/locationHistoryExport")
|
||||
public void locationHistoryExport(InstructionRecordDto dto, PageQuery pageQuery, HttpServletResponse response) {
|
||||
pageQuery.setPageNum(1);
|
||||
pageQuery.setPageSize(2000);
|
||||
TableDataInfo<LocationHistoryVo> result = appDeviceService.getLocationHistory(dto, pageQuery);
|
||||
if(result.getRows() == null){
|
||||
return;
|
||||
}
|
||||
ExcelUtil.exportExcel(result.getRows(), "历史轨迹记录", LocationHistoryVo.class, response);
|
||||
}
|
||||
|
||||
/**
|
||||
* 历史轨迹导出
|
||||
*/
|
||||
@GetMapping("/getLocationHistoryDetail")
|
||||
public R<List<LocationHistoryDetailVo>> getLocationHistoryDetail(Long id) {
|
||||
return R.ok(appDeviceService.getLocationHistoryDetail(id));
|
||||
}
|
||||
}
|
||||
|
@ -27,9 +27,7 @@ import com.fuyuanshen.equipment.domain.Device;
|
||||
import com.fuyuanshen.equipment.domain.dto.AppDeviceBo;
|
||||
import com.fuyuanshen.equipment.domain.dto.InstructionRecordDto;
|
||||
import com.fuyuanshen.equipment.domain.query.DeviceQueryCriteria;
|
||||
import com.fuyuanshen.equipment.domain.vo.AppDeviceVo;
|
||||
import com.fuyuanshen.equipment.domain.vo.InstructionRecordVo;
|
||||
import com.fuyuanshen.equipment.domain.vo.WebDeviceVo;
|
||||
import com.fuyuanshen.equipment.domain.vo.*;
|
||||
import com.fuyuanshen.equipment.enums.BindingStatusEnum;
|
||||
import com.fuyuanshen.equipment.enums.CommunicationModeEnum;
|
||||
import com.fuyuanshen.equipment.mapper.DeviceLogMapper;
|
||||
@ -41,9 +39,8 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.time.*;
|
||||
import java.util.*;
|
||||
|
||||
import static com.fuyuanshen.common.core.constant.GlobalConstants.GLOBAL_REDIS_KEY;
|
||||
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.*;
|
||||
@ -336,4 +333,60 @@ public class DeviceBizService {
|
||||
Page<InstructionRecordVo> result = deviceLogMapper.getInstructionRecord(pageQuery.build(), bo);
|
||||
return TableDataInfo.build(result);
|
||||
}
|
||||
|
||||
public TableDataInfo<LocationHistoryVo> getLocationHistory(InstructionRecordDto bo, PageQuery pageQuery) {
|
||||
Page<LocationHistoryVo> result = deviceMapper.getLocationHistory(pageQuery.build(), bo);
|
||||
return TableDataInfo.build(result);
|
||||
}
|
||||
|
||||
public List<LocationHistoryDetailVo> getLocationHistoryDetail(Long id) {
|
||||
Device device = deviceMapper.selectById(id);
|
||||
if (device == null) {
|
||||
throw new ServiceException("设备不存在");
|
||||
}
|
||||
|
||||
// 计算七天前的凌晨时间戳
|
||||
LocalDateTime sevenDaysAgo = LocalDateTime.of(LocalDate.now().minusDays(7), LocalTime.MIDNIGHT);
|
||||
long startTime = sevenDaysAgo.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
||||
|
||||
// 计算今天的凌晨时间戳
|
||||
LocalDateTime today = LocalDateTime.of(LocalDate.now(), LocalTime.MAX);
|
||||
long endTime = today.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
||||
|
||||
String deviceImei = device.getDeviceImei();
|
||||
String a = GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DEVICE_LOCATION_KEY_PREFIX + ":history";
|
||||
Collection<String> list = RedisUtils.zRangeByScore(a, startTime, endTime);
|
||||
if (CollectionUtil.isEmpty(list)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
Map<String, List<JSONObject>> map = new LinkedHashMap<>();
|
||||
for (String obj : list){
|
||||
JSONObject jsonObject = JSONObject.parseObject(obj);
|
||||
Long timestamp = jsonObject.getLong("timestamp");
|
||||
LocalDate date = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).toLocalDate();
|
||||
if (map.containsKey(date.toString())) {
|
||||
map.get(date.toString()).add(jsonObject);
|
||||
} else {
|
||||
ArrayList<JSONObject> jsonList = new ArrayList<>();
|
||||
jsonList.add(jsonObject);
|
||||
map.put(date.toString(), jsonList);
|
||||
}
|
||||
}
|
||||
|
||||
List<LocationHistoryDetailVo> result = new ArrayList<>();
|
||||
for (Map.Entry<String, List<JSONObject>> entry : map.entrySet()) {
|
||||
LocationHistoryDetailVo detailVo = new LocationHistoryDetailVo();
|
||||
detailVo.setDate(entry.getKey());
|
||||
detailVo.setDeviceName(device.getDeviceName());
|
||||
detailVo.setStartLocation(entry.getValue().get(0).getString("address"));
|
||||
detailVo.setEndLocation(entry.getValue().get(entry.getValue().size()-1).getString("address"));
|
||||
detailVo.setDetailList(entry.getValue());
|
||||
result.add(detailVo);
|
||||
}
|
||||
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user