Compare commits
9 Commits
ca11ef0e35
...
e80830e76c
| Author | SHA1 | Date | |
|---|---|---|---|
| e80830e76c | |||
| 346792d5c1 | |||
| cb87871982 | |||
| 437ab110b7 | |||
| b280038502 | |||
| 3e119b1dd8 | |||
| cab0884d7f | |||
| 88650c3d9f | |||
| ec28ab1092 |
@ -26,6 +26,7 @@ import org.springframework.web.bind.annotation.*;
|
|||||||
import org.springframework.web.multipart.MultipartFile;
|
import org.springframework.web.multipart.MultipartFile;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HBY670设备控制类
|
* HBY670设备控制类
|
||||||
@ -133,7 +134,7 @@ public class AppDeviceXinghanController extends BaseController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// @Log("新增设备")
|
// @Log("新增设备")
|
||||||
@Operation(summary = "新增设备")
|
@Log(title = "新增设备")
|
||||||
@PostMapping(value = "/add")
|
@PostMapping(value = "/add")
|
||||||
public R<Void> addDevice(@RequestBody DeviceForm deviceForm) {
|
public R<Void> addDevice(@RequestBody DeviceForm deviceForm) {
|
||||||
try {
|
try {
|
||||||
@ -144,4 +145,26 @@ public class AppDeviceXinghanController extends BaseController {
|
|||||||
return R.ok();
|
return R.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping(value = "/GetDeviceByName")
|
||||||
|
@Operation(summary = "通过蓝牙名/设备名称查询设备")
|
||||||
|
public R<Object> GetDeviceByName(@RequestBody DeviceForm deviceForm) {
|
||||||
|
Object device = appDeviceService.GetDeviceByName(deviceForm);
|
||||||
|
return R.ok(device);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@PostMapping(value = "/getEquipCountByType")
|
||||||
|
@Operation(summary = "查询某个类型下的设备总数量")
|
||||||
|
public R<Object> getEquipCountByType(@RequestBody DeviceForm deviceForm) {
|
||||||
|
Object device = appDeviceService.getEquipCountByType(deviceForm);
|
||||||
|
return R.ok(device);
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostMapping(value = "/getEquipAllByType")
|
||||||
|
@Operation(summary = "查询某个类型下的设备")
|
||||||
|
public R<List<Map<String,Object>>> getEquipAllByType(@RequestBody DeviceForm deviceForm){
|
||||||
|
List<Map<String,Object>> list=appDeviceService.getEquipAllByType(deviceForm);
|
||||||
|
return R.ok(list);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,9 +1,5 @@
|
|||||||
package com.fuyuanshen.global.mqtt.base;
|
package com.fuyuanshen.global.mqtt.base;
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
|
||||||
import org.springframework.core.task.TaskExecutor;
|
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@ -16,9 +12,9 @@ import java.util.List;
|
|||||||
@Component
|
@Component
|
||||||
public class MqttRuleEngine {
|
public class MqttRuleEngine {
|
||||||
|
|
||||||
@Autowired
|
// @Autowired
|
||||||
@Qualifier("threadPoolTaskExecutor")
|
// @Qualifier("threadPoolTaskExecutor")
|
||||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
// private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||||
|
|
||||||
private final LinkedHashMap<String, MqttMessageRule> rulesMap = new LinkedHashMap<>();
|
private final LinkedHashMap<String, MqttMessageRule> rulesMap = new LinkedHashMap<>();
|
||||||
|
|
||||||
@ -41,7 +37,8 @@ public class MqttRuleEngine {
|
|||||||
int commandType = context.getCommandType();
|
int commandType = context.getCommandType();
|
||||||
MqttMessageRule mqttMessageRule = rulesMap.get("Light_" + commandType);
|
MqttMessageRule mqttMessageRule = rulesMap.get("Light_" + commandType);
|
||||||
if (mqttMessageRule != null) {
|
if (mqttMessageRule != null) {
|
||||||
threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context));
|
// threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context));
|
||||||
|
mqttMessageRule.execute(context);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,26 @@
|
|||||||
|
package com.fuyuanshen.global.mqtt.base;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MQTT消息处理接口
|
||||||
|
*/
|
||||||
|
public interface NewMqttMessageRule {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取命令类型
|
||||||
|
* @return 命令类型
|
||||||
|
*/
|
||||||
|
String getCommandType();
|
||||||
|
/**
|
||||||
|
* 执行处理
|
||||||
|
* @param context 处理上下文
|
||||||
|
*/
|
||||||
|
void execute(NewMqttRuleContext context);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取优先级,数值越小优先级越高
|
||||||
|
* @return 优先级
|
||||||
|
*/
|
||||||
|
default int getPriority() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
package com.fuyuanshen.global.mqtt.base;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MQTT消息处理上下文
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class NewMqttRuleContext {
|
||||||
|
/**
|
||||||
|
* 命令类型
|
||||||
|
*/
|
||||||
|
private String commandType;
|
||||||
|
/**
|
||||||
|
* 设备IMEI
|
||||||
|
*/
|
||||||
|
private String deviceImei;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MQTT消息负载字典
|
||||||
|
*/
|
||||||
|
private Map<String, Object> payloadDict;
|
||||||
|
}
|
||||||
@ -0,0 +1,44 @@
|
|||||||
|
package com.fuyuanshen.global.mqtt.base;
|
||||||
|
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MQTT消息引擎
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class NewMqttRuleEngine {
|
||||||
|
|
||||||
|
// @Autowired
|
||||||
|
// @Qualifier("threadPoolTaskExecutor")
|
||||||
|
// private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||||
|
|
||||||
|
private final LinkedHashMap<String, NewMqttMessageRule> rulesMap = new LinkedHashMap<>();
|
||||||
|
public NewMqttRuleEngine(List<NewMqttMessageRule> rules) {
|
||||||
|
// 按优先级排序
|
||||||
|
rules.sort(Comparator.comparing(NewMqttMessageRule::getPriority));
|
||||||
|
rules.forEach(rule -> rulesMap.put(rule.getCommandType(), rule)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行匹配
|
||||||
|
* @param context 处理上下文
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public boolean executeRule(NewMqttRuleContext context) {
|
||||||
|
String commandType = context.getCommandType();
|
||||||
|
// String funcType = context.getFuncType();
|
||||||
|
NewMqttMessageRule mqttMessageRule = rulesMap.get(commandType);
|
||||||
|
if (mqttMessageRule != null) {
|
||||||
|
// threadPoolTaskExecutor.execute(() -> mqttMessageRule.execute(context));
|
||||||
|
mqttMessageRule.execute(context);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -44,12 +44,12 @@ public class MqttInboundConfiguration {
|
|||||||
if (url == null) {
|
if (url == null) {
|
||||||
throw new IllegalStateException("MQTT服务器URL未配置");
|
throw new IllegalStateException("MQTT服务器URL未配置");
|
||||||
}
|
}
|
||||||
|
String subTopic = mqttPropertiesConfig.getSubTopic();
|
||||||
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
|
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
|
||||||
url,
|
url,
|
||||||
clientId,
|
clientId,
|
||||||
mqttPahoClientFactory,
|
mqttPahoClientFactory,
|
||||||
mqttPropertiesConfig.getSubTopic().split(",")
|
subTopic.split(",")
|
||||||
);
|
);
|
||||||
mqttPahoMessageDrivenChannelAdapter.setQos(1);
|
mqttPahoMessageDrivenChannelAdapter.setQos(1);
|
||||||
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
|
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
|
||||||
|
|||||||
@ -47,7 +47,7 @@ public class MqttOutboundConfiguration {
|
|||||||
mqttPahoClientFactory
|
mqttPahoClientFactory
|
||||||
);
|
);
|
||||||
mqttPahoMessageHandler.setDefaultQos(1);
|
mqttPahoMessageHandler.setDefaultQos(1);
|
||||||
mqttPahoMessageHandler.setDefaultTopic("B/#");
|
mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic());
|
||||||
mqttPahoMessageHandler.setAsync(true);
|
mqttPahoMessageHandler.setAsync(true);
|
||||||
return mqttPahoMessageHandler;
|
return mqttPahoMessageHandler;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,6 +14,8 @@ public class MqttPropertiesConfig {
|
|||||||
private String url;
|
private String url;
|
||||||
private String subClientId;
|
private String subClientId;
|
||||||
private String subTopic;
|
private String subTopic;
|
||||||
|
private String subTopic2;
|
||||||
private String pubClientId;
|
private String pubClientId;
|
||||||
private String pubTopic;
|
private String pubTopic;
|
||||||
|
private String pubTopic2;
|
||||||
}
|
}
|
||||||
@ -0,0 +1,68 @@
|
|||||||
|
package com.fuyuanshen.global.mqtt.config;
|
||||||
|
|
||||||
|
|
||||||
|
import cn.hutool.core.lang.UUID;
|
||||||
|
import com.fuyuanshen.global.mqtt.receiver.NewReceiverMessageHandler;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.integration.annotation.ServiceActivator;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.core.MessageProducer;
|
||||||
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||||
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
||||||
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
||||||
|
import org.springframework.messaging.MessageChannel;
|
||||||
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@Slf4j
|
||||||
|
public class NewMqttInboundConfiguration {
|
||||||
|
@Autowired
|
||||||
|
private MqttPropertiesConfig mqttPropertiesConfig;
|
||||||
|
@Autowired
|
||||||
|
private MqttPahoClientFactory mqttPahoClientFactory;
|
||||||
|
@Autowired
|
||||||
|
private NewReceiverMessageHandler receiverMessageHandler2;
|
||||||
|
//消息通道
|
||||||
|
@Bean
|
||||||
|
public MessageChannel messageInboundChannel2(){
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 配置入站适配器
|
||||||
|
* 作用: 设置订阅主题,以及指定消息的通道 等相关属性
|
||||||
|
* */
|
||||||
|
@Bean
|
||||||
|
public MessageProducer messageProducer2(){
|
||||||
|
// 生成一个不重复的随机数
|
||||||
|
String clientId = mqttPropertiesConfig.getSubClientId() + "_" + UUID.fastUUID();
|
||||||
|
String subTopic = mqttPropertiesConfig.getSubTopic2();
|
||||||
|
log.info("订阅主题:{}", subTopic);
|
||||||
|
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
|
||||||
|
mqttPropertiesConfig.getUrl(),
|
||||||
|
clientId,
|
||||||
|
mqttPahoClientFactory,
|
||||||
|
subTopic.split(",")
|
||||||
|
);
|
||||||
|
mqttPahoMessageDrivenChannelAdapter.setQos(1);
|
||||||
|
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
|
||||||
|
mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel2());
|
||||||
|
return mqttPahoMessageDrivenChannelAdapter;
|
||||||
|
}
|
||||||
|
/** 指定处理消息来自哪个通道 */
|
||||||
|
@Bean
|
||||||
|
@ServiceActivator(inputChannel = "messageInboundChannel2")
|
||||||
|
public MessageHandler messageHandler2(){
|
||||||
|
return receiverMessageHandler2;
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Bean
|
||||||
|
// @ServiceActivator(inputChannel = "messageInboundChannel") // 确保通道名称正确
|
||||||
|
// public MessageHandler deviceAlarmMessageHandler() {
|
||||||
|
// return new DeviceAlrmMessageHandler();
|
||||||
|
// }
|
||||||
|
}
|
||||||
@ -0,0 +1,46 @@
|
|||||||
|
package com.fuyuanshen.global.mqtt.config;
|
||||||
|
|
||||||
|
import cn.hutool.core.lang.UUID;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.integration.annotation.ServiceActivator;
|
||||||
|
import org.springframework.integration.channel.DirectChannel;
|
||||||
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||||
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
||||||
|
import org.springframework.messaging.MessageChannel;
|
||||||
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@Slf4j
|
||||||
|
public class NewMqttOutboundConfiguration {
|
||||||
|
@Autowired
|
||||||
|
private MqttPropertiesConfig mqttPropertiesConfig;
|
||||||
|
@Autowired
|
||||||
|
private MqttPahoClientFactory mqttPahoClientFactory;
|
||||||
|
|
||||||
|
// 消息通道
|
||||||
|
@Bean
|
||||||
|
public MessageChannel mqttOutboundChannel2(){
|
||||||
|
return new DirectChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** 配置出站消息处理器 */
|
||||||
|
@Bean
|
||||||
|
@ServiceActivator(inputChannel = "mqttOutboundChannel2") // 指定处理器针对哪个通道的消息进行处理
|
||||||
|
public MessageHandler mqttOutboundMessageHandler2(){
|
||||||
|
String clientId = mqttPropertiesConfig.getPubClientId() + "_" + UUID.fastUUID();
|
||||||
|
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
|
||||||
|
mqttPropertiesConfig.getUrl(),
|
||||||
|
clientId,
|
||||||
|
mqttPahoClientFactory
|
||||||
|
);
|
||||||
|
mqttPahoMessageHandler.setDefaultQos(1);
|
||||||
|
mqttPahoMessageHandler.setDefaultTopic(mqttPropertiesConfig.getPubTopic2());
|
||||||
|
mqttPahoMessageHandler.setAsync(true);
|
||||||
|
return mqttPahoMessageHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -1,20 +1,22 @@
|
|||||||
package com.fuyuanshen.global.mqtt.publish;
|
package com.fuyuanshen.global.mqtt.publish;
|
||||||
|
|
||||||
|
import cn.dev33.satoken.annotation.SaIgnore;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/api/")
|
@RequestMapping("/api")
|
||||||
|
@SaIgnore
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class DeviceDataController {
|
public class DeviceDataController {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private MqttClientTest mqttClientTest;
|
private MqttClientTest mqttClientTest;
|
||||||
|
@GetMapping("/command")
|
||||||
// @PostMapping("/{deviceId}/command")
|
|
||||||
public ResponseEntity<String> sendCommand() {
|
public ResponseEntity<String> sendCommand() {
|
||||||
|
|
||||||
mqttClientTest.sendMsg();
|
mqttClientTest.sendMsg();
|
||||||
|
|||||||
@ -13,10 +13,10 @@ public class MqttClientTest {
|
|||||||
private MqttGateway mqttGateway;
|
private MqttGateway mqttGateway;
|
||||||
|
|
||||||
public void sendMsg() {
|
public void sendMsg() {
|
||||||
mqttGateway.sendMsgToMqtt("worker/location/1", "hello mqtt spring boot");
|
mqttGateway.sendMsgToMqtt("command/894078/HBY670/864865082081523", "hello mqtt spring boot");
|
||||||
log.info("message is send");
|
log.info("message is send");
|
||||||
|
|
||||||
mqttGateway.sendMsgToMqtt("worker/alert/2", "hello mqtt spring boot2");
|
mqttGateway.sendMsgToMqtt("report/894078/HBY670/864865082081523", "hello mqtt spring boot2");
|
||||||
log.info("message is send2");
|
log.info("message is send2");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -0,0 +1,146 @@
|
|||||||
|
package com.fuyuanshen.global.mqtt.receiver;
|
||||||
|
|
||||||
|
import cn.hutool.core.lang.Dict;
|
||||||
|
import com.baomidou.lock.LockTemplate;
|
||||||
|
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
||||||
|
import com.fuyuanshen.common.core.utils.StringUtils;
|
||||||
|
import com.fuyuanshen.common.json.utils.JsonUtils;
|
||||||
|
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||||
|
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
|
||||||
|
import com.fuyuanshen.global.mqtt.base.NewMqttRuleEngine;
|
||||||
|
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||||
|
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.redisson.api.RLock;
|
||||||
|
import org.redisson.api.RedissonClient;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
import org.springframework.messaging.MessageHeaders;
|
||||||
|
import org.springframework.messaging.MessagingException;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class NewReceiverMessageHandler implements MessageHandler {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private NewMqttRuleEngine newRuleEngine;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessage(Message<?> message) throws MessagingException {
|
||||||
|
|
||||||
|
Object payload = message.getPayload();
|
||||||
|
MessageHeaders headers = message.getHeaders();
|
||||||
|
String receivedTopic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
|
||||||
|
String receivedQos = Objects.requireNonNull(headers.get("mqtt_receivedQos")).toString();
|
||||||
|
String timestamp = Objects.requireNonNull(headers.get("timestamp")).toString();
|
||||||
|
|
||||||
|
log.info("MQTT2 payload= {} \n receivedTopic = {} \n receivedQos = {} \n timestamp = {}",
|
||||||
|
payload, receivedTopic, receivedQos, timestamp);
|
||||||
|
|
||||||
|
Dict payloadDict = JsonUtils.parseMap(payload.toString());
|
||||||
|
if (receivedTopic == null || payloadDict == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String imei = payloadDict.getStr("imei");
|
||||||
|
String funcType = payloadDict.getStr("funcType");
|
||||||
|
RedissonClient client = RedisUtils.getClient();
|
||||||
|
String lockKey = "mqtt:consumer:lock:";
|
||||||
|
String KEY = GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei;
|
||||||
|
log.info("MQTT2获取锁开始{}", KEY);
|
||||||
|
RLock lock = client.getLock(KEY);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 尝试获取锁,
|
||||||
|
boolean isLocked = lock.tryLock(60, 3000, TimeUnit.MILLISECONDS);
|
||||||
|
if (isLocked) {
|
||||||
|
// 执行业务逻辑
|
||||||
|
if(StringUtils.isNotBlank(imei)){
|
||||||
|
String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||||
|
String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||||
|
RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
|
||||||
|
//在线状态
|
||||||
|
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||||
|
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
|
||||||
|
}
|
||||||
|
|
||||||
|
String[] topicArr = receivedTopic.split("/");
|
||||||
|
|
||||||
|
NewMqttRuleContext context = new NewMqttRuleContext();
|
||||||
|
context.setCommandType(topicArr[2]+"_"+funcType);
|
||||||
|
context.setDeviceImei(imei);
|
||||||
|
context.setPayloadDict(payloadDict);
|
||||||
|
|
||||||
|
boolean ruleExecuted = newRuleEngine.executeRule(context);
|
||||||
|
|
||||||
|
if (!ruleExecuted) {
|
||||||
|
log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
log.warn("MQTT2获取锁失败,请稍后再试");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
} finally {
|
||||||
|
// 释放锁
|
||||||
|
if (lock.isHeldByCurrentThread()) {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// final LockInfo lockInfo = lockTemplate.lock(GlobalConstants.GLOBAL_REDIS_KEY + lockKey + imei, 100L, 3000L, RedissonLockExecutor.class);
|
||||||
|
// if (null == lockInfo) {
|
||||||
|
// log.info("MQTT3业务处理中,请稍后再试:funcType=>{},imei=>{}",funcType,imei);
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
//// 获取锁成功,处理业务
|
||||||
|
// try {
|
||||||
|
// if(StringUtils.isNotBlank(imei)){
|
||||||
|
// String queueKey = MqttMessageQueueConstants.MQTT_MESSAGE_QUEUE_KEY;
|
||||||
|
// String dedupKey = MqttMessageQueueConstants.MQTT_MESSAGE_DEDUP_KEY;
|
||||||
|
// RedisUtils.offerDeduplicated(queueKey,dedupKey,imei, Duration.ofSeconds(900));
|
||||||
|
// //在线状态
|
||||||
|
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ imei + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
||||||
|
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "1", Duration.ofSeconds(360));
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// String[] topicArr = receivedTopic.split("/");
|
||||||
|
//
|
||||||
|
// NewMqttRuleContext context = new NewMqttRuleContext();
|
||||||
|
// context.setCommandType(topicArr[2]+"_"+funcType);
|
||||||
|
// context.setDeviceImei(imei);
|
||||||
|
// context.setPayloadDict(payloadDict);
|
||||||
|
//
|
||||||
|
// boolean ruleExecuted = newRuleEngine.executeRule(context);
|
||||||
|
//
|
||||||
|
// if (!ruleExecuted) {
|
||||||
|
// log.warn("未找到匹配的规则来处理命令类型: {}", topicArr[2] + " : " +funcType);
|
||||||
|
// }
|
||||||
|
// } finally {
|
||||||
|
// //释放锁
|
||||||
|
// lockTemplate.releaseLock(lockInfo);
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
/* ===== 追加:根据报文内容识别格式并统一解析 ===== */
|
||||||
|
// int intType = MqttXinghanCommandType.computeVirtualCommandType(payloadDict);
|
||||||
|
// if (intType > 0) {
|
||||||
|
// MqttRuleContext newCtx = new MqttRuleContext();
|
||||||
|
// String commandType = "Light_"+intType;
|
||||||
|
// newCtx.setCommandType(commandType);
|
||||||
|
// newCtx.setDeviceImei(imei);
|
||||||
|
// newCtx.setPayloadDict(payloadDict);
|
||||||
|
//
|
||||||
|
// boolean ok = ruleEngine.executeRule(newCtx);
|
||||||
|
// if (!ok) {
|
||||||
|
// log.warn("新规则引擎未命中, imei={}", imei);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -9,9 +9,12 @@ import com.fuyuanshen.common.redis.utils.RedisUtils;
|
|||||||
import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
import com.fuyuanshen.global.mqtt.base.MqttRuleContext;
|
||||||
import com.fuyuanshen.global.mqtt.base.MqttRuleEngine;
|
import com.fuyuanshen.global.mqtt.base.MqttRuleEngine;
|
||||||
import com.fuyuanshen.global.mqtt.base.MqttXinghanCommandType;
|
import com.fuyuanshen.global.mqtt.base.MqttXinghanCommandType;
|
||||||
|
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
|
||||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||||
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
import com.fuyuanshen.global.queue.MqttMessageQueueConstants;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.redisson.api.RLock;
|
||||||
|
import org.redisson.api.RedissonClient;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.MessageHandler;
|
import org.springframework.messaging.MessageHandler;
|
||||||
@ -21,6 +24,7 @@ import org.springframework.stereotype.Service;
|
|||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
||||||
|
|
||||||
@ -48,6 +52,17 @@ public class ReceiverMessageHandler implements MessageHandler {
|
|||||||
}
|
}
|
||||||
String[] subStr = receivedTopic.split("/");
|
String[] subStr = receivedTopic.split("/");
|
||||||
String deviceImei = subStr[1];
|
String deviceImei = subStr[1];
|
||||||
|
|
||||||
|
|
||||||
|
RedissonClient client = RedisUtils.getClient();
|
||||||
|
String lockKey = "mqtt:consumer:lock:";
|
||||||
|
String KEY = GlobalConstants.GLOBAL_REDIS_KEY + lockKey + deviceImei;
|
||||||
|
RLock lock = client.getLock(KEY);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 尝试获取锁,
|
||||||
|
boolean isLocked = lock.tryLock(60, 3000, TimeUnit.MILLISECONDS);
|
||||||
|
if (isLocked) {
|
||||||
String state = payloadDict.getStr("state");
|
String state = payloadDict.getStr("state");
|
||||||
Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state);
|
Object[] convertArr = ImageToCArrayConverter.convertByteStringToMixedObjectArray(state);
|
||||||
if(StringUtils.isNotBlank(deviceImei)){
|
if(StringUtils.isNotBlank(deviceImei)){
|
||||||
@ -89,6 +104,17 @@ public class ReceiverMessageHandler implements MessageHandler {
|
|||||||
log.warn("新规则引擎未命中, imei={}", deviceImei);
|
log.warn("新规则引擎未命中, imei={}", deviceImei);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}else{
|
||||||
|
log.warn("MQTT获取锁失败,请稍后再试");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
} finally {
|
||||||
|
// 释放锁
|
||||||
|
if (lock.isHeldByCurrentThread()) {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -38,18 +38,18 @@ public class BjqActiveReportingDeviceDataRule implements MqttMessageRule {
|
|||||||
@Override
|
@Override
|
||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
try {
|
try {
|
||||||
Object[] convertArr = context.getConvertArr();
|
// Object[] convertArr = context.getConvertArr();
|
||||||
// Latitude, longitude
|
// // Latitude, longitude
|
||||||
//主灯档位,激光灯档位,电量百分比,充电状态,电池剩余续航时间
|
// //主灯档位,激光灯档位,电量百分比,充电状态,电池剩余续航时间
|
||||||
String mainLightMode = convertArr[1].toString();
|
// String mainLightMode = convertArr[1].toString();
|
||||||
String laserLightMode = convertArr[2].toString();
|
// String laserLightMode = convertArr[2].toString();
|
||||||
String batteryPercentage = convertArr[3].toString();
|
// String batteryPercentage = convertArr[3].toString();
|
||||||
String chargeState = convertArr[4].toString();
|
// String chargeState = convertArr[4].toString();
|
||||||
String batteryRemainingTime = convertArr[5].toString();
|
// String batteryRemainingTime = convertArr[5].toString();
|
||||||
|
//
|
||||||
// 发送设备状态和位置信息到Redis
|
// // 发送设备状态和位置信息到Redis
|
||||||
asyncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode, laserLightMode,
|
// asyncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode, laserLightMode,
|
||||||
batteryPercentage, chargeState, batteryRemainingTime);
|
// batteryPercentage, chargeState, batteryRemainingTime);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理上报数据命令时出错", e);
|
log.error("处理上报数据命令时出错", e);
|
||||||
}
|
}
|
||||||
@ -67,7 +67,6 @@ public class BjqActiveReportingDeviceDataRule implements MqttMessageRule {
|
|||||||
*/
|
*/
|
||||||
public void asyncSendDeviceDataToRedisWithFuture(String deviceImei, String mainLightMode, String laserLightMode,
|
public void asyncSendDeviceDataToRedisWithFuture(String deviceImei, String mainLightMode, String laserLightMode,
|
||||||
String batteryPercentage, String chargeState, String batteryRemainingTime) {
|
String batteryPercentage, String chargeState, String batteryRemainingTime) {
|
||||||
CompletableFuture.runAsync(() -> {
|
|
||||||
try {
|
try {
|
||||||
// 构造设备状态信息对象
|
// 构造设备状态信息对象
|
||||||
Map<String, Object> deviceInfo = new LinkedHashMap<>();
|
Map<String, Object> deviceInfo = new LinkedHashMap<>();
|
||||||
@ -91,7 +90,6 @@ public class BjqActiveReportingDeviceDataRule implements MqttMessageRule {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("异步发送设备信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
log.error("异步发送设备信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -44,37 +44,37 @@ public class BjqBootLogoRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
Byte val2 = (Byte) context.getConvertArr()[1];
|
// Byte val2 = (Byte) context.getConvertArr()[1];
|
||||||
if (val2 == 100) {
|
// if (val2 == 100) {
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() +DEVICE_BOOT_LOGO_KEY_PREFIX);
|
// String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() +DEVICE_BOOT_LOGO_KEY_PREFIX);
|
||||||
if (StringUtils.isEmpty(data)) {
|
// if (StringUtils.isEmpty(data)) {
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
// byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
||||||
byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
// byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
||||||
log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
// log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
||||||
|
//
|
||||||
ArrayList<Integer> intData = new ArrayList<>();
|
// ArrayList<Integer> intData = new ArrayList<>();
|
||||||
intData.add(3);
|
// intData.add(3);
|
||||||
intData.add((int) val2);
|
// intData.add((int) val2);
|
||||||
ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
// ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
||||||
intData.add(0);
|
// intData.add(0);
|
||||||
intData.add(0);
|
// intData.add(0);
|
||||||
intData.add(0);
|
// intData.add(0);
|
||||||
intData.add(0);
|
// intData.add(0);
|
||||||
|
//
|
||||||
Map<String, Object> map = new HashMap<>();
|
// Map<String, Object> map = new HashMap<>();
|
||||||
map.put("instruct", intData);
|
// map.put("instruct", intData);
|
||||||
|
//
|
||||||
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||||
log.info("发送开机LOGO点阵数据到设备消息=>topic:{},payload:{}",
|
// log.info("发送开机LOGO点阵数据到设备消息=>topic:{},payload:{}",
|
||||||
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||||
JsonUtils.toJsonString(map));
|
// JsonUtils.toJsonString(map));
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理开机LOGO时出错", e);
|
log.error("处理开机LOGO时出错", e);
|
||||||
|
|||||||
@ -34,15 +34,15 @@ public class BjqLaserModeSettingsRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
Object[] convertArr = context.getConvertArr();
|
// Object[] convertArr = context.getConvertArr();
|
||||||
|
//
|
||||||
String mode = convertArr[1].toString();
|
// String mode = convertArr[1].toString();
|
||||||
if(StringUtils.isNotBlank(mode)){
|
// if(StringUtils.isNotBlank(mode)){
|
||||||
// 发送设备状态和位置信息到Redis
|
// // 发送设备状态和位置信息到Redis
|
||||||
syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(),mode);
|
// syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(),mode);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理激光模式命令时出错", e);
|
log.error("处理激光模式命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
||||||
|
|||||||
@ -34,15 +34,15 @@ public class BjqLightBrightnessRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
Object[] convertArr = context.getConvertArr();
|
// Object[] convertArr = context.getConvertArr();
|
||||||
|
//
|
||||||
String convertValue = convertArr[1].toString();
|
// String convertValue = convertArr[1].toString();
|
||||||
// 将设备状态信息存储到Redis中
|
// // 将设备状态信息存储到Redis中
|
||||||
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
||||||
|
//
|
||||||
// 存储到Redis
|
// // 存储到Redis
|
||||||
RedisUtils.setCacheObject(deviceRedisKey, convertValue);
|
// RedisUtils.setCacheObject(deviceRedisKey, convertValue);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理灯光亮度命令时出错", e);
|
log.error("处理灯光亮度命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
||||||
|
|||||||
@ -52,26 +52,26 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
Object[] convertArr = context.getConvertArr();
|
// Object[] convertArr = context.getConvertArr();
|
||||||
// Latitude, longitude
|
// // Latitude, longitude
|
||||||
String latitude = convertArr[1].toString();
|
// String latitude = convertArr[1].toString();
|
||||||
String longitude = convertArr[2].toString();
|
// String longitude = convertArr[2].toString();
|
||||||
// 判断 latitude 和 longitude 是否都为 0
|
// // 判断 latitude 和 longitude 是否都为 0
|
||||||
if ("0".equals(latitude) && "0".equals(longitude)) {
|
// if ("0".equals(latitude) && "0".equals(longitude)) {
|
||||||
log.info("位置信息为0,不存储到Redis: device={}, lat={}, lon={}", context.getDeviceImei(), latitude, longitude);
|
// log.info("位置信息为0,不存储到Redis: device={}, lat={}, lon={}", context.getDeviceImei(), latitude, longitude);
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
// 异步发送经纬度到Redis
|
// // 异步发送经纬度到Redis
|
||||||
asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude);
|
// asyncSendLocationToRedisWithFuture(context.getDeviceImei(), latitude, longitude);
|
||||||
// 异步保存数据
|
// // 异步保存数据
|
||||||
asyncSaveLocationToMySQLWithFuture(context.getDeviceImei(), latitude, longitude);
|
// asyncSaveLocationToMySQLWithFuture(context.getDeviceImei(), latitude, longitude);
|
||||||
|
//
|
||||||
Map<String, Object> map = buildLocationDataMap(latitude, longitude);
|
// Map<String, Object> map = buildLocationDataMap(latitude, longitude);
|
||||||
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||||
log.info("发送定位数据到设备=>topic:{},payload:{}",
|
// log.info("发送定位数据到设备=>topic:{},payload:{}",
|
||||||
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||||
JsonUtils.toJsonString(map));
|
// JsonUtils.toJsonString(map));
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理定位数据命令时出错", e);
|
log.error("处理定位数据命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
||||||
@ -121,7 +121,6 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
* @param longitude 经度
|
* @param longitude 经度
|
||||||
*/
|
*/
|
||||||
public void asyncSendLocationToRedisWithFuture(String deviceImei, String latitude, String longitude) {
|
public void asyncSendLocationToRedisWithFuture(String deviceImei, String latitude, String longitude) {
|
||||||
CompletableFuture.runAsync(() -> {
|
|
||||||
try {
|
try {
|
||||||
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
||||||
return;
|
return;
|
||||||
@ -177,7 +176,6 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
log.error("异步发送位置信息到Redis时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -189,7 +187,6 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
* @param longitude 经度
|
* @param longitude 经度
|
||||||
*/
|
*/
|
||||||
public void asyncSaveLocationToMySQLWithFuture(String deviceImei, String latitude, String longitude) {
|
public void asyncSaveLocationToMySQLWithFuture(String deviceImei, String latitude, String longitude) {
|
||||||
CompletableFuture.runAsync(() -> {
|
|
||||||
try {
|
try {
|
||||||
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
if (StringUtils.isBlank(latitude) || StringUtils.isBlank(longitude)) {
|
||||||
return;
|
return;
|
||||||
@ -202,7 +199,6 @@ public class BjqLocationDataRule implements MqttMessageRule {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("异步保存位置信息到MySQL时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
log.error("异步保存位置信息到MySQL时出错: device={}, error={}", deviceImei, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -50,45 +50,47 @@ public class BjqModeRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
Object[] convertArr = context.getConvertArr();
|
log.info("处理灯光模式命令");
|
||||||
|
log.info("MQTT消息负载字典:{}", context.getPayloadDict());
|
||||||
String mainLightMode = convertArr[1].toString();
|
// Object[] convertArr = context.getConvertArr();
|
||||||
String batteryRemainingTime = convertArr[2].toString();
|
//
|
||||||
if (StringUtils.isNotBlank(mainLightMode)) {
|
// String mainLightMode = convertArr[1].toString();
|
||||||
log.info("设备离线mainLightMode:{}", mainLightMode);
|
// String batteryRemainingTime = convertArr[2].toString();
|
||||||
if ("0".equals(mainLightMode)) {
|
// if (StringUtils.isNotBlank(mainLightMode)) {
|
||||||
|
// log.info("设备离线mainLightMode:{}", mainLightMode);
|
||||||
// 设备离线
|
// if ("0".equals(mainLightMode)) {
|
||||||
String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX;
|
//
|
||||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "0");
|
// // 设备离线
|
||||||
|
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX;
|
||||||
String sendMessageIng = GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "0");
|
||||||
String messageSendingValue = RedisUtils.getCacheObject(sendMessageIng);
|
//
|
||||||
if ("1".equals(messageSendingValue)) {
|
// String sendMessageIng = GLOBAL_REDIS_KEY + DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
||||||
// 设置为故障状态
|
// String messageSendingValue = RedisUtils.getCacheObject(sendMessageIng);
|
||||||
RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "2");
|
// if ("1".equals(messageSendingValue)) {
|
||||||
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
// // 设置为故障状态
|
||||||
updateWrapper.eq("device_imei", context.getDeviceImei());
|
// RedisUtils.setCacheObject(deviceOnlineStatusRedisKey, "2");
|
||||||
updateWrapper.set("online_status", 2);
|
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||||
deviceService.update(updateWrapper);
|
// updateWrapper.eq("device_imei", context.getDeviceImei());
|
||||||
RedisUtils.deleteObject(sendMessageIng);
|
// updateWrapper.set("online_status", 2);
|
||||||
|
// deviceService.update(updateWrapper);
|
||||||
// 解除告警
|
// RedisUtils.deleteObject(sendMessageIng);
|
||||||
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_ALARM_KEY_PREFIX;
|
//
|
||||||
if (RedisUtils.getCacheObject(deviceRedisKey) != null) {
|
// // 解除告警
|
||||||
RedisUtils.deleteObject(deviceRedisKey);
|
// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_ALARM_KEY_PREFIX;
|
||||||
}
|
// if (RedisUtils.getCacheObject(deviceRedisKey) != null) {
|
||||||
cancelAlarm(context.getDeviceImei());
|
// RedisUtils.deleteObject(deviceRedisKey);
|
||||||
}
|
// }
|
||||||
}
|
// cancelAlarm(context.getDeviceImei());
|
||||||
// 发送设备状态和位置信息到Redis
|
// }
|
||||||
syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode);
|
// }
|
||||||
String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
// // 发送设备状态和位置信息到Redis
|
||||||
|
// syncSendDeviceDataToRedisWithFuture(context.getDeviceImei(), mainLightMode);
|
||||||
// 存储到Redis
|
// String deviceRedisKey = GlobalConstants.GLOBAL_REDIS_KEY + DeviceRedisKeyConstants.DEVICE_KEY_PREFIX + context.getDeviceImei() + DEVICE_LIGHT_BRIGHTNESS_KEY_PREFIX;
|
||||||
RedisUtils.setCacheObject(deviceRedisKey, batteryRemainingTime);
|
//
|
||||||
}
|
// // 存储到Redis
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
// RedisUtils.setCacheObject(deviceRedisKey, batteryRemainingTime);
|
||||||
|
// }
|
||||||
|
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理灯光模式命令时出错", e);
|
log.error("处理灯光模式命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(20));
|
||||||
|
|||||||
@ -34,8 +34,8 @@ public class BjqPersonnelInfoDataRule implements MqttMessageRule {
|
|||||||
public void execute(MqttRuleContext context) {
|
public void execute(MqttRuleContext context) {
|
||||||
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
String functionAccess = FUNCTION_ACCESS_KEY + context.getDeviceImei();
|
||||||
try {
|
try {
|
||||||
Object[] convertArr = context.getConvertArr();
|
// Object[] convertArr = context.getConvertArr();
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(30));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理定位数据命令时出错", e);
|
log.error("处理定位数据命令时出错", e);
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.FAILED.getCode(), Duration.ofSeconds(30));
|
||||||
|
|||||||
@ -45,47 +45,47 @@ public class BjqSendMessageRule implements MqttMessageRule {
|
|||||||
try {
|
try {
|
||||||
|
|
||||||
// Byte val2 = (Byte) context.getConvertArr()[1];
|
// Byte val2 = (Byte) context.getConvertArr()[1];
|
||||||
String val2Str = context.getConvertArr()[1].toString();
|
// String val2Str = context.getConvertArr()[1].toString();
|
||||||
int val2 = Integer.parseInt(val2Str);
|
// int val2 = Integer.parseInt(val2Str);
|
||||||
System.out.println("收到设备信息命令:"+val2);
|
// System.out.println("收到设备信息命令:"+val2);
|
||||||
if (val2 == 100) {
|
// if (val2 == 100) {
|
||||||
RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
// RedisUtils.setCacheObject(functionAccess, FunctionAccessStatus.OK.getCode(), Duration.ofSeconds(20));
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
if(val2==200){
|
// if(val2==200){
|
||||||
String sendMessageIng = GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
// String sendMessageIng = GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":messageSending";
|
||||||
RedisUtils.deleteObject(sendMessageIng);
|
// RedisUtils.deleteObject(sendMessageIng);
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
|
//
|
||||||
String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":app_send_message_data");
|
// String data = RedisUtils.getCacheObject(GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX + context.getDeviceImei() + ":app_send_message_data");
|
||||||
if (StringUtils.isEmpty(data)) {
|
// if (StringUtils.isEmpty(data)) {
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
// byte[] arr = ImageToCArrayConverter.convertStringToByteArray(data);
|
||||||
byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
// byte[] specificChunk = ImageToCArrayConverter.getChunk(arr, (val2 - 1), 512);
|
||||||
log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
// log.info("第{}块数据大小: {} 字节", val2, specificChunk.length);
|
||||||
// System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk));
|
//// System.out.println("第" + val2 + "块数据: " + Arrays.toString(specificChunk));
|
||||||
|
//
|
||||||
ArrayList<Integer> intData = new ArrayList<>();
|
// ArrayList<Integer> intData = new ArrayList<>();
|
||||||
intData.add(6);
|
// intData.add(6);
|
||||||
intData.add(val2);
|
// intData.add(val2);
|
||||||
ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
// ImageToCArrayConverter.buildArr(convertHexToDecimal(specificChunk), intData);
|
||||||
intData.add(0);
|
// intData.add(0);
|
||||||
intData.add(0);
|
// intData.add(0);
|
||||||
intData.add(0);
|
// intData.add(0);
|
||||||
intData.add(0);
|
// intData.add(0);
|
||||||
|
//
|
||||||
Map<String, Object> map = new HashMap<>();
|
// Map<String, Object> map = new HashMap<>();
|
||||||
map.put("instruct", intData);
|
// map.put("instruct", intData);
|
||||||
|
//
|
||||||
mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
// mqttGateway.sendMsgToMqtt(MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(), 1, JsonUtils.toJsonString(map));
|
||||||
log.info("发送设备信息数据到设备消息=>topic:{},payload:{}",
|
// log.info("发送设备信息数据到设备消息=>topic:{},payload:{}",
|
||||||
MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
// MqttConstants.GLOBAL_PUB_KEY + context.getDeviceImei(),
|
||||||
JsonUtils.toJsonString(map));
|
// JsonUtils.toJsonString(map));
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理发送设备信息时出错", e);
|
log.error("处理发送设备信息时出错", e);
|
||||||
|
|||||||
@ -7,14 +7,18 @@ import com.fuyuanshen.common.core.utils.StringUtils;
|
|||||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||||
import com.fuyuanshen.equipment.domain.Device;
|
import com.fuyuanshen.equipment.domain.Device;
|
||||||
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
||||||
|
import com.fuyuanshen.global.mqtt.base.NewMqttRuleContext;
|
||||||
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
import com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants;
|
||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.redisson.api.RLock;
|
||||||
|
import org.redisson.api.RedissonClient;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@ -95,40 +99,18 @@ public class MqttMessageConsumer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 处理具体业务逻辑的方法
|
// 处理具体业务逻辑的方法
|
||||||
private void processMessage(String message) {
|
private void processMessage(String imei) {
|
||||||
String threadName = Thread.currentThread().getName();
|
String threadName = Thread.currentThread().getName();
|
||||||
try {
|
try {
|
||||||
log.info("业务处理线程 {} 开始处理消息: {}", threadName, message);
|
// 执行业务逻辑
|
||||||
// String deviceOnlineStatusRedisKey = GlobalConstants.GLOBAL_REDIS_KEY+ DEVICE_KEY_PREFIX+ message + DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX ;
|
|
||||||
// String deviceOnlineStatusRedis = RedisUtils.getCacheObject(deviceOnlineStatusRedisKey);
|
|
||||||
// if(StringUtils.isBlank(deviceOnlineStatusRedis)){
|
|
||||||
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
|
||||||
// updateWrapper.eq("device_imei", message)
|
|
||||||
// .set("online_status", 1);
|
|
||||||
// deviceMapper.update(updateWrapper);
|
|
||||||
// }
|
|
||||||
// QueryWrapper<Device> queryWrapper = new QueryWrapper<>();
|
|
||||||
// queryWrapper.eq("device_imei", message);
|
|
||||||
// queryWrapper.eq("online_status", 1);
|
|
||||||
// Long count = deviceMapper.selectCount(queryWrapper);
|
|
||||||
// if(count == 0){
|
|
||||||
// UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
|
||||||
// updateWrapper.eq("device_imei", message)
|
|
||||||
// .eq("online_status", 0)
|
|
||||||
// .set("online_status", 1);
|
|
||||||
// deviceMapper.update(updateWrapper);
|
|
||||||
// }
|
|
||||||
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||||
updateWrapper.eq("device_imei", message)
|
updateWrapper.eq("device_imei", imei)
|
||||||
.in("online_status", 0,2)
|
.in("online_status", 0,2)
|
||||||
.set("online_status", 1);
|
.set("online_status", 1);
|
||||||
int update = deviceMapper.update(updateWrapper);
|
int update = deviceMapper.update(updateWrapper);
|
||||||
// 模拟业务处理耗时
|
|
||||||
// Thread.sleep(200);
|
|
||||||
|
|
||||||
log.info("业务处理线程 {} 完成消息处理: {}", threadName, message);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, message, e);
|
log.error("业务处理线程 {} 处理消息时发生错误: {}", threadName, imei, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -57,9 +57,11 @@ import com.fuyuanshen.web.domain.Dto.DeviceDebugLogoUploadDto;
|
|||||||
import com.fuyuanshen.web.domain.Dto.DeviceXinghanInstructDto;
|
import com.fuyuanshen.web.domain.Dto.DeviceXinghanInstructDto;
|
||||||
import com.fuyuanshen.web.domain.vo.DeviceXinghanDetailVo;
|
import com.fuyuanshen.web.domain.vo.DeviceXinghanDetailVo;
|
||||||
import com.fuyuanshen.web.enums.AlarmTypeEnum;
|
import com.fuyuanshen.web.enums.AlarmTypeEnum;
|
||||||
|
import com.fuyuanshen.web.util.AliyunVoiceUtil;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.validation.annotation.Validated;
|
import org.springframework.validation.annotation.Validated;
|
||||||
@ -93,6 +95,7 @@ public class DeviceXinghanBizService {
|
|||||||
private final DeviceAssignmentsService deviceAssignmentsService;
|
private final DeviceAssignmentsService deviceAssignmentsService;
|
||||||
@Autowired
|
@Autowired
|
||||||
private ObjectMapper objectMapper;
|
private ObjectMapper objectMapper;
|
||||||
|
private final AliyunVoiceUtil voiceUtil;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 所有档位的描述表
|
* 所有档位的描述表
|
||||||
@ -135,7 +138,6 @@ public class DeviceXinghanBizService {
|
|||||||
public void upSOSGradeSettings(DeviceXinghanInstructDto dto) {
|
public void upSOSGradeSettings(DeviceXinghanInstructDto dto) {
|
||||||
if(dto.getIsBluetooth()){
|
if(dto.getIsBluetooth()){
|
||||||
long deviceId = dto.getDeviceId();
|
long deviceId = dto.getDeviceId();
|
||||||
|
|
||||||
// 1. 使用Optional简化空值检查,使代码更简洁
|
// 1. 使用Optional简化空值检查,使代码更简洁
|
||||||
Device device = Optional.ofNullable(deviceMapper.selectById(deviceId))
|
Device device = Optional.ofNullable(deviceMapper.selectById(deviceId))
|
||||||
.orElseThrow(() -> new ServiceException("设备不存在"));
|
.orElseThrow(() -> new ServiceException("设备不存在"));
|
||||||
@ -147,6 +149,24 @@ public class DeviceXinghanBizService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 触发异步报警
|
||||||
|
* Spring 会自动调用 AsyncConfig.getAsyncExecutor() 来执行此方法
|
||||||
|
*/
|
||||||
|
@Async
|
||||||
|
public void executeSosCall(String phone) {
|
||||||
|
log.info("[SOS业务] 准备发起语音拨号 -> 目标: {}", phone);
|
||||||
|
Map<String, String> params = Map.of("device", "670");
|
||||||
|
String callId = voiceUtil.sendTtsSync(phone, "TTS_328730104", params);
|
||||||
|
|
||||||
|
if (callId != null) {
|
||||||
|
log.info("[SOS业务] 拨号指令下发成功, callId: {}", callId);
|
||||||
|
// 这里可以记录拨打日志到数据库
|
||||||
|
} else {
|
||||||
|
log.error("[SOS业务] 拨号指令下发失败,请检查配置或余额");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设置强制报警
|
* 设置强制报警
|
||||||
*/
|
*/
|
||||||
@ -768,4 +788,23 @@ public class DeviceXinghanBizService {
|
|||||||
return uuidStr.replaceAll("-", "");
|
return uuidStr.replaceAll("-", "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> GetDeviceByName(DeviceForm deviceForm){
|
||||||
|
List<Map<String, Object>> list= deviceMapper.GetDeviceByName(deviceForm);
|
||||||
|
Map<String, Object> device=null;
|
||||||
|
if(list!=null && list.size()>0){
|
||||||
|
device=list.get(0);
|
||||||
|
}
|
||||||
|
return device;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getEquipCountByType(DeviceForm form){
|
||||||
|
var res=deviceMapper.getEquipCountByType(form);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Map<String,Object>> getEquipAllByType(DeviceForm deviceForm){
|
||||||
|
List<Map<String, Object>> list= deviceMapper.getEquipAllByType(deviceForm);
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,87 @@
|
|||||||
|
package com.fuyuanshen.web.util;
|
||||||
|
|
||||||
|
import com.aliyun.dyvmsapi20170525.Client;
|
||||||
|
import com.aliyun.dyvmsapi20170525.models.SingleCallByTtsRequest;
|
||||||
|
import com.aliyun.dyvmsapi20170525.models.SingleCallByTtsResponse;
|
||||||
|
import com.aliyun.teaopenapi.models.Config;
|
||||||
|
import com.aliyun.teautil.models.RuntimeOptions;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.StringUtils;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class AliyunVoiceUtil {
|
||||||
|
|
||||||
|
private static final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
|
||||||
|
@Value("${alibaba.tts.akId}")
|
||||||
|
private String akId;
|
||||||
|
@Value("${alibaba.tts.akSecret}")
|
||||||
|
private String akSecret;
|
||||||
|
// @Value("${alibaba.tts.calledShowNumber:}")
|
||||||
|
private String calledShowNumber;
|
||||||
|
|
||||||
|
// ========== 核心:单例客户端(类似 OkHttpClient) ==========
|
||||||
|
private volatile Client client;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取客户端(双重检查锁实现单例)
|
||||||
|
* 只有在第一次调用时才会根据配置实例化,后续直接返回复用
|
||||||
|
*/
|
||||||
|
private Client getClient() throws Exception {
|
||||||
|
if (client == null) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (client == null) {
|
||||||
|
log.info("[AliyunVoice] 正在初始化阿里云语音客户端...");
|
||||||
|
Config config = new Config()
|
||||||
|
.setAccessKeyId(akId)
|
||||||
|
.setAccessKeySecret(akSecret)
|
||||||
|
.setEndpoint("dyvmsapi.aliyuncs.com");
|
||||||
|
this.client = new Client(config);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 同步发送方法:由异步架构调用
|
||||||
|
*/
|
||||||
|
public String sendTtsSync(String phone, String templateCode, Map<String, String> params) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 1. 获取(或初始化)单例客户端
|
||||||
|
Client voiceClient = getClient();
|
||||||
|
|
||||||
|
SingleCallByTtsRequest request = new SingleCallByTtsRequest()
|
||||||
|
.setCalledNumber(phone)
|
||||||
|
.setTtsCode(templateCode)
|
||||||
|
.setTtsParam(objectMapper.writeValueAsString(params));
|
||||||
|
|
||||||
|
if (StringUtils.hasText(calledShowNumber)) {
|
||||||
|
request.setCalledShowNumber(calledShowNumber);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 生产级超时配置
|
||||||
|
RuntimeOptions runtime = new RuntimeOptions();
|
||||||
|
runtime.setConnectTimeout(5000);
|
||||||
|
runtime.setReadTimeout(10000);
|
||||||
|
|
||||||
|
SingleCallByTtsResponse response = voiceClient.singleCallByTtsWithOptions(request, runtime);
|
||||||
|
|
||||||
|
if ("OK".equalsIgnoreCase(response.getBody().getCode())) {
|
||||||
|
return response.getBody().getCallId();
|
||||||
|
} else {
|
||||||
|
log.error("[AliyunVoice] 拨号失败: {}", response.getBody().getMessage());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[AliyunVoice] 接口异常", e);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -140,6 +140,18 @@
|
|||||||
<version>3.3.1</version>
|
<version>3.3.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- 电话语音通知 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.aliyun</groupId>
|
||||||
|
<artifactId>dyvmsapi20170525</artifactId>
|
||||||
|
<version>4.2.0</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.aliyun</groupId>
|
||||||
|
<artifactId>tea-openapi</artifactId>
|
||||||
|
<version>0.3.2</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- fastjson2 -->
|
<!-- fastjson2 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.alibaba.fastjson2</groupId>
|
<groupId>com.alibaba.fastjson2</groupId>
|
||||||
|
|||||||
@ -54,6 +54,9 @@ public class DeviceForm {
|
|||||||
@Schema(title = "备注")
|
@Schema(title = "备注")
|
||||||
private String remark;
|
private String remark;
|
||||||
|
|
||||||
|
@Schema(title = "商户号")
|
||||||
|
private Long tenant_id;
|
||||||
|
|
||||||
|
|
||||||
// 设备类型相关字段
|
// 设备类型相关字段
|
||||||
@Schema(title = "设备类型名称")
|
@Schema(title = "设备类型名称")
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
|
|||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||||
import com.fuyuanshen.equipment.domain.Device;
|
import com.fuyuanshen.equipment.domain.Device;
|
||||||
import com.fuyuanshen.equipment.domain.dto.InstructionRecordDto;
|
import com.fuyuanshen.equipment.domain.dto.InstructionRecordDto;
|
||||||
|
import com.fuyuanshen.equipment.domain.form.DeviceForm;
|
||||||
import com.fuyuanshen.equipment.domain.query.DeviceQueryCriteria;
|
import com.fuyuanshen.equipment.domain.query.DeviceQueryCriteria;
|
||||||
import com.fuyuanshen.equipment.domain.vo.*;
|
import com.fuyuanshen.equipment.domain.vo.*;
|
||||||
import org.apache.ibatis.annotations.Mapper;
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
@ -148,4 +149,10 @@ public interface DeviceMapper extends BaseMapper<Device> {
|
|||||||
*/
|
*/
|
||||||
int countByDeviceTypeId(@Param("deviceTypeId") Long deviceTypeId);
|
int countByDeviceTypeId(@Param("deviceTypeId") Long deviceTypeId);
|
||||||
|
|
||||||
|
List<Map<String, Object>> GetDeviceByName(DeviceForm deviceForm);
|
||||||
|
|
||||||
|
int getEquipCountByType(DeviceForm deviceForm);
|
||||||
|
|
||||||
|
List<Map<String,Object>> getEquipAllByType(DeviceForm deviceForm);
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -325,6 +325,7 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceMapper, Device> impleme
|
|||||||
device.setCreateByName(loginUser.getNickname());
|
device.setCreateByName(loginUser.getNickname());
|
||||||
device.setTypeName(deviceType.getTypeName());
|
device.setTypeName(deviceType.getTypeName());
|
||||||
device.setDeviceType(deviceType.getId());
|
device.setDeviceType(deviceType.getId());
|
||||||
|
device.setDevicePic(deviceType.getDevicePic());
|
||||||
if (device.getDeviceImei() != null) {
|
if (device.getDeviceImei() != null) {
|
||||||
device.setPubTopic("A/" + device.getDeviceImei());
|
device.setPubTopic("A/" + device.getDeviceImei());
|
||||||
device.setSubTopic("B/" + device.getDeviceImei());
|
device.setSubTopic("B/" + device.getDeviceImei());
|
||||||
|
|||||||
@ -264,6 +264,44 @@
|
|||||||
</if>
|
</if>
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
|
<select id="GetDeviceByName" resultType="map" >
|
||||||
|
select a.id,
|
||||||
|
a.device_type,
|
||||||
|
a.device_name,
|
||||||
|
a.device_mac,
|
||||||
|
a.type_name,
|
||||||
|
a.bluetooth_name,
|
||||||
|
a.device_imei
|
||||||
|
from device a
|
||||||
|
<where>
|
||||||
|
a.tenant_id = #{tenant_id} and
|
||||||
|
a.device_type = #{deviceType}
|
||||||
|
AND (
|
||||||
|
a.device_name=#{deviceName} or
|
||||||
|
a.bluetooth_name=#{deviceName}
|
||||||
|
)
|
||||||
|
</where>
|
||||||
|
</select>
|
||||||
|
|
||||||
|
<select id="getEquipCountByType" resultType="int">
|
||||||
|
select count(1) cnt
|
||||||
|
from device a
|
||||||
|
<where>
|
||||||
|
a.tenant_id = #{tenant_id} and
|
||||||
|
a.device_type = #{deviceType}
|
||||||
|
</where>
|
||||||
|
</select>
|
||||||
|
|
||||||
|
<select id="getEquipAllByType" resultType="map">
|
||||||
|
select device_mac,bluetooth_name,device_name
|
||||||
|
from device a
|
||||||
|
<where>
|
||||||
|
a.tenant_id = #{tenant_id} and
|
||||||
|
a.device_type = #{deviceType}
|
||||||
|
</where>
|
||||||
|
|
||||||
|
</select>
|
||||||
|
|
||||||
<!-- 获取分配设备的客户 -->
|
<!-- 获取分配设备的客户 -->
|
||||||
<select id="getAssignCustomer" resultType="com.fuyuanshen.equipment.domain.Device">
|
<select id="getAssignCustomer" resultType="com.fuyuanshen.equipment.domain.Device">
|
||||||
SELECT *
|
SELECT *
|
||||||
|
|||||||
Reference in New Issue
Block a user