离线状态变更
This commit is contained in:
@ -1,23 +1,37 @@
|
|||||||
package com.fuyuanshen.global.mqtt.listener;
|
package com.fuyuanshen.global.mqtt.listener;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
||||||
|
import com.fuyuanshen.common.core.constant.GlobalConstants;
|
||||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||||
|
import com.fuyuanshen.equipment.domain.Device;
|
||||||
|
import com.fuyuanshen.equipment.mapper.DeviceMapper;
|
||||||
import com.fuyuanshen.global.mqtt.listener.domain.FunctionAccessStatus;
|
import com.fuyuanshen.global.mqtt.listener.domain.FunctionAccessStatus;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.data.redis.connection.Message;
|
import org.springframework.data.redis.connection.Message;
|
||||||
import org.springframework.data.redis.connection.MessageListener;
|
import org.springframework.data.redis.connection.MessageListener;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY;
|
import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_KEY;
|
||||||
import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_TIMEOUT_KEY;
|
import static com.fuyuanshen.common.core.constant.GlobalConstants.FUNCTION_ACCESS_TIMEOUT_KEY;
|
||||||
|
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_KEY_PREFIX;
|
||||||
|
import static com.fuyuanshen.global.mqtt.constants.DeviceRedisKeyConstants.DEVICE_ONLINE_STATUS_KEY_PREFIX;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class RedisKeyExpirationListener implements MessageListener {
|
public class RedisKeyExpirationListener implements MessageListener {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("threadPoolTaskExecutor")
|
||||||
|
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DeviceMapper deviceMapper;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Message message, byte[] pattern) {
|
public void onMessage(Message message, byte[] pattern) {
|
||||||
String expiredKey = new String(message.getBody());
|
String expiredKey = new String(message.getBody());
|
||||||
@ -26,8 +40,17 @@ public class RedisKeyExpirationListener implements MessageListener {
|
|||||||
String element = expiredKey.substring(FUNCTION_ACCESS_KEY.length());
|
String element = expiredKey.substring(FUNCTION_ACCESS_KEY.length());
|
||||||
handleFunctionAccessExpired(element);
|
handleFunctionAccessExpired(element);
|
||||||
}
|
}
|
||||||
|
if(expiredKey.endsWith(DEVICE_ONLINE_STATUS_KEY_PREFIX)){
|
||||||
|
threadPoolTaskExecutor.execute(() -> {
|
||||||
|
log.info("设备离线:{}", expiredKey);
|
||||||
|
String element = expiredKey.substring(GlobalConstants.GLOBAL_REDIS_KEY.length() + DEVICE_KEY_PREFIX.length(), expiredKey.length() - DEVICE_ONLINE_STATUS_KEY_PREFIX.length());
|
||||||
|
UpdateWrapper<Device> deviceUpdateWrapper = new UpdateWrapper<>();
|
||||||
|
deviceUpdateWrapper.eq("device_imei", element);
|
||||||
|
deviceUpdateWrapper.set("online_status", 0);
|
||||||
|
deviceMapper.update(deviceUpdateWrapper);
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 访问key过期事件
|
* 访问key过期事件
|
||||||
* @param element 批次ID
|
* @param element 批次ID
|
||||||
|
@ -100,7 +100,6 @@ public class DeviceBJQBizService {
|
|||||||
|
|
||||||
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
UpdateWrapper<Device> updateWrapper = new UpdateWrapper<>();
|
||||||
updateWrapper.eq("id", deviceId)
|
updateWrapper.eq("id", deviceId)
|
||||||
.eq("binding_user_id", AppLoginHelper.getUserId())
|
|
||||||
.set("send_msg", bo.getSendMsg());
|
.set("send_msg", bo.getSendMsg());
|
||||||
deviceMapper.update(updateWrapper);
|
deviceMapper.update(updateWrapper);
|
||||||
|
|
||||||
|
@ -114,4 +114,10 @@ public class DeviceQueryCriteria extends BaseEntity {
|
|||||||
|
|
||||||
private String content;
|
private String content;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设备在线状态
|
||||||
|
* 0:离线;1:在线
|
||||||
|
*/
|
||||||
|
private Integer onlineStatus;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -281,6 +281,10 @@
|
|||||||
<if test="criteria.groupId != null">
|
<if test="criteria.groupId != null">
|
||||||
and d.group_id = #{criteria.groupId}
|
and d.group_id = #{criteria.groupId}
|
||||||
</if>
|
</if>
|
||||||
|
<if test="criteria.onlineStatus != null">
|
||||||
|
and d.online_status = #{criteria.onlineStatus}
|
||||||
|
</if>
|
||||||
|
ORDER BY d.create_time DESC
|
||||||
</select>
|
</select>
|
||||||
<select id="getLocationHistory" resultType="com.fuyuanshen.equipment.domain.vo.LocationHistoryVo">
|
<select id="getLocationHistory" resultType="com.fuyuanshen.equipment.domain.vo.LocationHistoryVo">
|
||||||
select a.id,a.device_name,a.device_type,b.type_name deviceTypeName,a.device_imei,a.device_mac from device a
|
select a.id,a.device_name,a.device_type,b.type_name deviceTypeName,a.device_imei,a.device_mac from device a
|
||||||
|
Reference in New Issue
Block a user