Compare commits
11 Commits
prod
...
e80830e76c
| Author | SHA1 | Date | |
|---|---|---|---|
| e80830e76c | |||
| 346792d5c1 | |||
| cb87871982 | |||
| 437ab110b7 | |||
| b280038502 | |||
| 3e119b1dd8 | |||
| cab0884d7f | |||
| 88650c3d9f | |||
| ca11ef0e35 | |||
| ef2dc6a6f6 | |||
| ec28ab1092 |
@ -26,6 +26,7 @@ import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* HBY670设备控制类
|
||||
@ -133,7 +134,7 @@ public class AppDeviceXinghanController extends BaseController {
|
||||
}
|
||||
|
||||
// @Log("新增设备")
|
||||
@Operation(summary = "新增设备")
|
||||
@Log(title = "新增设备")
|
||||
@PostMapping(value = "/add")
|
||||
public R<Void> addDevice(@RequestBody DeviceForm deviceForm) {
|
||||
try {
|
||||
@ -144,4 +145,26 @@ public class AppDeviceXinghanController extends BaseController {
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
@PostMapping(value = "/GetDeviceByName")
|
||||
@Operation(summary = "通过蓝牙名/设备名称查询设备")
|
||||
public R<Object> GetDeviceByName(@RequestBody DeviceForm deviceForm) {
|
||||
Object device = appDeviceService.GetDeviceByName(deviceForm);
|
||||
return R.ok(device);
|
||||
}
|
||||
|
||||
|
||||
@PostMapping(value = "/getEquipCountByType")
|
||||
@Operation(summary = "查询某个类型下的设备总数量")
|
||||
public R<Object> getEquipCountByType(@RequestBody DeviceForm deviceForm) {
|
||||
Object device = appDeviceService.getEquipCountByType(deviceForm);
|
||||
return R.ok(device);
|
||||
}
|
||||
|
||||
@PostMapping(value = "/getEquipAllByType")
|
||||
@Operation(summary = "查询某个类型下的设备")
|
||||
public R<List<Map<String,Object>>> getEquipAllByType(@RequestBody DeviceForm deviceForm){
|
||||
List<Map<String,Object>> list=appDeviceService.getEquipAllByType(deviceForm);
|
||||
return R.ok(list);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1,9 +1,5 @@
|
||||
package com.fuyuanshen.global.mqtt.base;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Comparator;
|
||||
@ -16,9 +12,9 @@ import java.util.List;
|
||||
@Component
|
||||
public class MqttRuleEngine {
|
||||
|
||||
@Autowired
|
||||
@Qualifier("threadPoolTaskExecutor")
|
||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||
// @Autowired
|
||||
// @Qualifier("threadPoolTaskExecutor")
|
||||
// private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||
|
||||
private final LinkedHashMap<String, MqttMessageRule> rulesMap = new LinkedHashMap<>();
|
||||
|
||||
@ -41,7 +37,8 @@ public class MqttRuleEngine {
|
||||
int commandType = context.getCommandType();
|
||||
MqttMessageRule mqttMessageRule = rulesMap.get("Light_" + commandType);
|
||||
if (mqttMessageRule != null) {
|
||||
threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context));
|
||||
// threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context));
|
||||
mqttMessageRule.execute(context);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
package com.fuyuanshen.global.mqtt.base;
|
||||
|
||||
/**
|
||||
* MQTT消息处理接口
|
||||
*/
|
||||
public interface NewMqttMessageRule {
|
||||
|
||||
/**
|
||||
* 获取命令类型
|
||||
* @return 命令类型
|
||||
*/
|
||||
String getCommandType();
|
||||
/**
|
||||
* 执行处理
|
||||
* @param context 处理上下文
|
||||
*/
|
||||
void execute(NewMqttRuleContext context);
|
||||
|
||||
/**
|
||||
* 获取优先级,数值越小优先级越高
|
||||
* @return 优先级
|
||||
*/
|
||||
default int getPriority() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
package com.fuyuanshen.global.mqtt.base;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* MQTT消息处理上下文
|
||||
*/
|
||||
@Data
|
||||
public class NewMqttRuleContext {
|
||||
/**
|
||||
* 命令类型
|
||||
*/
|
||||
private String commandType;
|
||||
/**
|
||||
* 设备IMEI
|
||||
*/
|
||||
private String deviceImei;
|
||||
|
||||
/**
|
||||
* MQTT消息负载字典
|
||||
*/
|
||||
private Map<String, Object> payloadDict;
|
||||
}
|
||||
@ -0,0 +1,44 @@
|
||||
package com.fuyuanshen.global.mqtt.base;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* MQTT消息引擎
|
||||
*/
|
||||
@Component
|
||||
public class NewMqttRuleEngine {
|
||||
|
||||
// @Autowired
|
||||
// @Qualifier("threadPoolTaskExecutor")
|
||||
// private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||
|
||||
private final LinkedHashMap<String, NewMqttMessageRule> rulesMap = new LinkedHashMap<>();
|
||||
public NewMqttRuleEngine(List<NewMqttMessageRule> rules) {
|
||||
// 按优先级排序
|
||||
rules.sort(Comparator.comparing(NewMqttMessageRule::getPriority));
|
||||
rules.forEach(rule -> rulesMap.put(rule.getCommandType(), rule)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行匹配
|
||||
* @param context 处理上下文
|
||||
* @return
|
||||
*/
|
||||
public boolean executeRule(NewMqttRuleContext context) {
|
||||
String commandType = context.getCommandType();
|
||||
// String funcType = context.getFuncType();
|
||||
NewMqttMessageRule mqttMessageRule = rulesMap.get(commandType);
|
||||
if (mqttMessageRule != null) {
|
||||
// threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context));
|
||||
mqttMessageRule.execute(context);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -44,12 +44,12 @@ public class MqttInboundConfiguration {
|
||||
if (url == null) {
|
||||
throw new IllegalStateException("MQTT服务器URL未配置");
|
||||
}
|
||||
|
||||
String subTopic = mqttPropertiesConfig.getSubTopic();
|
||||
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
|
||||
url,
|
||||
clientId,
|
||||
mqttPahoClientFactory,
|
||||
mqttPropertiesConfig.getSubTopic().split(",")
|
||||
subTopic.split(",")
|
||||
);
|
||||
mqttPahoMessageDrivenChannelAdapter.setQos(1);
|
||||
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
|
||||
|
||||
@ -47,7 +47,7 @@ public class MqttOutboundConfiguration {
|
||||
mqttPahoClientFactory
|
||||
);
|
||||
mqttPahoMessageHandler.setDefaultQos(1);
|
||||
mqttPahoMessageHandler.setDefaultTopic("B/#");
|
||||
mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic());
|
||||
mqttPahoMessageHandler.setAsync(true);
|
||||
return mqttPahoMessageHandler;
|
||||
}
|
||||
|
||||
@ -14,6 +14,8 @@ public class MqttPropertiesConfig {
|
||||
private String url;
|
||||
private String subClientId;
|
||||
private String subTopic;
|
||||
private String subTopic2;
|
||||
private String pubClientId;
|
||||
private String pubTopic;
|
||||
private String pubTopic2;
|
||||
}
|
||||
@ -0,0 +1,68 @@
|
||||
package com.fuyuanshen.global.mqtt.config;
|
||||
|
||||
|
||||
import cn.hutool.core.lang.UUID;
|
||||
import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.integration.annotation.ServiceActivator;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.core.MessageProducer;
|
||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
|
||||
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class NewMqttInboundConfiguration {
|
||||
@Autowired
|
||||
private MqttPropertiesConfig mqttPropertiesConfig;
|
||||
@Autowired
|
||||
private MqttPahoClientFactory mqttPahoClientFactory;
|
||||
@Autowired
|
||||
private NewReceiverMessageHandler receiverMessageHandler2;
|
||||
//消息通道
|
||||
@Bean
|
||||
public MessageChannel messageInboundChannel2(){
|
||||
return new DirectChannel();
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置入站适配器
|
||||
* 作用: 设置订阅主题,以及指定消息的通道 等相关属性
|
||||
* */
|
||||
@Bean
|
||||
public MessageProducer messageProducer2(){
|
||||
// 生成一个不重复的随机数
|
||||
String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID();
|
||||
String subTopic = mqttPropertiesConfig.getSubTopic2();
|
||||
log.info("订阅主题:{}", subTopic);
|
||||
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
|
||||
mqttPropertiesConfig.getUrl(),
|
||||
clientId,
|
||||
mqttPahoClientFactory,
|
||||
subTopic.split(",")
|
||||
);
|
||||
mqttPahoMessageDrivenChannelAdapter.setQos(1);
|
||||
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
|
||||
mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel2());
|
||||
return mqttPahoMessageDrivenChannelAdapter;
|
||||
}
|
||||
/** 指定处理消息来自哪个通道 */
|
||||
@Bean
|
||||
@ServiceActivator(inputChannel = "messageInboundChannel2")
|
||||
public MessageHandler messageHandler2(){
|
||||
return receiverMessageHandler2;
|
||||
}
|
||||
|
||||
// @Bean
|
||||
// @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确
|
||||
// public MessageHandler deviceAlarmMessageHandler() {
|
||||
// return new DeviceAlrmMessageHandler();
|
||||
// }
|
||||
}
|
||||
@ -0,0 +1,46 @@
|
||||
package com.fuyuanshen.global.mqtt.config;
|
||||
|
||||
import cn.hutool.core.lang.UUID;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.integration.annotation.ServiceActivator;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class NewMqttOutboundConfiguration {
|
||||
@Autowired
|
||||
private MqttPropertiesConfig mqttPropertiesConfig;
|
||||
@Autowired
|
||||
private MqttPahoClientFactory mqttPahoClientFactory;
|
||||
|
||||
// 消息通道
|
||||
@Bean
|
||||
public MessageChannel mqttOutboundChannel2(){
|
||||
return new DirectChannel();
|
||||
}
|
||||
|
||||
|
||||
/** 配置出站消息处理器 */
|
||||
@Bean
|
||||
@ServiceActivator(inputChannel = "mqttOutboundChannel2") // 指定处理器针对哪个通道的消息进行处理
|
||||
public MessageHandler mqttOutboundMessageHandler2(){
|
||||
String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID();
|
||||
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
|
||||
mqttPropertiesConfig.getUrl(),
|
||||
clientId,
|
||||
mqttPahoClientFactory
|
||||
);
|
||||
mqttPahoMessageHandler.setDefaultQos(1);
|
||||
mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2());
|
||||
mqttPahoMessageHandler.setAsync(true);
|
||||
return mqttPahoMessageHandler;
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,20 +1,22 @@
|
||||
package com.fuyuanshen.global.mqtt.publish;
|
||||
|
||||
import cn.dev33.satoken.annotation.SaIgnore;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api/")
|
||||
@RequestMapping("/api")
|
||||
@SaIgnore
|
||||
@Slf4j
|
||||
public class DeviceDataController {
|
||||
|
||||
@Autowired
|
||||
private MqttClientTest mqttClientTest;
|
||||
|
||||
// @PostMapping("/{deviceId}/command")
|
||||
@GetMapping("/command")
|
||||
public ResponseEntity<String> sendCommand() {
|
||||
|
||||
mqttClientTest.sendMsg();
|
||||
|
||||
@ -13,10 +13,10 @@ public class MqttClientTest {
|
||||
private MqttGateway mqttGateway;
|
||||
|
||||
public void sendMsg() {
|
||||
mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot");
|
||||
mqttGateway.sendMsgToMqtt("command/894078/HBY670/864865082081523", "hello mqtt spring boot");
|
||||
log.info("message is send");
|
||||
|
||||
mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2");
|
||||
mqttGateway.sendMsgToMqtt("report/894078/HBY670/864865082081523", "hello mqtt spring boot2");
|
||||
log.info("message is send2");
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,146 @@
|
||||
package com.fuyuanshen.global.mqtt.receiver;
|
||||
|
||||
import cn.hutool.core.lang.Dict;
|
||||
import com.baomidou.lock.LockTemplate;
|
||||
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
||||
import com.fuyuanshen.common.core.utils.StringUtils;
|
||||
import com.fuyuanshen.common.json.utils.JsonUtils;
|
||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
|
||||
import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine;
|
||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class NewReceiverMessageHandler implements MessageHandler {
|
||||
|
||||
@Autowired
|
||||
private NewMqttRuleEngine newRuleEngine;
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
|
||||
Object payload = message.getPayload();
|
||||
MessageHeaders headers = message.getHeaders();
|
||||
String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
|
||||
String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
|
||||
String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
|
||||
|
||||
log.info("MQTT2 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}",
|
||||
payload, receivedTopic, receivedQos, timestamp);
|
||||
|
||||
Dict payloadDict = JsonUtils.parseMap(payload.toString());
|
||||
if (receivedTopic == null || payloadDict == null) {
|
||||
return;
|
||||
}
|
||||
String imei = payloadDict.getStr("imei");
|
||||
String funcType = payloadDict.getStr("funcType");
|
||||
RedissonClient client = RedisUtils.getClient();
|
||||
String lockKey = "mqtt:consumer:lock:";
|
||||
String KEY = GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei;
|
||||
log.info("MQTT2获取锁开始{}", KEY);
|
||||
RLock lock = client.getLock(KEY);
|
||||
|
||||
try {
|
||||
// 尝试获取锁,
|
||||
boolean isLocked = lock.tryLock(60, 3000, TimeUnit.MILLISECONDS);
|
||||
if (isLocked) {
|
||||
// 执行业务逻辑
|
||||
if(StringUtils.isNotBlank(imei)){
|
||||
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||
RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
|
||||
//在线状态
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
|
||||
}
|
||||
|
||||
String[] topicArr = receivedTopic.split("/");
|
||||
|
||||
NewMqttRuleContext context = new NewMqttRuleContext();
|
||||
context.setCommandType(topicArr[2]+"_"+funcType);
|
||||
context.setDeviceImei(imei);
|
||||
context.setPayloadDict(payloadDict);
|
||||
|
||||
boolean ruleExecuted = newRuleEngine.executeRule(context);
|
||||
|
||||
if (!ruleExecuted) {
|
||||
log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
|
||||
}
|
||||
}else{
|
||||
log.warn("MQTT2获取锁失败,请稍后再试");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
// 释放锁
|
||||
if (lock.isHeldByCurrentThread()) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
// final LockInfo lockInfo = lockTemplate.lock(GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei, 100L, 3000L, RedissonLockExecutor.class);
|
||||
// if (null == lockInfo) {
|
||||
// log.info("MQTT3业务处理中,请稍后再试:funcType=>{},imei=>{}",funcType,imei);
|
||||
// return;
|
||||
// }
|
||||
//// 获取锁成功,处理业务
|
||||
// try {
|
||||
// if(StringUtils.isNotBlank(imei)){
|
||||
// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||
// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||
// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
|
||||
// //在线状态
|
||||
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
|
||||
// }
|
||||
//
|
||||
// String[] topicArr = receivedTopic.split("/");
|
||||
//
|
||||
// NewMqttRuleContext context = new NewMqttRuleContext();
|
||||
// context.setCommandType(topicArr[2]+"_"+funcType);
|
||||
// context.setDeviceImei(imei);
|
||||
// context.setPayloadDict(payloadDict);
|
||||
//
|
||||
// boolean ruleExecuted = newRuleEngine.executeRule(context);
|
||||
//
|
||||
// if (!ruleExecuted) {
|
||||
// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
|
||||
// }
|
||||
// } finally {
|
||||
// //释放锁
|
||||
// lockTemplate.releaseLock(lockInfo);
|
||||
// }
|
||||
|
||||
|
||||
/* ===== 追加:根据报文内容识别格式并统一解析 ===== */
|
||||
// int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict);
|
||||
// if (intType > 0) {
|
||||
// MqttRuleContext newCtx = new MqttRuleContext();
|
||||
// String commandType = "Light_"+intType;
|
||||
// newCtx.setCommandType(commandType);
|
||||
// newCtx.setDeviceImei(imei);
|
||||
// newCtx.setPayloadDict(payloadDict);
|
||||
//
|
||||
// boolean ok = ruleEngine.executeRule(newCtx);
|
||||
// if (!ok) {
|
||||
// log.warn("新规则引擎未命中, imei={}", imei);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
}
|
||||
@ -9,9 +9,12 @@ import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttRuleEngine;
|
||||
import com.fuyuanshen.global.mqtt.base.MqttXinghanCommandType;
|
||||
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
|
||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
@ -21,6 +24,7 @@ import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
||||
|
||||
@ -38,55 +42,77 @@ public class ReceiverMessageHandler implements MessageHandler {
|
||||
String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
|
||||
String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
|
||||
String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
|
||||
|
||||
|
||||
log.info("MQTT payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}",
|
||||
payload, receivedTopic, receivedQos, timestamp);
|
||||
|
||||
|
||||
Dict payloadDict = JsonUtils.parseMap(payload.toString());
|
||||
if (receivedTopic == null || payloadDict == null) {
|
||||
return;
|
||||
}
|
||||
String[] subStr = receivedTopic.split("/");
|
||||
String deviceImei = subStr[1];
|
||||
String state = payloadDict.getStr("state");
|
||||
Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state);
|
||||
if(StringUtils.isNotBlank(deviceImei)){
|
||||
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||
RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofSeconds(900));
|
||||
//在线状态
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
|
||||
}
|
||||
|
||||
|
||||
|
||||
if (convertArr.length > 0) {
|
||||
Byte val1 = (Byte) convertArr[0];
|
||||
MqttRuleContext context = new MqttRuleContext();
|
||||
context.setCommandType(val1);
|
||||
context.setConvertArr(convertArr);
|
||||
context.setDeviceImei(deviceImei);
|
||||
context.setPayloadDict(payloadDict);
|
||||
RedissonClient client = RedisUtils.getClient();
|
||||
String lockKey = "mqtt:consumer:lock:";
|
||||
String KEY = GlobalConstants.GLOBAL_REDIS_KEY + lockKey + deviceImei;
|
||||
RLock lock = client.getLock(KEY);
|
||||
|
||||
boolean ruleExecuted = ruleEngine.executeRule(context);
|
||||
|
||||
if (!ruleExecuted) {
|
||||
log.warn("未找到匹配的规则来处理命令类型: {}", val1);
|
||||
try {
|
||||
// 尝试获取锁,
|
||||
boolean isLocked = lock.tryLock(60, 3000, TimeUnit.MILLISECONDS);
|
||||
if (isLocked) {
|
||||
String state = payloadDict.getStr("state");
|
||||
Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state);
|
||||
if(StringUtils.isNotBlank(deviceImei)){
|
||||
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||
RedisUtils.offerDeduplicated(queueKey,dedupKey,deviceImei, Duration.ofSeconds(900));
|
||||
//在线状态
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ deviceImei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
|
||||
}
|
||||
|
||||
|
||||
|
||||
if (convertArr.length > 0) {
|
||||
Byte val1 = (Byte) convertArr[0];
|
||||
MqttRuleContext context = new MqttRuleContext();
|
||||
context.setCommandType(val1);
|
||||
context.setConvertArr(convertArr);
|
||||
context.setDeviceImei(deviceImei);
|
||||
context.setPayloadDict(payloadDict);
|
||||
|
||||
boolean ruleExecuted = ruleEngine.executeRule(context);
|
||||
|
||||
if (!ruleExecuted) {
|
||||
log.warn("未找到匹配的规则来处理命令类型: {}", val1);
|
||||
}
|
||||
}
|
||||
|
||||
/* ===== 追加:根据报文内容识别格式并统一解析 ===== */
|
||||
int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict);
|
||||
if (intType > 0) {
|
||||
MqttRuleContext newCtx = new MqttRuleContext();
|
||||
newCtx.setCommandType((byte) intType);
|
||||
newCtx.setDeviceImei(deviceImei);
|
||||
newCtx.setPayloadDict(payloadDict);
|
||||
|
||||
boolean ok = ruleEngine.executeRule(newCtx);
|
||||
if (!ok) {
|
||||
log.warn("新规则引擎未命中, imei={}", deviceImei);
|
||||
}
|
||||
}
|
||||
}else{
|
||||
log.warn("MQTT获取锁失败,请稍后再试");
|
||||
}
|
||||
}
|
||||
|
||||
/* ===== 追加:根据报文内容识别格式并统一解析 ===== */
|
||||
int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict);
|
||||
if (intType > 0) {
|
||||
MqttRuleContext newCtx = new MqttRuleContext();
|
||||
newCtx.setCommandType((byte) intType);
|
||||
newCtx.setDeviceImei(deviceImei);
|
||||
newCtx.setPayloadDict(payloadDict);
|
||||
|
||||
boolean ok = ruleEngine.executeRule(newCtx);
|
||||
if (!ok) {
|
||||
log.warn("新规则引擎未命中, imei={}", deviceImei);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
// 释放锁
|
||||
if (lock.isHeldByCurrentThread()) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,18 +38,18 @@ public class BjqActiveReportingDeviceDataRule implements MqttMessageRule {
|
||||
@Override
|
||||
public void execute(MqttRuleContext context) {
|
||||
try {
|
||||
Object[] convertArr = context.getConvertArr();
|
||||
// Latitude, longitude
|
||||
//主灯档位,激光灯档位,电量百分比,充电状态,电池剩余续航时间
|
||||
String mainLightMode = convertArr[1].toString();
|
||||
String laserLightMode = convertArr[2].toString();
|
||||
String batteryPercentage = convertArr[3].toString();
|
||||
String chargeState = convertArr[4].toString();
|
||||
String batteryRemainingTime = convertArr[5].toString();
|
||||
|
||||
// 发送设备状态和位置信息到Redis
|
||||
asyncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode, laserLightMode,
|
||||
batteryPercentage, chargeState, batteryRemainingTime);
|
||||
// Object[] convertArr = context.getConvertArr();
|
||||
// // Latitude, longitude
|
||||
// //主灯档位,激光灯档位,电量百分比,充电状态,电池剩余续航时间
|
||||
// String mainLightMode = convertArr[1].toString();
|
||||
// String laserLightMode = convertArr[2].toString();
|
||||
// String batteryPercentage = convertArr[3].toString();
|
||||
// String chargeState = convertArr[4].toString();
|
||||
// String batteryRemainingTime = convertArr[5].toString();
|
||||
//
|
||||
// // 发送设备状态和位置信息到Redis
|
||||
// asyncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode, laserLightMode,
|
||||
// batteryPercentage, chargeState, batteryRemainingTime);
|
||||
} catch (Exception e) {
|
||||
log.error("处理上报数据命令时出错", e);
|
||||
}
|
||||
@ -67,31 +67,29 @@ public class BjqActiveReportingDeviceDataRule implements MqttMessageRule {
|
||||
*/
|
||||
public void asyncSendDeviceDataToRedisWithFuture(String deviceImei, String mainLightMode, String laserLightMode,
|
||||
String batteryPercentage, String chargeState, String batteryRemainingTime) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
// 构造设备状态信息对象
|
||||
Map<String, Object> deviceInfo = new LinkedHashMap<>();
|
||||
deviceInfo.put("deviceImei", deviceImei);
|
||||
deviceInfo.put("mainLightMode", mainLightMode);
|
||||
deviceInfo.put("laserLightMode", laserLightMode);
|
||||
deviceInfo.put("batteryPercentage", batteryPercentage);
|
||||
deviceInfo.put("chargeState", chargeState);
|
||||
deviceInfo.put("batteryRemainingTime", batteryRemainingTime);
|
||||
deviceInfo.put("timestamp", System.currentTimeMillis());
|
||||
try {
|
||||
// 构造设备状态信息对象
|
||||
Map<String, Object> deviceInfo = new LinkedHashMap<>();
|
||||
deviceInfo.put("deviceImei", deviceImei);
|
||||
deviceInfo.put("mainLightMode", mainLightMode);
|
||||
deviceInfo.put("laserLightMode", laserLightMode);
|
||||
deviceInfo.put("batteryPercentage", batteryPercentage);
|
||||
deviceInfo.put("chargeState", chargeState);
|
||||
deviceInfo.put("batteryRemainingTime", batteryRemainingTime);
|
||||
deviceInfo.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
// 将设备状态信息存储到Redis中
|
||||
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + deviceImei + DEVICE_STATUS_KEY_PREFIX;
|
||||
String deviceInfoJson = JsonUtils.toJsonString(deviceInfo);
|
||||
// 将设备状态信息存储到Redis中
|
||||
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + deviceImei + DEVICE_STATUS_KEY_PREFIX;
|
||||
String deviceInfoJson = JsonUtils.toJsonString(deviceInfo);
|
||||
|
||||
// 存储到Redis
|
||||
RedisUtils.setCacheObject(deviceRedisKey, deviceInfoJson);
|
||||
// 存储到Redis
|
||||
RedisUtils.setCacheObject(deviceRedisKey, deviceInfoJson);
|
||||
|
||||
log.info("设备状态信息已异步发送到Redis: device={}, mainLightMode={}, laserLightMode={}, batteryPercentage={}",
|
||||
deviceImei, mainLightMode, laserLightMode, batteryPercentage);
|
||||
} catch (Exception e) {
|
||||
log.error("异步发送设备信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
log.info("设备状态信息已异步发送到Redis: device={}, mainLightMode={}, laserLightMode={}, batteryPercentage={}",
|
||||
deviceImei, mainLightMode, laserLightMode, batteryPercentage);
|
||||
} catch (Exception e) {
|
||||
log.error("异步发送设备信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -44,37 +44,37 @@ public class BjqBootLogoRule implements MqttMessageRule {
|
||||
public void execute(MqttRuleContext context) {
|
||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||
try {
|
||||
Byte val2 = (Byte) context.getConvertArr()[1];
|
||||
if (val2 == 100) {
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
return;
|
||||
}
|
||||
|
||||
String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() +DEVICE_BOOT_LOGO_KEY_PREFIX);
|
||||
if (StringUtils.isEmpty(data)) {
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
||||
byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
||||
log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
||||
|
||||
ArrayList<Integer> intData = new ArrayList<>();
|
||||
intData.add(3);
|
||||
intData.add((int) val2);
|
||||
ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
||||
intData.add(0);
|
||||
intData.add(0);
|
||||
intData.add(0);
|
||||
intData.add(0);
|
||||
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("instruct", intData);
|
||||
|
||||
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||
log.info("发送开机LOGO点阵数据到设备消息=>topic:{},payload:{}",
|
||||
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||
JsonUtils.toJsonString(map));
|
||||
// Byte val2 = (Byte) context.getConvertArr()[1];
|
||||
// if (val2 == 100) {
|
||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() +DEVICE_BOOT_LOGO_KEY_PREFIX);
|
||||
// if (StringUtils.isEmpty(data)) {
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
||||
// byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
||||
// log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
||||
//
|
||||
// ArrayList<Integer> intData = new ArrayList<>();
|
||||
// intData.add(3);
|
||||
// intData.add((int) val2);
|
||||
// ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
||||
// intData.add(0);
|
||||
// intData.add(0);
|
||||
// intData.add(0);
|
||||
// intData.add(0);
|
||||
//
|
||||
// Map<String, Object> map = new HashMap<>();
|
||||
// map.put("instruct", intData);
|
||||
//
|
||||
// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||
// log.info("发送开机LOGO点阵数据到设备消息=>topic:{},payload:{}",
|
||||
// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||
// JsonUtils.toJsonString(map));
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("处理开机LOGO时出错", e);
|
||||
|
||||
@ -34,15 +34,15 @@ public class BjqLaserModeSettingsRule implements MqttMessageRule {
|
||||
public void execute(MqttRuleContext context) {
|
||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||
try {
|
||||
Object[] convertArr = context.getConvertArr();
|
||||
|
||||
String mode = convertArr[1].toString();
|
||||
if(StringUtils.isNotBlank(mode)){
|
||||
// 发送设备状态和位置信息到Redis
|
||||
syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(),mode);
|
||||
}
|
||||
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
||||
// Object[] convertArr = context.getConvertArr();
|
||||
//
|
||||
// String mode = convertArr[1].toString();
|
||||
// if(StringUtils.isNotBlank(mode)){
|
||||
// // 发送设备状态和位置信息到Redis
|
||||
// syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(),mode);
|
||||
// }
|
||||
//
|
||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
||||
} catch (Exception e) {
|
||||
log.error("处理激光模式命令时出错", e);
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
||||
|
||||
@ -34,15 +34,15 @@ public class BjqLightBrightnessRule implements MqttMessageRule {
|
||||
public void execute(MqttRuleContext context) {
|
||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||
try {
|
||||
Object[] convertArr = context.getConvertArr();
|
||||
|
||||
String convertValue = convertArr[1].toString();
|
||||
// 将设备状态信息存储到Redis中
|
||||
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
||||
|
||||
// 存储到Redis
|
||||
RedisUtils.setCacheObject(deviceRedisKey, convertValue);
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
// Object[] convertArr = context.getConvertArr();
|
||||
//
|
||||
// String convertValue = convertArr[1].toString();
|
||||
// // 将设备状态信息存储到Redis中
|
||||
// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
||||
//
|
||||
// // 存储到Redis
|
||||
// RedisUtils.setCacheObject(deviceRedisKey, convertValue);
|
||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
} catch (Exception e) {
|
||||
log.error("处理灯光亮度命令时出错", e);
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
||||
|
||||
@ -52,26 +52,26 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
||||
public void execute(MqttRuleContext context) {
|
||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||
try {
|
||||
Object[] convertArr = context.getConvertArr();
|
||||
// Latitude, longitude
|
||||
String latitude = convertArr[1].toString();
|
||||
String longitude = convertArr[2].toString();
|
||||
// 判断 latitude 和 longitude 是否都为 0
|
||||
if ("0".equals(latitude) && "0".equals(longitude)) {
|
||||
log.info("位置信息为0,不存储到Redis: device={}, lat={}, lon={}", context.getDeviceImei(), latitude, longitude);
|
||||
return;
|
||||
}
|
||||
// 异步发送经纬度到Redis
|
||||
asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude);
|
||||
// 异步保存数据
|
||||
asyncSaveLocationToMySQLWithFuture(context.getDeviceImei(), latitude, longitude);
|
||||
|
||||
Map<String, Object> map = buildLocationDataMap(latitude, longitude);
|
||||
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||
log.info("发送定位数据到设备=>topic:{},payload:{}",
|
||||
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||
JsonUtils.toJsonString(map));
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
// Object[] convertArr = context.getConvertArr();
|
||||
// // Latitude, longitude
|
||||
// String latitude = convertArr[1].toString();
|
||||
// String longitude = convertArr[2].toString();
|
||||
// // 判断 latitude 和 longitude 是否都为 0
|
||||
// if ("0".equals(latitude) && "0".equals(longitude)) {
|
||||
// log.info("位置信息为0,不存储到Redis: device={}, lat={}, lon={}", context.getDeviceImei(), latitude, longitude);
|
||||
// return;
|
||||
// }
|
||||
// // 异步发送经纬度到Redis
|
||||
// asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude);
|
||||
// // 异步保存数据
|
||||
// asyncSaveLocationToMySQLWithFuture(context.getDeviceImei(), latitude, longitude);
|
||||
//
|
||||
// Map<String, Object> map = buildLocationDataMap(latitude, longitude);
|
||||
// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||
// log.info("发送定位数据到设备=>topic:{},payload:{}",
|
||||
// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||
// JsonUtils.toJsonString(map));
|
||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
} catch (Exception e) {
|
||||
log.error("处理定位数据命令时出错", e);
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
||||
@ -121,16 +121,15 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
||||
* @param longitude 经度
|
||||
*/
|
||||
public void asyncSendLocationToRedisWithFuture(String deviceImei, String latitude, String longitude) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// String[] latArr = latitude.split("\\.");
|
||||
// String[] lonArr = longitude.split("\\.");
|
||||
// // 将位置信息存储到Redis中
|
||||
String redisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + deviceImei + DEVICE_LOCATION_KEY_PREFIX;
|
||||
String redisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + deviceImei + DEVICE_LOCATION_KEY_PREFIX;
|
||||
// String redisObj = RedisUtils.getCacheObject(redisKey);
|
||||
// JSONObject jsonOBj = JSONObject.parseObject(redisObj);
|
||||
// if(jsonOBj != null){
|
||||
@ -150,34 +149,33 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
||||
// }
|
||||
// }
|
||||
|
||||
// 构造位置信息对象
|
||||
Map<String, Object> locationInfo = new LinkedHashMap<>();
|
||||
double[] doubles = LngLonUtil.gps84_To_Gcj02(Double.parseDouble(latitude), Double.parseDouble(longitude));
|
||||
locationInfo.put("deviceImei", deviceImei);
|
||||
locationInfo.put("latitude", doubles[0]);
|
||||
locationInfo.put("longitude", doubles[1]);
|
||||
locationInfo.put("wgs84_latitude", latitude);
|
||||
locationInfo.put("wgs84_longitude", longitude);
|
||||
String address = GetAddressFromLatUtil.getAdd(String.valueOf(doubles[1]), String.valueOf(doubles[0]));
|
||||
locationInfo.put("address", address);
|
||||
locationInfo.put("timestamp", System.currentTimeMillis());
|
||||
// 构造位置信息对象
|
||||
Map<String, Object> locationInfo = new LinkedHashMap<>();
|
||||
double[] doubles = LngLonUtil.gps84_To_Gcj02(Double.parseDouble(latitude), Double.parseDouble(longitude));
|
||||
locationInfo.put("deviceImei", deviceImei);
|
||||
locationInfo.put("latitude", doubles[0]);
|
||||
locationInfo.put("longitude", doubles[1]);
|
||||
locationInfo.put("wgs84_latitude", latitude);
|
||||
locationInfo.put("wgs84_longitude", longitude);
|
||||
String address = GetAddressFromLatUtil.getAdd(String.valueOf(doubles[1]), String.valueOf(doubles[0]));
|
||||
locationInfo.put("address", address);
|
||||
locationInfo.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
|
||||
String locationJson = JsonUtils.toJsonString(locationInfo);
|
||||
String locationJson = JsonUtils.toJsonString(locationInfo);
|
||||
|
||||
// 存储到Redis
|
||||
RedisUtils.setCacheObject(redisKey, locationJson);
|
||||
// 存储到Redis
|
||||
RedisUtils.setCacheObject(redisKey, locationJson);
|
||||
|
||||
// 存储到一个列表中,保留历史位置信息
|
||||
// 存储到一个列表中,保留历史位置信息
|
||||
// String locationHistoryKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_LOCATION_HISTORY_KEY_PREFIX + deviceImei;
|
||||
// RedisUtils.addCacheList(locationHistoryKey, locationJson);
|
||||
// RedisUtils.expire(locationHistoryKey, Duration.ofDays(90));
|
||||
storeDeviceTrajectoryWithSortedSet(deviceImei, locationJson);
|
||||
log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||
} catch (Exception e) {
|
||||
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
storeDeviceTrajectoryWithSortedSet(deviceImei, locationJson);
|
||||
log.info("位置信息已异步发送到Redis: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||
} catch (Exception e) {
|
||||
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -189,20 +187,18 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
||||
* @param longitude 经度
|
||||
*/
|
||||
public void asyncSaveLocationToMySQLWithFuture(String deviceImei, String latitude, String longitude) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 调用服务层方法更新设备位置信息
|
||||
deviceService.updateDeviceLocationByImei(deviceImei, longitude, latitude);
|
||||
|
||||
log.info("位置信息已异步保存到MySQL: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||
} catch (Exception e) {
|
||||
log.error("异步保存位置信息到MySQL时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
try {
|
||||
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
||||
return;
|
||||
}
|
||||
});
|
||||
|
||||
// 调用服务层方法更新设备位置信息
|
||||
deviceService.updateDeviceLocationByImei(deviceImei, longitude, latitude);
|
||||
|
||||
log.info("位置信息已异步保存到MySQL: device={}, lat={}, lon={}", deviceImei, latitude, longitude);
|
||||
} catch (Exception e) {
|
||||
log.error("异步保存位置信息到MySQL时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -50,45 +50,47 @@ public class BjqModeRule implements MqttMessageRule {
|
||||
public void execute(MqttRuleContext context) {
|
||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||
try {
|
||||
Object[] convertArr = context.getConvertArr();
|
||||
|
||||
String mainLightMode = convertArr[1].toString();
|
||||
String batteryRemainingTime = convertArr[2].toString();
|
||||
if (StringUtils.isNotBlank(mainLightMode)) {
|
||||
log.info("设备离线mainLightMode:{}", mainLightMode);
|
||||
if ("0".equals(mainLightMode)) {
|
||||
|
||||
// 设备离线
|
||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX;
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "0");
|
||||
|
||||
String sendMessageIng = GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
||||
String messageSendingValue = RedisUtils.getCacheObject(sendMessageIng);
|
||||
if ("1".equals(messageSendingValue)) {
|
||||
// 设置为故障状态
|
||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "2");
|
||||
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||
updateWrapper.eq("device_imei", context.getDeviceImei());
|
||||
updateWrapper.set("online_status", 2);
|
||||
deviceService.update(updateWrapper);
|
||||
RedisUtils.deleteObject(sendMessageIng);
|
||||
|
||||
// 解除告警
|
||||
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_ALARM_KEY_PREFIX;
|
||||
if (RedisUtils.getCacheObject(deviceRedisKey) != null) {
|
||||
RedisUtils.deleteObject(deviceRedisKey);
|
||||
}
|
||||
cancelAlarm(context.getDeviceImei());
|
||||
}
|
||||
}
|
||||
// 发送设备状态和位置信息到Redis
|
||||
syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode);
|
||||
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
||||
|
||||
// 存储到Redis
|
||||
RedisUtils.setCacheObject(deviceRedisKey, batteryRemainingTime);
|
||||
}
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
log.info("处理灯光模式命令");
|
||||
log.info("MQTT消息负载字典:{}", context.getPayloadDict());
|
||||
// Object[] convertArr = context.getConvertArr();
|
||||
//
|
||||
// String mainLightMode = convertArr[1].toString();
|
||||
// String batteryRemainingTime = convertArr[2].toString();
|
||||
// if (StringUtils.isNotBlank(mainLightMode)) {
|
||||
// log.info("设备离线mainLightMode:{}", mainLightMode);
|
||||
// if ("0".equals(mainLightMode)) {
|
||||
//
|
||||
// // 设备离线
|
||||
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX;
|
||||
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "0");
|
||||
//
|
||||
// String sendMessageIng = GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
||||
// String messageSendingValue = RedisUtils.getCacheObject(sendMessageIng);
|
||||
// if ("1".equals(messageSendingValue)) {
|
||||
// // 设置为故障状态
|
||||
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "2");
|
||||
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||
// updateWrapper.eq("device_imei", context.getDeviceImei());
|
||||
// updateWrapper.set("online_status", 2);
|
||||
// deviceService.update(updateWrapper);
|
||||
// RedisUtils.deleteObject(sendMessageIng);
|
||||
//
|
||||
// // 解除告警
|
||||
// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_ALARM_KEY_PREFIX;
|
||||
// if (RedisUtils.getCacheObject(deviceRedisKey) != null) {
|
||||
// RedisUtils.deleteObject(deviceRedisKey);
|
||||
// }
|
||||
// cancelAlarm(context.getDeviceImei());
|
||||
// }
|
||||
// }
|
||||
// // 发送设备状态和位置信息到Redis
|
||||
// syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode);
|
||||
// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
||||
//
|
||||
// // 存储到Redis
|
||||
// RedisUtils.setCacheObject(deviceRedisKey, batteryRemainingTime);
|
||||
// }
|
||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
} catch (Exception e) {
|
||||
log.error("处理灯光模式命令时出错", e);
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
||||
|
||||
@ -34,8 +34,8 @@ public class BjqPersonnelInfoDataRule implements MqttMessageRule {
|
||||
public void execute(MqttRuleContext context) {
|
||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||
try {
|
||||
Object[] convertArr = context.getConvertArr();
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
||||
// Object[] convertArr = context.getConvertArr();
|
||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
||||
} catch (Exception e) {
|
||||
log.error("处理定位数据命令时出错", e);
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
||||
|
||||
@ -45,47 +45,47 @@ public class BjqSendMessageRule implements MqttMessageRule {
|
||||
try {
|
||||
|
||||
// Byte val2 = (Byte) context.getConvertArr()[1];
|
||||
String val2Str = context.getConvertArr()[1].toString();
|
||||
int val2 = Integer.parseInt(val2Str);
|
||||
System.out.println("收到设备信息命令:"+val2);
|
||||
if (val2 == 100) {
|
||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
return;
|
||||
}
|
||||
|
||||
if(val2==200){
|
||||
String sendMessageIng = GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
||||
RedisUtils.deleteObject(sendMessageIng);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":app_send_message_data");
|
||||
if (StringUtils.isEmpty(data)) {
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
||||
byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
||||
log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
||||
// System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk));
|
||||
|
||||
ArrayList<Integer> intData = new ArrayList<>();
|
||||
intData.add(6);
|
||||
intData.add(val2);
|
||||
ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
||||
intData.add(0);
|
||||
intData.add(0);
|
||||
intData.add(0);
|
||||
intData.add(0);
|
||||
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("instruct", intData);
|
||||
|
||||
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||
log.info("发送设备信息数据到设备消息=>topic:{},payload:{}",
|
||||
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||
JsonUtils.toJsonString(map));
|
||||
// String val2Str = context.getConvertArr()[1].toString();
|
||||
// int val2 = Integer.parseInt(val2Str);
|
||||
// System.out.println("收到设备信息命令:"+val2);
|
||||
// if (val2 == 100) {
|
||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// if(val2==200){
|
||||
// String sendMessageIng = GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
||||
// RedisUtils.deleteObject(sendMessageIng);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
//
|
||||
// String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":app_send_message_data");
|
||||
// if (StringUtils.isEmpty(data)) {
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
||||
// byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
||||
// log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
||||
//// System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk));
|
||||
//
|
||||
// ArrayList<Integer> intData = new ArrayList<>();
|
||||
// intData.add(6);
|
||||
// intData.add(val2);
|
||||
// ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
||||
// intData.add(0);
|
||||
// intData.add(0);
|
||||
// intData.add(0);
|
||||
// intData.add(0);
|
||||
//
|
||||
// Map<String, Object> map = new HashMap<>();
|
||||
// map.put("instruct", intData);
|
||||
//
|
||||
// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||
// log.info("发送设备信息数据到设备消息=>topic:{},payload:{}",
|
||||
// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||
// JsonUtils.toJsonString(map));
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("处理发送设备信息时出错", e);
|
||||
|
||||
@ -7,14 +7,18 @@ import com.fuyuanshen.common.core.utils.StringUtils;
|
||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.equipment.domain.Device;
|
||||
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
||||
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
|
||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -95,40 +99,18 @@ public class MqttMessageConsumer {
|
||||
}
|
||||
|
||||
// 处理具体业务逻辑的方法
|
||||
private void processMessage(String message) {
|
||||
private void processMessage(String imei) {
|
||||
String threadName = Thread.currentThread().getName();
|
||||
try {
|
||||
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
|
||||
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||
// String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey);
|
||||
// if(StringUtils.isBlank(deviceOnlineStatusRedis)){
|
||||
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||
// updateWrapper.eq("device_imei", message)
|
||||
// .set("online_status", 1);
|
||||
// deviceMapper.update(updateWrapper);
|
||||
// }
|
||||
// QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
|
||||
// queryWrapper.eq("device_imei", message);
|
||||
// queryWrapper.eq("online_status", 1);
|
||||
// Long count = deviceMapper.selectCount(queryWrapper);
|
||||
// if(count == 0){
|
||||
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||
// updateWrapper.eq("device_imei", message)
|
||||
// .eq("online_status", 0)
|
||||
// .set("online_status", 1);
|
||||
// deviceMapper.update(updateWrapper);
|
||||
// }
|
||||
// 执行业务逻辑
|
||||
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||
updateWrapper.eq("device_imei", message)
|
||||
updateWrapper.eq("device_imei", imei)
|
||||
.in("online_status", 0,2)
|
||||
.set("online_status", 1);
|
||||
int update = deviceMapper.update(updateWrapper);
|
||||
// 模拟业务处理耗时
|
||||
// Thread.sleep(200);
|
||||
|
||||
log.info("业务处理线程 {} 完成消息处理: {}", threadName, message);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, message, e);
|
||||
log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, imei, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -57,9 +57,11 @@ import com.fuyuanshen.web.domain.Dto.DeviceDebugLogoUploadDto;
|
||||
import com.fuyuanshen.web.domain.Dto.DeviceXinghanInstructDto;
|
||||
import com.fuyuanshen.web.domain.vo.DeviceXinghanDetailVo;
|
||||
import com.fuyuanshen.web.enums.AlarmTypeEnum;
|
||||
import com.fuyuanshen.web.util.AliyunVoiceUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
@ -93,6 +95,7 @@ public class DeviceXinghanBizService {
|
||||
private final DeviceAssignmentsService deviceAssignmentsService;
|
||||
@Autowired
|
||||
private ObjectMapper objectMapper;
|
||||
private final AliyunVoiceUtil voiceUtil;
|
||||
|
||||
/**
|
||||
* 所有档位的描述表
|
||||
@ -135,7 +138,6 @@ public class DeviceXinghanBizService {
|
||||
public void upSOSGradeSettings(DeviceXinghanInstructDto dto) {
|
||||
if(dto.getIsBluetooth()){
|
||||
long deviceId = dto.getDeviceId();
|
||||
|
||||
// 1. 使用Optional简化空值检查,使代码更简洁
|
||||
Device device = Optional.ofNullable(deviceMapper.selectById(deviceId))
|
||||
.orElseThrow(() -> new ServiceException("设备不存在"));
|
||||
@ -147,6 +149,24 @@ public class DeviceXinghanBizService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 触发异步报警
|
||||
* Spring 会自动调用 AsyncConfig.getAsyncExecutor() 来执行此方法
|
||||
*/
|
||||
@Async
|
||||
public void executeSosCall(String phone) {
|
||||
log.info("[SOS业务] 准备发起语音拨号 -> 目标: {}", phone);
|
||||
Map<String, String> params = Map.of("device", "670");
|
||||
String callId = voiceUtil.sendTtsSync(phone, "TTS_328730104", params);
|
||||
|
||||
if (callId != null) {
|
||||
log.info("[SOS业务] 拨号指令下发成功, callId: {}", callId);
|
||||
// 这里可以记录拨打日志到数据库
|
||||
} else {
|
||||
log.error("[SOS业务] 拨号指令下发失败,请检查配置或余额");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置强制报警
|
||||
*/
|
||||
@ -739,6 +759,7 @@ public class DeviceXinghanBizService {
|
||||
device.setCreateByName(loginUser.getNickname());
|
||||
device.setTypeName(deviceTypes.getTypeName());
|
||||
device.setDeviceType(deviceTypes.getId());
|
||||
device.setDevicePic(deviceTypes.getDevicePic());
|
||||
if (device.getDeviceImei() != null) {
|
||||
device.setPubTopic("A/" + device.getDeviceImei());
|
||||
device.setSubTopic("B/" + device.getDeviceImei());
|
||||
@ -767,4 +788,23 @@ public class DeviceXinghanBizService {
|
||||
return uuidStr.replaceAll("-", "");
|
||||
}
|
||||
|
||||
public Map<String, Object> GetDeviceByName(DeviceForm deviceForm){
|
||||
List<Map<String, Object>> list= deviceMapper.GetDeviceByName(deviceForm);
|
||||
Map<String, Object> device=null;
|
||||
if(list!=null && list.size()>0){
|
||||
device=list.get(0);
|
||||
}
|
||||
return device;
|
||||
}
|
||||
|
||||
public int getEquipCountByType(DeviceForm form){
|
||||
var res=deviceMapper.getEquipCountByType(form);
|
||||
return res;
|
||||
}
|
||||
|
||||
public List<Map<String,Object>> getEquipAllByType(DeviceForm deviceForm){
|
||||
List<Map<String, Object>> list= deviceMapper.getEquipAllByType(deviceForm);
|
||||
return list;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,87 @@
|
||||
package com.fuyuanshen.web.util;
|
||||
|
||||
import com.aliyun.dyvmsapi20170525.Client;
|
||||
import com.aliyun.dyvmsapi20170525.models.SingleCallByTtsRequest;
|
||||
import com.aliyun.dyvmsapi20170525.models.SingleCallByTtsResponse;
|
||||
import com.aliyun.teaopenapi.models.Config;
|
||||
import com.aliyun.teautil.models.RuntimeOptions;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class AliyunVoiceUtil {
|
||||
|
||||
private static final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@Value("${alibaba.tts.akId}")
|
||||
private String akId;
|
||||
@Value("${alibaba.tts.akSecret}")
|
||||
private String akSecret;
|
||||
// @Value("${alibaba.tts.calledShowNumber:}")
|
||||
private String calledShowNumber;
|
||||
|
||||
// ========== 核心:单例客户端(类似 OkHttpClient) ==========
|
||||
private volatile Client client;
|
||||
|
||||
/**
|
||||
* 获取客户端(双重检查锁实现单例)
|
||||
* 只有在第一次调用时才会根据配置实例化,后续直接返回复用
|
||||
*/
|
||||
private Client getClient() throws Exception {
|
||||
if (client == null) {
|
||||
synchronized (this) {
|
||||
if (client == null) {
|
||||
log.info("[AliyunVoice] 正在初始化阿里云语音客户端...");
|
||||
Config config = new Config()
|
||||
.setAccessKeyId(akId)
|
||||
.setAccessKeySecret(akSecret)
|
||||
.setEndpoint("dyvmsapi.aliyuncs.com");
|
||||
this.client = new Client(config);
|
||||
}
|
||||
}
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步发送方法:由异步架构调用
|
||||
*/
|
||||
public String sendTtsSync(String phone, String templateCode, Map<String, String> params) {
|
||||
|
||||
try {
|
||||
// 1. 获取(或初始化)单例客户端
|
||||
Client voiceClient = getClient();
|
||||
|
||||
SingleCallByTtsRequest request = new SingleCallByTtsRequest()
|
||||
.setCalledNumber(phone)
|
||||
.setTtsCode(templateCode)
|
||||
.setTtsParam(objectMapper.writeValueAsString(params));
|
||||
|
||||
if (StringUtils.hasText(calledShowNumber)) {
|
||||
request.setCalledShowNumber(calledShowNumber);
|
||||
}
|
||||
|
||||
// 生产级超时配置
|
||||
RuntimeOptions runtime = new RuntimeOptions();
|
||||
runtime.setConnectTimeout(5000);
|
||||
runtime.setReadTimeout(10000);
|
||||
|
||||
SingleCallByTtsResponse response = voiceClient.singleCallByTtsWithOptions(request, runtime);
|
||||
|
||||
if ("OK".equalsIgnoreCase(response.getBody().getCode())) {
|
||||
return response.getBody().getCallId();
|
||||
} else {
|
||||
log.error("[AliyunVoice] 拨号失败: {}", response.getBody().getMessage());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[AliyunVoice] 接口异常", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -8,8 +8,8 @@ spring.boot.admin.client:
|
||||
metadata:
|
||||
username: ${spring.boot.admin.client.username}
|
||||
userpassword: ${spring.boot.admin.client.password}
|
||||
username: ${monitor.username}
|
||||
password: ${monitor.password}
|
||||
username: @monitor.username@
|
||||
password: @monitor.password@
|
||||
|
||||
--- # snail-job 配置
|
||||
snail-job:
|
||||
|
||||
@ -11,8 +11,8 @@ spring.boot.admin.client:
|
||||
metadata:
|
||||
username: ${spring.boot.admin.client.username}
|
||||
userpassword: ${spring.boot.admin.client.password}
|
||||
username: ${monitor.username}
|
||||
password: ${monitor.password}
|
||||
username: @monitor.username@
|
||||
password: @monitor.password@
|
||||
|
||||
--- # snail-job 配置
|
||||
snail-job:
|
||||
|
||||
@ -140,6 +140,18 @@
|
||||
<version>3.3.1</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 电话语音通知 -->
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>dyvmsapi20170525</artifactId>
|
||||
<version>4.2.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.aliyun</groupId>
|
||||
<artifactId>tea-openapi</artifactId>
|
||||
<version>0.3.2</version>
|
||||
</dependency>
|
||||
|
||||
<!-- fastjson2 -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.fastjson2</groupId>
|
||||
|
||||
@ -15,6 +15,7 @@ import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -50,7 +51,7 @@ public class DeviceTypeController {
|
||||
// @Log("新增设备类型")
|
||||
@Operation(summary = "新增设备类型")
|
||||
@PostMapping(value = "/add")
|
||||
public R<Void> createDeviceType(@Validated @RequestBody DeviceType resources) {
|
||||
public R<Void> createDeviceType(@Validated @ModelAttribute DeviceTypeForm resources) throws IOException {
|
||||
deviceTypeService.create(resources);
|
||||
return R.ok();
|
||||
}
|
||||
@ -59,7 +60,7 @@ public class DeviceTypeController {
|
||||
// @Log("修改设备类型")
|
||||
@Operation(summary = "修改设备类型")
|
||||
@PutMapping(value = "/update")
|
||||
public R<Void> updateDeviceType(@Validated @RequestBody DeviceTypeForm resources) {
|
||||
public R<Void> updateDeviceType(@Validated @ModelAttribute DeviceTypeForm resources) throws IOException {
|
||||
deviceTypeService.update(resources);
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
@ -85,5 +85,8 @@ public class DeviceType extends TenantEntity {
|
||||
@Schema(title = "型号字典用于PC页面跳转")
|
||||
private String pcModelDictionary;
|
||||
|
||||
@Schema(title = "设备图片")
|
||||
private String devicePic;
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -54,6 +54,9 @@ public class DeviceForm {
|
||||
@Schema(title = "备注")
|
||||
private String remark;
|
||||
|
||||
@Schema(title = "商户号")
|
||||
private Long tenant_id;
|
||||
|
||||
|
||||
// 设备类型相关字段
|
||||
@Schema(title = "设备类型名称")
|
||||
|
||||
@ -2,6 +2,7 @@ package com.fuyuanshen.equipment.domain.form;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
/**
|
||||
* @Description: 设备类型
|
||||
@ -48,5 +49,10 @@ public class DeviceTypeForm {
|
||||
*/
|
||||
@Schema(title = "型号字典用于PC页面跳转")
|
||||
private String pcModelDictionary;
|
||||
@Schema(title = "设备图片")
|
||||
private String devicePic;
|
||||
|
||||
@Schema(title = "设备图片")
|
||||
private MultipartFile file;
|
||||
|
||||
}
|
||||
|
||||
@ -12,6 +12,7 @@ import com.fuyuanshen.equipment.domain.DeviceType;
|
||||
import com.fuyuanshen.equipment.domain.dto.DeviceExcelImportDTO;
|
||||
import com.fuyuanshen.equipment.domain.dto.ImportResult;
|
||||
import com.fuyuanshen.equipment.domain.form.DeviceForm;
|
||||
import com.fuyuanshen.equipment.domain.form.DeviceTypeForm;
|
||||
import com.fuyuanshen.equipment.handler.ImageWriteHandler;
|
||||
import com.fuyuanshen.system.domain.vo.SysOssVo;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -437,8 +438,10 @@ public class UploadDeviceDataListener implements ReadListener<DeviceExcelImportD
|
||||
newDeviceType.setAppModelDictionary(originalDto.getAppModelDictionary());
|
||||
newDeviceType.setPcModelDictionary(originalDto.getPcModelDictionary());
|
||||
|
||||
DeviceTypeForm deviceTypeForm = new DeviceTypeForm();
|
||||
BeanUtil.copyProperties(newDeviceType, deviceTypeForm, true);
|
||||
// 创建新的设备类型
|
||||
params.getDeviceTypeService().create(newDeviceType);
|
||||
params.getDeviceTypeService().create(deviceTypeForm);
|
||||
|
||||
// 重新查询确保获取到正确的ID
|
||||
deviceType = params.getDeviceTypeService().queryByName(device.getTypeName());
|
||||
|
||||
@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.fuyuanshen.equipment.domain.Device;
|
||||
import com.fuyuanshen.equipment.domain.dto.InstructionRecordDto;
|
||||
import com.fuyuanshen.equipment.domain.form.DeviceForm;
|
||||
import com.fuyuanshen.equipment.domain.query.DeviceQueryCriteria;
|
||||
import com.fuyuanshen.equipment.domain.vo.*;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
@ -148,4 +149,10 @@ public interface DeviceMapper extends BaseMapper<Device> {
|
||||
*/
|
||||
int countByDeviceTypeId(@Param("deviceTypeId") Long deviceTypeId);
|
||||
|
||||
List<Map<String, Object>> GetDeviceByName(DeviceForm deviceForm);
|
||||
|
||||
int getEquipCountByType(DeviceForm deviceForm);
|
||||
|
||||
List<Map<String,Object>> getEquipAllByType(DeviceForm deviceForm);
|
||||
|
||||
}
|
||||
@ -8,6 +8,7 @@ import com.fuyuanshen.equipment.domain.DeviceType;
|
||||
import com.fuyuanshen.equipment.domain.form.DeviceTypeForm;
|
||||
import com.fuyuanshen.equipment.domain.query.DeviceTypeQueryCriteria;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -63,14 +64,14 @@ public interface DeviceTypeService extends IService<DeviceType> {
|
||||
*
|
||||
* @param resources /
|
||||
*/
|
||||
void create(DeviceType resources);
|
||||
void create(DeviceTypeForm resources) throws IOException;
|
||||
|
||||
/**
|
||||
* 修改设备类型
|
||||
*
|
||||
* @param resources /
|
||||
*/
|
||||
void update(DeviceTypeForm resources);
|
||||
void update(DeviceTypeForm resources) throws IOException;
|
||||
|
||||
/**
|
||||
* 多选删除
|
||||
|
||||
@ -325,6 +325,7 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceMapper, Device> impleme
|
||||
device.setCreateByName(loginUser.getNickname());
|
||||
device.setTypeName(deviceType.getTypeName());
|
||||
device.setDeviceType(deviceType.getId());
|
||||
device.setDevicePic(deviceType.getDevicePic());
|
||||
if (device.getDeviceImei() != null) {
|
||||
device.setPubTopic("A/" + device.getDeviceImei());
|
||||
device.setSubTopic("B/" + device.getDeviceImei());
|
||||
|
||||
@ -19,12 +19,16 @@ import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
||||
import com.fuyuanshen.equipment.mapper.DeviceTypeGrantsMapper;
|
||||
import com.fuyuanshen.equipment.mapper.DeviceTypeMapper;
|
||||
import com.fuyuanshen.equipment.service.DeviceTypeService;
|
||||
import com.fuyuanshen.equipment.utils.FileHashUtil;
|
||||
import com.fuyuanshen.system.domain.vo.SysOssVo;
|
||||
import com.fuyuanshen.system.domain.vo.SysRoleVo;
|
||||
import com.fuyuanshen.system.service.ISysOssService;
|
||||
import com.fuyuanshen.system.service.ISysRoleService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
@ -46,6 +50,8 @@ public class DeviceTypeServiceImpl extends ServiceImpl<DeviceTypeMapper, DeviceT
|
||||
private final DeviceAssignmentsMapper deviceAssignmentsMapper;
|
||||
|
||||
private final ISysRoleService roleService;
|
||||
private final ISysOssService ossService;
|
||||
private final FileHashUtil fileHashUtil;
|
||||
|
||||
|
||||
/**
|
||||
@ -181,24 +187,38 @@ public class DeviceTypeServiceImpl extends ServiceImpl<DeviceTypeMapper, DeviceT
|
||||
*/
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void create(DeviceType resources) {
|
||||
public void create(DeviceTypeForm resources) throws IOException {
|
||||
|
||||
// 校验设备类型名称
|
||||
List<DeviceType> typeName = deviceTypeMapper.selectList(new QueryWrapper<DeviceType>().eq("type_name", resources.getTypeName()));
|
||||
if (CollectionUtil.isNotEmpty(typeName)) {
|
||||
throw new RuntimeException("设备类型名称已存在,无法新增!!!");
|
||||
}
|
||||
// 保存图片并获取URL
|
||||
if (resources.getFile() != null) {
|
||||
String fileHash = fileHashUtil.hash(resources.getFile());
|
||||
SysOssVo upload = ossService.updateHash(resources.getFile(), fileHash);
|
||||
// 强制将HTTP替换为HTTPS
|
||||
if (upload.getUrl() != null && upload.getUrl().startsWith("http://")) {
|
||||
upload.setUrl(upload.getUrl().replaceFirst("^http://", "https://"));
|
||||
}
|
||||
// 设置图片路径
|
||||
resources.setDevicePic(upload.getUrl());
|
||||
}
|
||||
|
||||
DeviceType deviceType = new DeviceType();
|
||||
BeanUtil.copyProperties(resources, deviceType, true);
|
||||
|
||||
LoginUser loginUser = LoginHelper.getLoginUser();
|
||||
resources.setCustomerId(loginUser.getUserId());
|
||||
resources.setOwnerCustomerId(loginUser.getUserId());
|
||||
resources.setOriginalOwnerId(loginUser.getUserId());
|
||||
resources.setCreateByName(loginUser.getNickname());
|
||||
deviceTypeMapper.insert(resources);
|
||||
deviceType.setCustomerId(loginUser.getUserId());
|
||||
deviceType.setOwnerCustomerId(loginUser.getUserId());
|
||||
deviceType.setOriginalOwnerId(loginUser.getUserId());
|
||||
deviceType.setCreateByName(loginUser.getNickname());
|
||||
deviceTypeMapper.insert(deviceType);
|
||||
|
||||
// 自动授权给自己
|
||||
DeviceTypeGrants deviceTypeGrants = new DeviceTypeGrants();
|
||||
deviceTypeGrants.setDeviceTypeId(resources.getId());
|
||||
deviceTypeGrants.setDeviceTypeId(deviceType.getId());
|
||||
deviceTypeGrants.setCustomerId(loginUser.getUserId());
|
||||
deviceTypeGrants.setGrantorCustomerId(loginUser.getUserId());
|
||||
deviceTypeGrants.setGrantedAt(new Date());
|
||||
@ -213,7 +233,7 @@ public class DeviceTypeServiceImpl extends ServiceImpl<DeviceTypeMapper, DeviceT
|
||||
*/
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void update(DeviceTypeForm resources) {
|
||||
public void update(DeviceTypeForm resources) throws IOException {
|
||||
DeviceTypeGrants deviceTypeGrants = deviceTypeGrantsMapper.selectById(resources.getId());
|
||||
if (deviceTypeGrants == null) {
|
||||
throw new RuntimeException("设备类型不存在");
|
||||
@ -244,6 +264,17 @@ public class DeviceTypeServiceImpl extends ServiceImpl<DeviceTypeMapper, DeviceT
|
||||
throw new RuntimeException("无权修改该设备类型");
|
||||
}
|
||||
}
|
||||
// 保存图片并获取URL
|
||||
if (resources.getFile() != null) {
|
||||
String fileHash = fileHashUtil.hash(resources.getFile());
|
||||
SysOssVo upload = ossService.updateHash(resources.getFile(), fileHash);
|
||||
// 强制将HTTP替换为HTTPS
|
||||
if (upload.getUrl() != null && upload.getUrl().startsWith("http://")) {
|
||||
upload.setUrl(upload.getUrl().replaceFirst("^http://", "https://"));
|
||||
}
|
||||
// 设置图片路径
|
||||
resources.setDevicePic(upload.getUrl());
|
||||
}
|
||||
|
||||
BeanUtil.copyProperties(resources, deviceType);
|
||||
deviceTypeMapper.updateById(deviceType);
|
||||
|
||||
@ -264,6 +264,44 @@
|
||||
</if>
|
||||
</select>
|
||||
|
||||
<select id="GetDeviceByName" resultType="map" >
|
||||
select a.id,
|
||||
a.device_type,
|
||||
a.device_name,
|
||||
a.device_mac,
|
||||
a.type_name,
|
||||
a.bluetooth_name,
|
||||
a.device_imei
|
||||
from device a
|
||||
<where>
|
||||
a.tenant_id = #{tenant_id} and
|
||||
a.device_type = #{deviceType}
|
||||
AND (
|
||||
a.device_name=#{deviceName} or
|
||||
a.bluetooth_name=#{deviceName}
|
||||
)
|
||||
</where>
|
||||
</select>
|
||||
|
||||
<select id="getEquipCountByType" resultType="int">
|
||||
select count(1) cnt
|
||||
from device a
|
||||
<where>
|
||||
a.tenant_id = #{tenant_id} and
|
||||
a.device_type = #{deviceType}
|
||||
</where>
|
||||
</select>
|
||||
|
||||
<select id="getEquipAllByType" resultType="map">
|
||||
select device_mac,bluetooth_name,device_name
|
||||
from device a
|
||||
<where>
|
||||
a.tenant_id = #{tenant_id} and
|
||||
a.device_type = #{deviceType}
|
||||
</where>
|
||||
|
||||
</select>
|
||||
|
||||
<!-- 获取分配设备的客户 -->
|
||||
<select id="getAssignCustomer" resultType="com.fuyuanshen.equipment.domain.Device">
|
||||
SELECT *
|
||||
|
||||
16
pom.xml
16
pom.xml
@ -85,10 +85,10 @@
|
||||
<monitor.username>fys</monitor.username>
|
||||
<monitor.password>123456</monitor.password>
|
||||
</properties>
|
||||
<!-- <activation> -->
|
||||
<!-- <!– 默认环境 –> -->
|
||||
<!-- <activeByDefault>true</activeByDefault> -->
|
||||
<!-- </activation> -->
|
||||
<activation>
|
||||
<!-- 默认环境 -->
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>prod</id>
|
||||
@ -98,10 +98,10 @@
|
||||
<monitor.username>fys</monitor.username>
|
||||
<monitor.password>123456</monitor.password>
|
||||
</properties>
|
||||
<activation>
|
||||
<!-- 默认环境 -->
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
<!-- <activation> -->
|
||||
<!-- <!– 默认环境 –> -->
|
||||
<!-- <activeByDefault>true</activeByDefault> -->
|
||||
<!-- </activation> -->
|
||||
</profile>
|
||||
<profile>
|
||||
<id>jingquan</id>
|
||||
|
||||
Reference in New Issue
Block a user