Compare commits
23 Commits
437ab110b7
...
prod
| Author | SHA1 | Date | |
|---|---|---|---|
| cde34cab6c | |||
| 7d256df790 | |||
| 3d1c2f4e56 | |||
| ccadcb8d4e | |||
| b8cb663bbf | |||
| 56e86b070d | |||
| 0898855108 | |||
| 92b65ce6df | |||
| 82399cffed | |||
| 5538ac96e5 | |||
| b703f80355 | |||
| b3b249ea07 | |||
| aff424e73b | |||
| dc513a858a | |||
| df5ce7ddd9 | |||
| 2174dfdb4d | |||
| 2800b89e06 | |||
| 637e46c510 | |||
| 31c2158c8e | |||
| c7c21dc358 | |||
| a7e0803b00 | |||
| 55cacbd322 | |||
| 99ec6eaff0 |
@ -1,5 +1,9 @@
|
|||||||
package com.fuyuanshen.global.mqtt.base;
|
package com.fuyuanshen.global.mqtt.base;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.core.task.TaskExecutor;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@ -12,9 +16,9 @@ import java.util.List;
|
|||||||
@Component
|
@Component
|
||||||
public class MqttRuleEngine {
|
public class MqttRuleEngine {
|
||||||
|
|
||||||
// @Autowired
|
@Autowired
|
||||||
// @Qualifier("threadPoolTaskExecutor")
|
@Qualifier("threadPoolTaskExecutor")
|
||||||
// private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||||
|
|
||||||
private final LinkedHashMap<String, MqttMessageRule> rulesMap = new LinkedHashMap<>();
|
private final LinkedHashMap<String, MqttMessageRule> rulesMap = new LinkedHashMap<>();
|
||||||
|
|
||||||
@ -37,8 +41,7 @@ public class MqttRuleEngine {
|
|||||||
int commandType = context.getCommandType();
|
int commandType = context.getCommandType();
|
||||||
MqttMessageRule mqttMessageRule = rulesMap.get("Light_" + commandType);
|
MqttMessageRule mqttMessageRule = rulesMap.get("Light_" + commandType);
|
||||||
if (mqttMessageRule != null) {
|
if (mqttMessageRule != null) {
|
||||||
// threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context));
|
threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context));
|
||||||
mqttMessageRule.execute(context);
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,26 +0,0 @@
|
|||||||
package com.fuyuanshen.global.mqtt.base;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT消息处理接口
|
|
||||||
*/
|
|
||||||
public interface NewMqttMessageRule {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取命令类型
|
|
||||||
* @return 命令类型
|
|
||||||
*/
|
|
||||||
String getCommandType();
|
|
||||||
/**
|
|
||||||
* 执行处理
|
|
||||||
* @param context 处理上下文
|
|
||||||
*/
|
|
||||||
void execute(NewMqttRuleContext context);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取优先级,数值越小优先级越高
|
|
||||||
* @return 优先级
|
|
||||||
*/
|
|
||||||
default int getPriority() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,25 +0,0 @@
|
|||||||
package com.fuyuanshen.global.mqtt.base;
|
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT消息处理上下文
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
public class NewMqttRuleContext {
|
|
||||||
/**
|
|
||||||
* 命令类型
|
|
||||||
*/
|
|
||||||
private String commandType;
|
|
||||||
/**
|
|
||||||
* 设备IMEI
|
|
||||||
*/
|
|
||||||
private String deviceImei;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT消息负载字典
|
|
||||||
*/
|
|
||||||
private Map<String, Object> payloadDict;
|
|
||||||
}
|
|
||||||
@ -1,44 +0,0 @@
|
|||||||
package com.fuyuanshen.global.mqtt.base;
|
|
||||||
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT消息引擎
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
public class NewMqttRuleEngine {
|
|
||||||
|
|
||||||
// @Autowired
|
|
||||||
// @Qualifier("threadPoolTaskExecutor")
|
|
||||||
// private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
|
||||||
|
|
||||||
private final LinkedHashMap<String, NewMqttMessageRule> rulesMap = new LinkedHashMap<>();
|
|
||||||
public NewMqttRuleEngine(List<NewMqttMessageRule> rules) {
|
|
||||||
// 按优先级排序
|
|
||||||
rules.sort(Comparator.comparing(NewMqttMessageRule::getPriority));
|
|
||||||
rules.forEach(rule -> rulesMap.put(rule.getCommandType(), rule)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 执行匹配
|
|
||||||
* @param context 处理上下文
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
public boolean executeRule(NewMqttRuleContext context) {
|
|
||||||
String commandType = context.getCommandType();
|
|
||||||
// String funcType = context.getFuncType();
|
|
||||||
NewMqttMessageRule mqttMessageRule = rulesMap.get(commandType);
|
|
||||||
if (mqttMessageRule != null) {
|
|
||||||
// threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context));
|
|
||||||
mqttMessageRule.execute(context);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -44,12 +44,12 @@ public class MqttInboundConfiguration {
|
|||||||
if (url == null) {
|
if (url == null) {
|
||||||
throw new IllegalStateException("MQTT服务器URL未配置");
|
throw new IllegalStateException("MQTT服务器URL未配置");
|
||||||
}
|
}
|
||||||
String subTopic = mqttPropertiesConfig.getSubTopic();
|
|
||||||
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
|
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
|
||||||
url,
|
url,
|
||||||
clientId,
|
clientId,
|
||||||
mqttPahoClientFactory,
|
mqttPahoClientFactory,
|
||||||
subTopic.split(",")
|
mqttPropertiesConfig.getSubTopic().split(",")
|
||||||
);
|
);
|
||||||
mqttPahoMessageDrivenChannelAdapter.setQos(1);
|
mqttPahoMessageDrivenChannelAdapter.setQos(1);
|
||||||
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
|
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
|
||||||
|
|||||||
@ -47,7 +47,7 @@ public class MqttOutboundConfiguration {
|
|||||||
mqttPahoClientFactory
|
mqttPahoClientFactory
|
||||||
);
|
);
|
||||||
mqttPahoMessageHandler.setDefaultQos(1);
|
mqttPahoMessageHandler.setDefaultQos(1);
|
||||||
mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic());
|
mqttPahoMessageHandler.setDefaultTopic("B/#");
|
||||||
mqttPahoMessageHandler.setAsync(true);
|
mqttPahoMessageHandler.setAsync(true);
|
||||||
return mqttPahoMessageHandler;
|
return mqttPahoMessageHandler;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,8 +14,6 @@ public class MqttPropertiesConfig {
|
|||||||
private String url;
|
private String url;
|
||||||
private String subClientId;
|
private String subClientId;
|
||||||
private String subTopic;
|
private String subTopic;
|
||||||
private String subTopic2;
|
|
||||||
private String pubClientId;
|
private String pubClientId;
|
||||||
private String pubTopic;
|
private String pubTopic;
|
||||||
private String pubTopic2;
|
|
||||||
}
|
}
|
||||||
@ -1,68 +0,0 @@
|
|||||||
package com.fuyuanshen.global.mqtt.config;
|
|
||||||
|
|
||||||
|
|
||||||
import cn.hutool.core.lang.UUID;
|
|
||||||
import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.integration.annotation.ServiceActivator;
|
|
||||||
import org.springframework.integration.channel.DirectChannel;
|
|
||||||
import org.springframework.integration.core.MessageProducer;
|
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
|
||||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
|
||||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
|
||||||
import org.springframework.messaging.MessageChannel;
|
|
||||||
import org.springframework.messaging.MessageHandler;
|
|
||||||
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
@Slf4j
|
|
||||||
public class NewMqttInboundConfiguration {
|
|
||||||
@Autowired
|
|
||||||
private MqttPropertiesConfig mqttPropertiesConfig;
|
|
||||||
@Autowired
|
|
||||||
private MqttPahoClientFactory mqttPahoClientFactory;
|
|
||||||
@Autowired
|
|
||||||
private NewReceiverMessageHandler receiverMessageHandler2;
|
|
||||||
//消息通道
|
|
||||||
@Bean
|
|
||||||
public MessageChannel messageInboundChannel2(){
|
|
||||||
return new DirectChannel();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 配置入站适配器
|
|
||||||
* 作用: 设置订阅主题,以及指定消息的通道 等相关属性
|
|
||||||
* */
|
|
||||||
@Bean
|
|
||||||
public MessageProducer messageProducer2(){
|
|
||||||
// 生成一个不重复的随机数
|
|
||||||
String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID();
|
|
||||||
String subTopic = mqttPropertiesConfig.getSubTopic2();
|
|
||||||
log.info("订阅主题:{}", subTopic);
|
|
||||||
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
|
|
||||||
mqttPropertiesConfig.getUrl(),
|
|
||||||
clientId,
|
|
||||||
mqttPahoClientFactory,
|
|
||||||
subTopic.split(",")
|
|
||||||
);
|
|
||||||
mqttPahoMessageDrivenChannelAdapter.setQos(1);
|
|
||||||
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
|
|
||||||
mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel2());
|
|
||||||
return mqttPahoMessageDrivenChannelAdapter;
|
|
||||||
}
|
|
||||||
/** 指定处理消息来自哪个通道 */
|
|
||||||
@Bean
|
|
||||||
@ServiceActivator(inputChannel = "messageInboundChannel2")
|
|
||||||
public MessageHandler messageHandler2(){
|
|
||||||
return receiverMessageHandler2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Bean
|
|
||||||
// @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确
|
|
||||||
// public MessageHandler deviceAlarmMessageHandler() {
|
|
||||||
// return new DeviceAlrmMessageHandler();
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
@ -1,46 +0,0 @@
|
|||||||
package com.fuyuanshen.global.mqtt.config;
|
|
||||||
|
|
||||||
import cn.hutool.core.lang.UUID;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.integration.annotation.ServiceActivator;
|
|
||||||
import org.springframework.integration.channel.DirectChannel;
|
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
|
||||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
||||||
import org.springframework.messaging.MessageChannel;
|
|
||||||
import org.springframework.messaging.MessageHandler;
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
@Slf4j
|
|
||||||
public class NewMqttOutboundConfiguration {
|
|
||||||
@Autowired
|
|
||||||
private MqttPropertiesConfig mqttPropertiesConfig;
|
|
||||||
@Autowired
|
|
||||||
private MqttPahoClientFactory mqttPahoClientFactory;
|
|
||||||
|
|
||||||
// 消息通道
|
|
||||||
@Bean
|
|
||||||
public MessageChannel mqttOutboundChannel2(){
|
|
||||||
return new DirectChannel();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/** 配置出站消息处理器 */
|
|
||||||
@Bean
|
|
||||||
@ServiceActivator(inputChannel = "mqttOutboundChannel2") // 指定处理器针对哪个通道的消息进行处理
|
|
||||||
public MessageHandler mqttOutboundMessageHandler2(){
|
|
||||||
String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID();
|
|
||||||
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
|
|
||||||
mqttPropertiesConfig.getUrl(),
|
|
||||||
clientId,
|
|
||||||
mqttPahoClientFactory
|
|
||||||
);
|
|
||||||
mqttPahoMessageHandler.setDefaultQos(1);
|
|
||||||
mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2());
|
|
||||||
mqttPahoMessageHandler.setAsync(true);
|
|
||||||
return mqttPahoMessageHandler;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -1,22 +1,20 @@
|
|||||||
package com.fuyuanshen.global.mqtt.publish;
|
package com.fuyuanshen.global.mqtt.publish;
|
||||||
|
|
||||||
import cn.dev33.satoken.annotation.SaIgnore;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/api")
|
@RequestMapping("/api/")
|
||||||
@SaIgnore
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class DeviceDataController {
|
public class DeviceDataController {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private MqttClientTest mqttClientTest;
|
private MqttClientTest mqttClientTest;
|
||||||
@GetMapping("/command")
|
|
||||||
|
// @PostMapping("/{deviceId}/command")
|
||||||
public ResponseEntity<String> sendCommand() {
|
public ResponseEntity<String> sendCommand() {
|
||||||
|
|
||||||
mqttClientTest.sendMsg();
|
mqttClientTest.sendMsg();
|
||||||
|
|||||||
@ -13,10 +13,10 @@ public class MqttClientTest {
|
|||||||
private MqttGateway mqttGateway;
|
private MqttGateway mqttGateway;
|
||||||
|
|
||||||
public void sendMsg() {
|
public void sendMsg() {
|
||||||
mqttGateway.sendMsgToMqtt("command/894078/HBY670/864865082081523", "hello mqtt spring boot");
|
mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot");
|
||||||
log.info("message is send");
|
log.info("message is send");
|
||||||
|
|
||||||
mqttGateway.sendMsgToMqtt("report/894078/HBY670/864865082081523", "hello mqtt spring boot2");
|
mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2");
|
||||||
log.info("message is send2");
|
log.info("message is send2");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1,146 +0,0 @@
|
|||||||
package com.fuyuanshen.global.mqtt.receiver;
|
|
||||||
|
|
||||||
import cn.hutool.core.lang.Dict;
|
|
||||||
import com.baomidou.lock.LockTemplate;
|
|
||||||
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
|
||||||
import com.fuyuanshen.common.core.utils.StringUtils;
|
|
||||||
import com.fuyuanshen.common.json.utils.JsonUtils;
|
|
||||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
|
||||||
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
|
|
||||||
import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine;
|
|
||||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
|
||||||
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.redisson.api.RLock;
|
|
||||||
import org.redisson.api.RedissonClient;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.messaging.Message;
|
|
||||||
import org.springframework.messaging.MessageHandler;
|
|
||||||
import org.springframework.messaging.MessageHeaders;
|
|
||||||
import org.springframework.messaging.MessagingException;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
@Slf4j
|
|
||||||
public class NewReceiverMessageHandler implements MessageHandler {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private NewMqttRuleEngine newRuleEngine;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleMessage(Message<?> message) throws MessagingException {
|
|
||||||
|
|
||||||
Object payload = message.getPayload();
|
|
||||||
MessageHeaders headers = message.getHeaders();
|
|
||||||
String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
|
|
||||||
String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
|
|
||||||
String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
|
|
||||||
|
|
||||||
log.info("MQTT2 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}",
|
|
||||||
payload, receivedTopic, receivedQos, timestamp);
|
|
||||||
|
|
||||||
Dict payloadDict = JsonUtils.parseMap(payload.toString());
|
|
||||||
if (receivedTopic == null || payloadDict == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
String imei = payloadDict.getStr("imei");
|
|
||||||
String funcType = payloadDict.getStr("funcType");
|
|
||||||
RedissonClient client = RedisUtils.getClient();
|
|
||||||
String lockKey = "mqtt:consumer:lock:";
|
|
||||||
String KEY = GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei;
|
|
||||||
log.info("MQTT2获取锁开始{}", KEY);
|
|
||||||
RLock lock = client.getLock(KEY);
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 尝试获取锁,
|
|
||||||
boolean isLocked = lock.tryLock(60, 3000, TimeUnit.MILLISECONDS);
|
|
||||||
if (isLocked) {
|
|
||||||
// 执行业务逻辑
|
|
||||||
if(StringUtils.isNotBlank(imei)){
|
|
||||||
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
|
||||||
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
|
||||||
RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
|
|
||||||
//在线状态
|
|
||||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
|
||||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
|
|
||||||
}
|
|
||||||
|
|
||||||
String[] topicArr = receivedTopic.split("/");
|
|
||||||
|
|
||||||
NewMqttRuleContext context = new NewMqttRuleContext();
|
|
||||||
context.setCommandType(topicArr[2]+"_"+funcType);
|
|
||||||
context.setDeviceImei(imei);
|
|
||||||
context.setPayloadDict(payloadDict);
|
|
||||||
|
|
||||||
boolean ruleExecuted = newRuleEngine.executeRule(context);
|
|
||||||
|
|
||||||
if (!ruleExecuted) {
|
|
||||||
log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
|
|
||||||
}
|
|
||||||
}else{
|
|
||||||
log.warn("MQTT2获取锁失败,请稍后再试");
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
} finally {
|
|
||||||
// 释放锁
|
|
||||||
if (lock.isHeldByCurrentThread()) {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// final LockInfo lockInfo = lockTemplate.lock(GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei, 100L, 3000L, RedissonLockExecutor.class);
|
|
||||||
// if (null == lockInfo) {
|
|
||||||
// log.info("MQTT3业务处理中,请稍后再试:funcType=>{},imei=>{}",funcType,imei);
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
//// 获取锁成功,处理业务
|
|
||||||
// try {
|
|
||||||
// if(StringUtils.isNotBlank(imei)){
|
|
||||||
// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
|
||||||
// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
|
||||||
// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
|
|
||||||
// //在线状态
|
|
||||||
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
|
||||||
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// String[] topicArr = receivedTopic.split("/");
|
|
||||||
//
|
|
||||||
// NewMqttRuleContext context = new NewMqttRuleContext();
|
|
||||||
// context.setCommandType(topicArr[2]+"_"+funcType);
|
|
||||||
// context.setDeviceImei(imei);
|
|
||||||
// context.setPayloadDict(payloadDict);
|
|
||||||
//
|
|
||||||
// boolean ruleExecuted = newRuleEngine.executeRule(context);
|
|
||||||
//
|
|
||||||
// if (!ruleExecuted) {
|
|
||||||
// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
|
|
||||||
// }
|
|
||||||
// } finally {
|
|
||||||
// //释放锁
|
|
||||||
// lockTemplate.releaseLock(lockInfo);
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
/* ===== 追加:根据报文内容识别格式并统一解析 ===== */
|
|
||||||
// int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict);
|
|
||||||
// if (intType > 0) {
|
|
||||||
// MqttRuleContext newCtx = new MqttRuleContext();
|
|
||||||
// String commandType = "Light_"+intType;
|
|
||||||
// newCtx.setCommandType(commandType);
|
|
||||||
// newCtx.setDeviceImei(imei);
|
|
||||||
// newCtx.setPayloadDict(payloadDict);
|
|
||||||
//
|
|
||||||
// boolean ok = ruleEngine.executeRule(newCtx);
|
|
||||||
// if (!ok) {
|
|
||||||
// log.warn("新规则引擎未命中, imei={}", imei);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -9,12 +9,9 @@ import com.fuyuanshen.common.redis.utils.RedisUtils;
|
|||||||
import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
||||||
import com.fuyuanshen.global.mqtt.base.MqttRuleEngine;
|
import com.fuyuanshen.global.mqtt.base.MqttRuleEngine;
|
||||||
import com.fuyuanshen.global.mqtt.base.MqttXinghanCommandType;
|
import com.fuyuanshen.global.mqtt.base.MqttXinghanCommandType;
|
||||||
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
|
|
||||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||||
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.redisson.api.RLock;
|
|
||||||
import org.redisson.api.RedissonClient;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.MessageHandler;
|
import org.springframework.messaging.MessageHandler;
|
||||||
@ -24,7 +21,6 @@ import org.springframework.stereotype.Service;
|
|||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
||||||
|
|
||||||
@ -52,17 +48,6 @@ public class ReceiverMessageHandler implements MessageHandler {
|
|||||||
}
|
}
|
||||||
String[] subStr = receivedTopic.split("/");
|
String[] subStr = receivedTopic.split("/");
|
||||||
String deviceImei = subStr[1];
|
String deviceImei = subStr[1];
|
||||||
|
|
||||||
|
|
||||||
RedissonClient client = RedisUtils.getClient();
|
|
||||||
String lockKey = "mqtt:consumer:lock:";
|
|
||||||
String KEY = GlobalConstants.GLOBAL_REDIS_KEY + lockKey + deviceImei;
|
|
||||||
RLock lock = client.getLock(KEY);
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 尝试获取锁,
|
|
||||||
boolean isLocked = lock.tryLock(60, 3000, TimeUnit.MILLISECONDS);
|
|
||||||
if (isLocked) {
|
|
||||||
String state = payloadDict.getStr("state");
|
String state = payloadDict.getStr("state");
|
||||||
Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state);
|
Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state);
|
||||||
if(StringUtils.isNotBlank(deviceImei)){
|
if(StringUtils.isNotBlank(deviceImei)){
|
||||||
@ -104,17 +89,6 @@ public class ReceiverMessageHandler implements MessageHandler {
|
|||||||
log.warn("新规则引擎未命中, imei={}", deviceImei);
|
log.warn("新规则引擎未命中, imei={}", deviceImei);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}else{
|
|
||||||
log.warn("MQTT获取锁失败,请稍后再试");
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
} finally {
|
|
||||||
// 释放锁
|
|
||||||
if (lock.isHeldByCurrentThread()) {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -38,18 +38,18 @@ public class BjqActiveReportingDeviceDataRule implements MqttMessageRule {
|
|||||||
@Override
|
@Override
|
||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
try {
|
try {
|
||||||
// Object[] convertArr = context.getConvertArr();
|
Object[] convertArr = context.getConvertArr();
|
||||||
// // Latitude, longitude
|
// Latitude, longitude
|
||||||
// //主灯档位,激光灯档位,电量百分比,充电状态,电池剩余续航时间
|
//主灯档位,激光灯档位,电量百分比,充电状态,电池剩余续航时间
|
||||||
// String mainLightMode = convertArr[1].toString();
|
String mainLightMode = convertArr[1].toString();
|
||||||
// String laserLightMode = convertArr[2].toString();
|
String laserLightMode = convertArr[2].toString();
|
||||||
// String batteryPercentage = convertArr[3].toString();
|
String batteryPercentage = convertArr[3].toString();
|
||||||
// String chargeState = convertArr[4].toString();
|
String chargeState = convertArr[4].toString();
|
||||||
// String batteryRemainingTime = convertArr[5].toString();
|
String batteryRemainingTime = convertArr[5].toString();
|
||||||
//
|
|
||||||
// // 发送设备状态和位置信息到Redis
|
// 发送设备状态和位置信息到Redis
|
||||||
// asyncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode, laserLightMode,
|
asyncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode, laserLightMode,
|
||||||
// batteryPercentage, chargeState, batteryRemainingTime);
|
batteryPercentage, chargeState, batteryRemainingTime);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理上报数据命令时出错", e);
|
log.error("处理上报数据命令时出错", e);
|
||||||
}
|
}
|
||||||
@ -67,6 +67,7 @@ public class BjqActiveReportingDeviceDataRule implements MqttMessageRule {
|
|||||||
*/
|
*/
|
||||||
public void asyncSendDeviceDataToRedisWithFuture(String deviceImei, String mainLightMode, String laserLightMode,
|
public void asyncSendDeviceDataToRedisWithFuture(String deviceImei, String mainLightMode, String laserLightMode,
|
||||||
String batteryPercentage, String chargeState, String batteryRemainingTime) {
|
String batteryPercentage, String chargeState, String batteryRemainingTime) {
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
try {
|
try {
|
||||||
// 构造设备状态信息对象
|
// 构造设备状态信息对象
|
||||||
Map<String, Object> deviceInfo = new LinkedHashMap<>();
|
Map<String, Object> deviceInfo = new LinkedHashMap<>();
|
||||||
@ -90,6 +91,7 @@ public class BjqActiveReportingDeviceDataRule implements MqttMessageRule {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("异步发送设备信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
log.error("异步发送设备信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -44,37 +44,37 @@ public class BjqBootLogoRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
// Byte val2 = (Byte) context.getConvertArr()[1];
|
Byte val2 = (Byte) context.getConvertArr()[1];
|
||||||
// if (val2 == 100) {
|
if (val2 == 100) {
|
||||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() +DEVICE_BOOT_LOGO_KEY_PREFIX);
|
String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() +DEVICE_BOOT_LOGO_KEY_PREFIX);
|
||||||
// if (StringUtils.isEmpty(data)) {
|
if (StringUtils.isEmpty(data)) {
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
||||||
// byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
||||||
// log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
||||||
//
|
|
||||||
// ArrayList<Integer> intData = new ArrayList<>();
|
ArrayList<Integer> intData = new ArrayList<>();
|
||||||
// intData.add(3);
|
intData.add(3);
|
||||||
// intData.add((int) val2);
|
intData.add((int) val2);
|
||||||
// ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
||||||
// intData.add(0);
|
intData.add(0);
|
||||||
// intData.add(0);
|
intData.add(0);
|
||||||
// intData.add(0);
|
intData.add(0);
|
||||||
// intData.add(0);
|
intData.add(0);
|
||||||
//
|
|
||||||
// Map<String, Object> map = new HashMap<>();
|
Map<String, Object> map = new HashMap<>();
|
||||||
// map.put("instruct", intData);
|
map.put("instruct", intData);
|
||||||
//
|
|
||||||
// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||||
// log.info("发送开机LOGO点阵数据到设备消息=>topic:{},payload:{}",
|
log.info("发送开机LOGO点阵数据到设备消息=>topic:{},payload:{}",
|
||||||
// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||||
// JsonUtils.toJsonString(map));
|
JsonUtils.toJsonString(map));
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理开机LOGO时出错", e);
|
log.error("处理开机LOGO时出错", e);
|
||||||
|
|||||||
@ -34,15 +34,15 @@ public class BjqLaserModeSettingsRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
// Object[] convertArr = context.getConvertArr();
|
Object[] convertArr = context.getConvertArr();
|
||||||
//
|
|
||||||
// String mode = convertArr[1].toString();
|
String mode = convertArr[1].toString();
|
||||||
// if(StringUtils.isNotBlank(mode)){
|
if(StringUtils.isNotBlank(mode)){
|
||||||
// // 发送设备状态和位置信息到Redis
|
// 发送设备状态和位置信息到Redis
|
||||||
// syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(),mode);
|
syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(),mode);
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理激光模式命令时出错", e);
|
log.error("处理激光模式命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
||||||
|
|||||||
@ -34,15 +34,15 @@ public class BjqLightBrightnessRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
// Object[] convertArr = context.getConvertArr();
|
Object[] convertArr = context.getConvertArr();
|
||||||
//
|
|
||||||
// String convertValue = convertArr[1].toString();
|
String convertValue = convertArr[1].toString();
|
||||||
// // 将设备状态信息存储到Redis中
|
// 将设备状态信息存储到Redis中
|
||||||
// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
||||||
//
|
|
||||||
// // 存储到Redis
|
// 存储到Redis
|
||||||
// RedisUtils.setCacheObject(deviceRedisKey, convertValue);
|
RedisUtils.setCacheObject(deviceRedisKey, convertValue);
|
||||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理灯光亮度命令时出错", e);
|
log.error("处理灯光亮度命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
||||||
|
|||||||
@ -52,26 +52,26 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
// Object[] convertArr = context.getConvertArr();
|
Object[] convertArr = context.getConvertArr();
|
||||||
// // Latitude, longitude
|
// Latitude, longitude
|
||||||
// String latitude = convertArr[1].toString();
|
String latitude = convertArr[1].toString();
|
||||||
// String longitude = convertArr[2].toString();
|
String longitude = convertArr[2].toString();
|
||||||
// // 判断 latitude 和 longitude 是否都为 0
|
// 判断 latitude 和 longitude 是否都为 0
|
||||||
// if ("0".equals(latitude) && "0".equals(longitude)) {
|
if ("0".equals(latitude) && "0".equals(longitude)) {
|
||||||
// log.info("位置信息为0,不存储到Redis: device={}, lat={}, lon={}", context.getDeviceImei(), latitude, longitude);
|
log.info("位置信息为0,不存储到Redis: device={}, lat={}, lon={}", context.getDeviceImei(), latitude, longitude);
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
// // 异步发送经纬度到Redis
|
// 异步发送经纬度到Redis
|
||||||
// asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude);
|
asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude);
|
||||||
// // 异步保存数据
|
// 异步保存数据
|
||||||
// asyncSaveLocationToMySQLWithFuture(context.getDeviceImei(), latitude, longitude);
|
asyncSaveLocationToMySQLWithFuture(context.getDeviceImei(), latitude, longitude);
|
||||||
//
|
|
||||||
// Map<String, Object> map = buildLocationDataMap(latitude, longitude);
|
Map<String, Object> map = buildLocationDataMap(latitude, longitude);
|
||||||
// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||||
// log.info("发送定位数据到设备=>topic:{},payload:{}",
|
log.info("发送定位数据到设备=>topic:{},payload:{}",
|
||||||
// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||||
// JsonUtils.toJsonString(map));
|
JsonUtils.toJsonString(map));
|
||||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理定位数据命令时出错", e);
|
log.error("处理定位数据命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
||||||
@ -121,6 +121,7 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
* @param longitude 经度
|
* @param longitude 经度
|
||||||
*/
|
*/
|
||||||
public void asyncSendLocationToRedisWithFuture(String deviceImei, String latitude, String longitude) {
|
public void asyncSendLocationToRedisWithFuture(String deviceImei, String latitude, String longitude) {
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
try {
|
try {
|
||||||
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
||||||
return;
|
return;
|
||||||
@ -176,6 +177,7 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -187,6 +189,7 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
* @param longitude 经度
|
* @param longitude 经度
|
||||||
*/
|
*/
|
||||||
public void asyncSaveLocationToMySQLWithFuture(String deviceImei, String latitude, String longitude) {
|
public void asyncSaveLocationToMySQLWithFuture(String deviceImei, String latitude, String longitude) {
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
try {
|
try {
|
||||||
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
||||||
return;
|
return;
|
||||||
@ -199,6 +202,7 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("异步保存位置信息到MySQL时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
log.error("异步保存位置信息到MySQL时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -50,47 +50,45 @@ public class BjqModeRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
log.info("处理灯光模式命令");
|
Object[] convertArr = context.getConvertArr();
|
||||||
log.info("MQTT消息负载字典:{}", context.getPayloadDict());
|
|
||||||
// Object[] convertArr = context.getConvertArr();
|
String mainLightMode = convertArr[1].toString();
|
||||||
//
|
String batteryRemainingTime = convertArr[2].toString();
|
||||||
// String mainLightMode = convertArr[1].toString();
|
if (StringUtils.isNotBlank(mainLightMode)) {
|
||||||
// String batteryRemainingTime = convertArr[2].toString();
|
log.info("设备离线mainLightMode:{}", mainLightMode);
|
||||||
// if (StringUtils.isNotBlank(mainLightMode)) {
|
if ("0".equals(mainLightMode)) {
|
||||||
// log.info("设备离线mainLightMode:{}", mainLightMode);
|
|
||||||
// if ("0".equals(mainLightMode)) {
|
// 设备离线
|
||||||
//
|
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX;
|
||||||
// // 设备离线
|
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "0");
|
||||||
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX;
|
|
||||||
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "0");
|
String sendMessageIng = GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
||||||
//
|
String messageSendingValue = RedisUtils.getCacheObject(sendMessageIng);
|
||||||
// String sendMessageIng = GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
if ("1".equals(messageSendingValue)) {
|
||||||
// String messageSendingValue = RedisUtils.getCacheObject(sendMessageIng);
|
// 设置为故障状态
|
||||||
// if ("1".equals(messageSendingValue)) {
|
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "2");
|
||||||
// // 设置为故障状态
|
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||||
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "2");
|
updateWrapper.eq("device_imei", context.getDeviceImei());
|
||||||
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
updateWrapper.set("online_status", 2);
|
||||||
// updateWrapper.eq("device_imei", context.getDeviceImei());
|
deviceService.update(updateWrapper);
|
||||||
// updateWrapper.set("online_status", 2);
|
RedisUtils.deleteObject(sendMessageIng);
|
||||||
// deviceService.update(updateWrapper);
|
|
||||||
// RedisUtils.deleteObject(sendMessageIng);
|
// 解除告警
|
||||||
//
|
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_ALARM_KEY_PREFIX;
|
||||||
// // 解除告警
|
if (RedisUtils.getCacheObject(deviceRedisKey) != null) {
|
||||||
// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_ALARM_KEY_PREFIX;
|
RedisUtils.deleteObject(deviceRedisKey);
|
||||||
// if (RedisUtils.getCacheObject(deviceRedisKey) != null) {
|
}
|
||||||
// RedisUtils.deleteObject(deviceRedisKey);
|
cancelAlarm(context.getDeviceImei());
|
||||||
// }
|
}
|
||||||
// cancelAlarm(context.getDeviceImei());
|
}
|
||||||
// }
|
// 发送设备状态和位置信息到Redis
|
||||||
// }
|
syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode);
|
||||||
// // 发送设备状态和位置信息到Redis
|
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
||||||
// syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode);
|
|
||||||
// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
// 存储到Redis
|
||||||
//
|
RedisUtils.setCacheObject(deviceRedisKey, batteryRemainingTime);
|
||||||
// // 存储到Redis
|
}
|
||||||
// RedisUtils.setCacheObject(deviceRedisKey, batteryRemainingTime);
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
// }
|
|
||||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理灯光模式命令时出错", e);
|
log.error("处理灯光模式命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
||||||
|
|||||||
@ -34,8 +34,8 @@ public class BjqPersonnelInfoDataRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
// Object[] convertArr = context.getConvertArr();
|
Object[] convertArr = context.getConvertArr();
|
||||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理定位数据命令时出错", e);
|
log.error("处理定位数据命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
||||||
|
|||||||
@ -45,47 +45,47 @@ public class BjqSendMessageRule implements MqttMessageRule {
|
|||||||
try {
|
try {
|
||||||
|
|
||||||
// Byte val2 = (Byte) context.getConvertArr()[1];
|
// Byte val2 = (Byte) context.getConvertArr()[1];
|
||||||
// String val2Str = context.getConvertArr()[1].toString();
|
String val2Str = context.getConvertArr()[1].toString();
|
||||||
// int val2 = Integer.parseInt(val2Str);
|
int val2 = Integer.parseInt(val2Str);
|
||||||
// System.out.println("收到设备信息命令:"+val2);
|
System.out.println("收到设备信息命令:"+val2);
|
||||||
// if (val2 == 100) {
|
if (val2 == 100) {
|
||||||
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// if(val2==200){
|
if(val2==200){
|
||||||
// String sendMessageIng = GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
String sendMessageIng = GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
||||||
// RedisUtils.deleteObject(sendMessageIng);
|
RedisUtils.deleteObject(sendMessageIng);
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
//
|
|
||||||
// String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":app_send_message_data");
|
String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":app_send_message_data");
|
||||||
// if (StringUtils.isEmpty(data)) {
|
if (StringUtils.isEmpty(data)) {
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
||||||
// byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
||||||
// log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
||||||
//// System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk));
|
// System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk));
|
||||||
//
|
|
||||||
// ArrayList<Integer> intData = new ArrayList<>();
|
ArrayList<Integer> intData = new ArrayList<>();
|
||||||
// intData.add(6);
|
intData.add(6);
|
||||||
// intData.add(val2);
|
intData.add(val2);
|
||||||
// ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
||||||
// intData.add(0);
|
intData.add(0);
|
||||||
// intData.add(0);
|
intData.add(0);
|
||||||
// intData.add(0);
|
intData.add(0);
|
||||||
// intData.add(0);
|
intData.add(0);
|
||||||
//
|
|
||||||
// Map<String, Object> map = new HashMap<>();
|
Map<String, Object> map = new HashMap<>();
|
||||||
// map.put("instruct", intData);
|
map.put("instruct", intData);
|
||||||
//
|
|
||||||
// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||||
// log.info("发送设备信息数据到设备消息=>topic:{},payload:{}",
|
log.info("发送设备信息数据到设备消息=>topic:{},payload:{}",
|
||||||
// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||||
// JsonUtils.toJsonString(map));
|
JsonUtils.toJsonString(map));
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理发送设备信息时出错", e);
|
log.error("处理发送设备信息时出错", e);
|
||||||
|
|||||||
@ -7,18 +7,14 @@ import com.fuyuanshen.common.core.utils.StringUtils;
|
|||||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||||
import com.fuyuanshen.equipment.domain.Device;
|
import com.fuyuanshen.equipment.domain.Device;
|
||||||
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
||||||
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
|
|
||||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.redisson.api.RLock;
|
|
||||||
import org.redisson.api.RedissonClient;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@ -99,18 +95,40 @@ public class MqttMessageConsumer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 处理具体业务逻辑的方法
|
// 处理具体业务逻辑的方法
|
||||||
private void processMessage(String imei) {
|
private void processMessage(String message) {
|
||||||
String threadName = Thread.currentThread().getName();
|
String threadName = Thread.currentThread().getName();
|
||||||
try {
|
try {
|
||||||
// 执行业务逻辑
|
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
|
||||||
|
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||||
|
// String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey);
|
||||||
|
// if(StringUtils.isBlank(deviceOnlineStatusRedis)){
|
||||||
|
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||||
|
// updateWrapper.eq("device_imei", message)
|
||||||
|
// .set("online_status", 1);
|
||||||
|
// deviceMapper.update(updateWrapper);
|
||||||
|
// }
|
||||||
|
// QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
|
||||||
|
// queryWrapper.eq("device_imei", message);
|
||||||
|
// queryWrapper.eq("online_status", 1);
|
||||||
|
// Long count = deviceMapper.selectCount(queryWrapper);
|
||||||
|
// if(count == 0){
|
||||||
|
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||||
|
// updateWrapper.eq("device_imei", message)
|
||||||
|
// .eq("online_status", 0)
|
||||||
|
// .set("online_status", 1);
|
||||||
|
// deviceMapper.update(updateWrapper);
|
||||||
|
// }
|
||||||
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||||
updateWrapper.eq("device_imei", imei)
|
updateWrapper.eq("device_imei", message)
|
||||||
.in("online_status", 0,2)
|
.in("online_status", 0,2)
|
||||||
.set("online_status", 1);
|
.set("online_status", 1);
|
||||||
int update = deviceMapper.update(updateWrapper);
|
int update = deviceMapper.update(updateWrapper);
|
||||||
|
// 模拟业务处理耗时
|
||||||
|
// Thread.sleep(200);
|
||||||
|
|
||||||
|
log.info("业务处理线程 {} 完成消息处理: {}", threadName, message);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, imei, e);
|
log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, message, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -739,7 +739,6 @@ public class DeviceXinghanBizService {
|
|||||||
device.setCreateByName(loginUser.getNickname());
|
device.setCreateByName(loginUser.getNickname());
|
||||||
device.setTypeName(deviceTypes.getTypeName());
|
device.setTypeName(deviceTypes.getTypeName());
|
||||||
device.setDeviceType(deviceTypes.getId());
|
device.setDeviceType(deviceTypes.getId());
|
||||||
device.setDevicePic(deviceTypes.getDevicePic());
|
|
||||||
if (device.getDeviceImei() != null) {
|
if (device.getDeviceImei() != null) {
|
||||||
device.setPubTopic("A/" + device.getDeviceImei());
|
device.setPubTopic("A/" + device.getDeviceImei());
|
||||||
device.setSubTopic("B/" + device.getDeviceImei());
|
device.setSubTopic("B/" + device.getDeviceImei());
|
||||||
|
|||||||
@ -8,8 +8,8 @@ spring.boot.admin.client:
|
|||||||
metadata:
|
metadata:
|
||||||
username: ${spring.boot.admin.client.username}
|
username: ${spring.boot.admin.client.username}
|
||||||
userpassword: ${spring.boot.admin.client.password}
|
userpassword: ${spring.boot.admin.client.password}
|
||||||
username: @monitor.username@
|
username: ${monitor.username}
|
||||||
password: @monitor.password@
|
password: ${monitor.password}
|
||||||
|
|
||||||
--- # snail-job 配置
|
--- # snail-job 配置
|
||||||
snail-job:
|
snail-job:
|
||||||
|
|||||||
@ -11,8 +11,8 @@ spring.boot.admin.client:
|
|||||||
metadata:
|
metadata:
|
||||||
username: ${spring.boot.admin.client.username}
|
username: ${spring.boot.admin.client.username}
|
||||||
userpassword: ${spring.boot.admin.client.password}
|
userpassword: ${spring.boot.admin.client.password}
|
||||||
username: @monitor.username@
|
username: ${monitor.username}
|
||||||
password: @monitor.password@
|
password: ${monitor.password}
|
||||||
|
|
||||||
--- # snail-job 配置
|
--- # snail-job 配置
|
||||||
snail-job:
|
snail-job:
|
||||||
|
|||||||
@ -15,7 +15,6 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import org.springframework.validation.annotation.Validated;
|
import org.springframework.validation.annotation.Validated;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -51,7 +50,7 @@ public class DeviceTypeController {
|
|||||||
// @Log("新增设备类型")
|
// @Log("新增设备类型")
|
||||||
@Operation(summary = "新增设备类型")
|
@Operation(summary = "新增设备类型")
|
||||||
@PostMapping(value = "/add")
|
@PostMapping(value = "/add")
|
||||||
public R<Void> createDeviceType(@Validated @ModelAttribute DeviceTypeForm resources) throws IOException {
|
public R<Void> createDeviceType(@Validated @RequestBody DeviceType resources) {
|
||||||
deviceTypeService.create(resources);
|
deviceTypeService.create(resources);
|
||||||
return R.ok();
|
return R.ok();
|
||||||
}
|
}
|
||||||
@ -60,7 +59,7 @@ public class DeviceTypeController {
|
|||||||
// @Log("修改设备类型")
|
// @Log("修改设备类型")
|
||||||
@Operation(summary = "修改设备类型")
|
@Operation(summary = "修改设备类型")
|
||||||
@PutMapping(value = "/update")
|
@PutMapping(value = "/update")
|
||||||
public R<Void> updateDeviceType(@Validated @ModelAttribute DeviceTypeForm resources) throws IOException {
|
public R<Void> updateDeviceType(@Validated @RequestBody DeviceTypeForm resources) {
|
||||||
deviceTypeService.update(resources);
|
deviceTypeService.update(resources);
|
||||||
return R.ok();
|
return R.ok();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -85,8 +85,5 @@ public class DeviceType extends TenantEntity {
|
|||||||
@Schema(title = "型号字典用于PC页面跳转")
|
@Schema(title = "型号字典用于PC页面跳转")
|
||||||
private String pcModelDictionary;
|
private String pcModelDictionary;
|
||||||
|
|
||||||
@Schema(title = "设备图片")
|
|
||||||
private String devicePic;
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,6 @@ package com.fuyuanshen.equipment.domain.form;
|
|||||||
|
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.springframework.web.multipart.MultipartFile;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @Description: 设备类型
|
* @Description: 设备类型
|
||||||
@ -49,10 +48,5 @@ public class DeviceTypeForm {
|
|||||||
*/
|
*/
|
||||||
@Schema(title = "型号字典用于PC页面跳转")
|
@Schema(title = "型号字典用于PC页面跳转")
|
||||||
private String pcModelDictionary;
|
private String pcModelDictionary;
|
||||||
@Schema(title = "设备图片")
|
|
||||||
private String devicePic;
|
|
||||||
|
|
||||||
@Schema(title = "设备图片")
|
|
||||||
private MultipartFile file;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,6 @@ import com.fuyuanshen.equipment.domain.DeviceType;
|
|||||||
import com.fuyuanshen.equipment.domain.dto.DeviceExcelImportDTO;
|
import com.fuyuanshen.equipment.domain.dto.DeviceExcelImportDTO;
|
||||||
import com.fuyuanshen.equipment.domain.dto.ImportResult;
|
import com.fuyuanshen.equipment.domain.dto.ImportResult;
|
||||||
import com.fuyuanshen.equipment.domain.form.DeviceForm;
|
import com.fuyuanshen.equipment.domain.form.DeviceForm;
|
||||||
import com.fuyuanshen.equipment.domain.form.DeviceTypeForm;
|
|
||||||
import com.fuyuanshen.equipment.handler.ImageWriteHandler;
|
import com.fuyuanshen.equipment.handler.ImageWriteHandler;
|
||||||
import com.fuyuanshen.system.domain.vo.SysOssVo;
|
import com.fuyuanshen.system.domain.vo.SysOssVo;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -438,10 +437,8 @@ public class UploadDeviceDataListener implements ReadListener<DeviceExcelImportD
|
|||||||
newDeviceType.setAppModelDictionary(originalDto.getAppModelDictionary());
|
newDeviceType.setAppModelDictionary(originalDto.getAppModelDictionary());
|
||||||
newDeviceType.setPcModelDictionary(originalDto.getPcModelDictionary());
|
newDeviceType.setPcModelDictionary(originalDto.getPcModelDictionary());
|
||||||
|
|
||||||
DeviceTypeForm deviceTypeForm = new DeviceTypeForm();
|
|
||||||
BeanUtil.copyProperties(newDeviceType, deviceTypeForm, true);
|
|
||||||
// 创建新的设备类型
|
// 创建新的设备类型
|
||||||
params.getDeviceTypeService().create(deviceTypeForm);
|
params.getDeviceTypeService().create(newDeviceType);
|
||||||
|
|
||||||
// 重新查询确保获取到正确的ID
|
// 重新查询确保获取到正确的ID
|
||||||
deviceType = params.getDeviceTypeService().queryByName(device.getTypeName());
|
deviceType = params.getDeviceTypeService().queryByName(device.getTypeName());
|
||||||
|
|||||||
@ -8,7 +8,6 @@ import com.fuyuanshen.equipment.domain.DeviceType;
|
|||||||
import com.fuyuanshen.equipment.domain.form.DeviceTypeForm;
|
import com.fuyuanshen.equipment.domain.form.DeviceTypeForm;
|
||||||
import com.fuyuanshen.equipment.domain.query.DeviceTypeQueryCriteria;
|
import com.fuyuanshen.equipment.domain.query.DeviceTypeQueryCriteria;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -64,14 +63,14 @@ public interface DeviceTypeService extends IService<DeviceType> {
|
|||||||
*
|
*
|
||||||
* @param resources /
|
* @param resources /
|
||||||
*/
|
*/
|
||||||
void create(DeviceTypeForm resources) throws IOException;
|
void create(DeviceType resources);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 修改设备类型
|
* 修改设备类型
|
||||||
*
|
*
|
||||||
* @param resources /
|
* @param resources /
|
||||||
*/
|
*/
|
||||||
void update(DeviceTypeForm resources) throws IOException;
|
void update(DeviceTypeForm resources);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 多选删除
|
* 多选删除
|
||||||
|
|||||||
@ -19,16 +19,12 @@ import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
|||||||
import com.fuyuanshen.equipment.mapper.DeviceTypeGrantsMapper;
|
import com.fuyuanshen.equipment.mapper.DeviceTypeGrantsMapper;
|
||||||
import com.fuyuanshen.equipment.mapper.DeviceTypeMapper;
|
import com.fuyuanshen.equipment.mapper.DeviceTypeMapper;
|
||||||
import com.fuyuanshen.equipment.service.DeviceTypeService;
|
import com.fuyuanshen.equipment.service.DeviceTypeService;
|
||||||
import com.fuyuanshen.equipment.utils.FileHashUtil;
|
|
||||||
import com.fuyuanshen.system.domain.vo.SysOssVo;
|
|
||||||
import com.fuyuanshen.system.domain.vo.SysRoleVo;
|
import com.fuyuanshen.system.domain.vo.SysRoleVo;
|
||||||
import com.fuyuanshen.system.service.ISysOssService;
|
|
||||||
import com.fuyuanshen.system.service.ISysRoleService;
|
import com.fuyuanshen.system.service.ISysRoleService;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -50,8 +46,6 @@ public class DeviceTypeServiceImpl extends ServiceImpl<DeviceTypeMapper, DeviceT
|
|||||||
private final DeviceAssignmentsMapper deviceAssignmentsMapper;
|
private final DeviceAssignmentsMapper deviceAssignmentsMapper;
|
||||||
|
|
||||||
private final ISysRoleService roleService;
|
private final ISysRoleService roleService;
|
||||||
private final ISysOssService ossService;
|
|
||||||
private final FileHashUtil fileHashUtil;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -187,38 +181,24 @@ public class DeviceTypeServiceImpl extends ServiceImpl<DeviceTypeMapper, DeviceT
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@Transactional(rollbackFor = Exception.class)
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public void create(DeviceTypeForm resources) throws IOException {
|
public void create(DeviceType resources) {
|
||||||
|
|
||||||
// 校验设备类型名称
|
// 校验设备类型名称
|
||||||
List<DeviceType> typeName = deviceTypeMapper.selectList(new QueryWrapper<DeviceType>().eq("type_name", resources.getTypeName()));
|
List<DeviceType> typeName = deviceTypeMapper.selectList(new QueryWrapper<DeviceType>().eq("type_name", resources.getTypeName()));
|
||||||
if (CollectionUtil.isNotEmpty(typeName)) {
|
if (CollectionUtil.isNotEmpty(typeName)) {
|
||||||
throw new RuntimeException("设备类型名称已存在,无法新增!!!");
|
throw new RuntimeException("设备类型名称已存在,无法新增!!!");
|
||||||
}
|
}
|
||||||
// 保存图片并获取URL
|
|
||||||
if (resources.getFile() != null) {
|
|
||||||
String fileHash = fileHashUtil.hash(resources.getFile());
|
|
||||||
SysOssVo upload = ossService.updateHash(resources.getFile(), fileHash);
|
|
||||||
// 强制将HTTP替换为HTTPS
|
|
||||||
if (upload.getUrl() != null && upload.getUrl().startsWith("http://")) {
|
|
||||||
upload.setUrl(upload.getUrl().replaceFirst("^http://", "https://"));
|
|
||||||
}
|
|
||||||
// 设置图片路径
|
|
||||||
resources.setDevicePic(upload.getUrl());
|
|
||||||
}
|
|
||||||
|
|
||||||
DeviceType deviceType = new DeviceType();
|
|
||||||
BeanUtil.copyProperties(resources, deviceType, true);
|
|
||||||
|
|
||||||
LoginUser loginUser = LoginHelper.getLoginUser();
|
LoginUser loginUser = LoginHelper.getLoginUser();
|
||||||
deviceType.setCustomerId(loginUser.getUserId());
|
resources.setCustomerId(loginUser.getUserId());
|
||||||
deviceType.setOwnerCustomerId(loginUser.getUserId());
|
resources.setOwnerCustomerId(loginUser.getUserId());
|
||||||
deviceType.setOriginalOwnerId(loginUser.getUserId());
|
resources.setOriginalOwnerId(loginUser.getUserId());
|
||||||
deviceType.setCreateByName(loginUser.getNickname());
|
resources.setCreateByName(loginUser.getNickname());
|
||||||
deviceTypeMapper.insert(deviceType);
|
deviceTypeMapper.insert(resources);
|
||||||
|
|
||||||
// 自动授权给自己
|
// 自动授权给自己
|
||||||
DeviceTypeGrants deviceTypeGrants = new DeviceTypeGrants();
|
DeviceTypeGrants deviceTypeGrants = new DeviceTypeGrants();
|
||||||
deviceTypeGrants.setDeviceTypeId(deviceType.getId());
|
deviceTypeGrants.setDeviceTypeId(resources.getId());
|
||||||
deviceTypeGrants.setCustomerId(loginUser.getUserId());
|
deviceTypeGrants.setCustomerId(loginUser.getUserId());
|
||||||
deviceTypeGrants.setGrantorCustomerId(loginUser.getUserId());
|
deviceTypeGrants.setGrantorCustomerId(loginUser.getUserId());
|
||||||
deviceTypeGrants.setGrantedAt(new Date());
|
deviceTypeGrants.setGrantedAt(new Date());
|
||||||
@ -233,7 +213,7 @@ public class DeviceTypeServiceImpl extends ServiceImpl<DeviceTypeMapper, DeviceT
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@Transactional(rollbackFor = Exception.class)
|
@Transactional(rollbackFor = Exception.class)
|
||||||
public void update(DeviceTypeForm resources) throws IOException {
|
public void update(DeviceTypeForm resources) {
|
||||||
DeviceTypeGrants deviceTypeGrants = deviceTypeGrantsMapper.selectById(resources.getId());
|
DeviceTypeGrants deviceTypeGrants = deviceTypeGrantsMapper.selectById(resources.getId());
|
||||||
if (deviceTypeGrants == null) {
|
if (deviceTypeGrants == null) {
|
||||||
throw new RuntimeException("设备类型不存在");
|
throw new RuntimeException("设备类型不存在");
|
||||||
@ -264,17 +244,6 @@ public class DeviceTypeServiceImpl extends ServiceImpl<DeviceTypeMapper, DeviceT
|
|||||||
throw new RuntimeException("无权修改该设备类型");
|
throw new RuntimeException("无权修改该设备类型");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 保存图片并获取URL
|
|
||||||
if (resources.getFile() != null) {
|
|
||||||
String fileHash = fileHashUtil.hash(resources.getFile());
|
|
||||||
SysOssVo upload = ossService.updateHash(resources.getFile(), fileHash);
|
|
||||||
// 强制将HTTP替换为HTTPS
|
|
||||||
if (upload.getUrl() != null && upload.getUrl().startsWith("http://")) {
|
|
||||||
upload.setUrl(upload.getUrl().replaceFirst("^http://", "https://"));
|
|
||||||
}
|
|
||||||
// 设置图片路径
|
|
||||||
resources.setDevicePic(upload.getUrl());
|
|
||||||
}
|
|
||||||
|
|
||||||
BeanUtil.copyProperties(resources, deviceType);
|
BeanUtil.copyProperties(resources, deviceType);
|
||||||
deviceTypeMapper.updateById(deviceType);
|
deviceTypeMapper.updateById(deviceType);
|
||||||
|
|||||||
16
pom.xml
16
pom.xml
@ -85,10 +85,10 @@
|
|||||||
<monitor.username>fys</monitor.username>
|
<monitor.username>fys</monitor.username>
|
||||||
<monitor.password>123456</monitor.password>
|
<monitor.password>123456</monitor.password>
|
||||||
</properties>
|
</properties>
|
||||||
<activation>
|
<!-- <activation> -->
|
||||||
<!-- 默认环境 -->
|
<!-- <!– 默认环境 –> -->
|
||||||
<activeByDefault>true</activeByDefault>
|
<!-- <activeByDefault>true</activeByDefault> -->
|
||||||
</activation>
|
<!-- </activation> -->
|
||||||
</profile>
|
</profile>
|
||||||
<profile>
|
<profile>
|
||||||
<id>prod</id>
|
<id>prod</id>
|
||||||
@ -98,10 +98,10 @@
|
|||||||
<monitor.username>fys</monitor.username>
|
<monitor.username>fys</monitor.username>
|
||||||
<monitor.password>123456</monitor.password>
|
<monitor.password>123456</monitor.password>
|
||||||
</properties>
|
</properties>
|
||||||
<!-- <activation> -->
|
<activation>
|
||||||
<!-- <!– 默认环境 –> -->
|
<!-- 默认环境 -->
|
||||||
<!-- <activeByDefault>true</activeByDefault> -->
|
<activeByDefault>true</activeByDefault>
|
||||||
<!-- </activation> -->
|
</activation>
|
||||||
</profile>
|
</profile>
|
||||||
<profile>
|
<profile>
|
||||||
<id>jingquan</id>
|
<id>jingquan</id>
|
||||||
|
|||||||
Reference in New Issue
Block a user