初始化提交
This commit is contained in:
@ -0,0 +1,36 @@
|
||||
package com.fuyuanshen.common.sse.config;
|
||||
|
||||
import com.fuyuanshen.common.sse.controller.SseController;
|
||||
import com.fuyuanshen.common.sse.core.SseEmitterManager;
|
||||
import com.fuyuanshen.common.sse.listener.SseTopicListener;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
/**
|
||||
* SSE 自动装配
|
||||
*
|
||||
* @author Lion Li
|
||||
*/
|
||||
@AutoConfiguration
|
||||
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
|
||||
@EnableConfigurationProperties(SseProperties.class)
|
||||
public class SseAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public SseEmitterManager sseEmitterManager() {
|
||||
return new SseEmitterManager();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SseTopicListener sseTopicListener() {
|
||||
return new SseTopicListener();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SseController sseController(SseEmitterManager sseEmitterManager) {
|
||||
return new SseController(sseEmitterManager);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package com.fuyuanshen.common.sse.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* SSE 配置项
|
||||
*
|
||||
* @author Lion Li
|
||||
*/
|
||||
@Data
|
||||
@ConfigurationProperties("sse")
|
||||
public class SseProperties {
|
||||
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* 路径
|
||||
*/
|
||||
private String path;
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
package com.fuyuanshen.common.sse.controller;
|
||||
|
||||
import cn.dev33.satoken.annotation.SaIgnore;
|
||||
import cn.dev33.satoken.stp.StpUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import com.fuyuanshen.common.core.domain.R;
|
||||
import com.fuyuanshen.common.satoken.utils.LoginHelper;
|
||||
import com.fuyuanshen.common.sse.core.SseEmitterManager;
|
||||
import com.fuyuanshen.common.sse.dto.SseMessageDto;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* SSE 控制器
|
||||
*
|
||||
* @author Lion Li
|
||||
*/
|
||||
@RestController
|
||||
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
|
||||
@RequiredArgsConstructor
|
||||
public class SseController implements DisposableBean {
|
||||
|
||||
private final SseEmitterManager sseEmitterManager;
|
||||
|
||||
/**
|
||||
* 建立 SSE 连接
|
||||
*/
|
||||
@GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
|
||||
public SseEmitter connect() {
|
||||
StpUtil.checkLogin();
|
||||
String tokenValue = StpUtil.getTokenValue();
|
||||
Long userId = LoginHelper.getUserId();
|
||||
return sseEmitterManager.connect(userId, tokenValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭 SSE 连接
|
||||
*/
|
||||
@SaIgnore
|
||||
@GetMapping(value = "${sse.path}/close")
|
||||
public R<Void> close() {
|
||||
String tokenValue = StpUtil.getTokenValue();
|
||||
Long userId = LoginHelper.getUserId();
|
||||
sseEmitterManager.disconnect(userId, tokenValue);
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
/**
|
||||
* 向特定用户发送消息
|
||||
*
|
||||
* @param userId 目标用户的 ID
|
||||
* @param msg 要发送的消息内容
|
||||
*/
|
||||
@GetMapping(value = "${sse.path}/send")
|
||||
public R<Void> send(Long userId, String msg) {
|
||||
SseMessageDto dto = new SseMessageDto();
|
||||
dto.setUserIds(List.of(userId));
|
||||
dto.setMessage(msg);
|
||||
sseEmitterManager.publishMessage(dto);
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
/**
|
||||
* 向所有用户发送消息
|
||||
*
|
||||
* @param msg 要发送的消息内容
|
||||
*/
|
||||
@GetMapping(value = "${sse.path}/sendAll")
|
||||
public R<Void> send(String msg) {
|
||||
sseEmitterManager.publishAll(msg);
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理资源。此方法目前不执行任何操作,但避免因未实现而导致错误
|
||||
*/
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
// 销毁时不需要做什么 此方法避免无用操作报错
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,173 @@
|
||||
package com.fuyuanshen.common.sse.core;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.fuyuanshen.common.redis.utils.RedisUtils;
|
||||
import com.fuyuanshen.common.sse.dto.SseMessageDto;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 管理 Server-Sent Events (SSE) 连接
|
||||
*
|
||||
* @author Lion Li
|
||||
*/
|
||||
@Slf4j
|
||||
public class SseEmitterManager {
|
||||
|
||||
/**
|
||||
* 订阅的频道
|
||||
*/
|
||||
private final static String SSE_TOPIC = "global:sse";
|
||||
|
||||
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 建立与指定用户的 SSE 连接
|
||||
*
|
||||
* @param userId 用户的唯一标识符,用于区分不同用户的连接
|
||||
* @param token 用户的唯一令牌,用于识别具体的连接
|
||||
* @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件
|
||||
*/
|
||||
public SseEmitter connect(Long userId, String token) {
|
||||
// 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap)
|
||||
// 每个用户可以有多个 SSE 连接,通过 token 进行区分
|
||||
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
|
||||
|
||||
// 创建一个新的 SseEmitter 实例,超时时间设置为 0 表示无限制
|
||||
SseEmitter emitter = new SseEmitter(0L);
|
||||
|
||||
emitters.put(token, emitter);
|
||||
|
||||
// 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token
|
||||
emitter.onCompletion(() -> {
|
||||
SseEmitter remove = emitters.remove(token);
|
||||
if (remove != null) {
|
||||
remove.complete();
|
||||
}
|
||||
});
|
||||
emitter.onTimeout(() -> {
|
||||
SseEmitter remove = emitters.remove(token);
|
||||
if (remove != null) {
|
||||
remove.complete();
|
||||
}
|
||||
});
|
||||
emitter.onError((e) -> {
|
||||
SseEmitter remove = emitters.remove(token);
|
||||
if (remove != null) {
|
||||
remove.complete();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
// 向客户端发送一条连接成功的事件
|
||||
emitter.send(SseEmitter.event().comment("connected"));
|
||||
} catch (IOException e) {
|
||||
// 如果发送消息失败,则从映射表中移除 emitter
|
||||
emitters.remove(token);
|
||||
}
|
||||
return emitter;
|
||||
}
|
||||
|
||||
/**
|
||||
* 断开指定用户的 SSE 连接
|
||||
*
|
||||
* @param userId 用户的唯一标识符,用于区分不同用户的连接
|
||||
* @param token 用户的唯一令牌,用于识别具体的连接
|
||||
*/
|
||||
public void disconnect(Long userId, String token) {
|
||||
if (userId == null || token == null) {
|
||||
return;
|
||||
}
|
||||
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
|
||||
if (MapUtil.isNotEmpty(emitters)) {
|
||||
try {
|
||||
SseEmitter sseEmitter = emitters.get(token);
|
||||
sseEmitter.send(SseEmitter.event().comment("disconnected"));
|
||||
sseEmitter.complete();
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
emitters.remove(token);
|
||||
} else {
|
||||
USER_TOKEN_EMITTERS.remove(userId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息
|
||||
*
|
||||
* @param consumer 处理SSE消息的消费者函数
|
||||
*/
|
||||
public void subscribeMessage(Consumer<SseMessageDto> consumer) {
|
||||
RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定的用户会话发送消息
|
||||
*
|
||||
* @param userId 要发送消息的用户id
|
||||
* @param message 要发送的消息内容
|
||||
*/
|
||||
public void sendMessage(Long userId, String message) {
|
||||
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
|
||||
if (MapUtil.isNotEmpty(emitters)) {
|
||||
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
|
||||
try {
|
||||
entry.getValue().send(SseEmitter.event()
|
||||
.name("message")
|
||||
.data(message));
|
||||
} catch (Exception e) {
|
||||
SseEmitter remove = emitters.remove(entry.getKey());
|
||||
if (remove != null) {
|
||||
remove.complete();
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
USER_TOKEN_EMITTERS.remove(userId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 本机全用户会话发送消息
|
||||
*
|
||||
* @param message 要发送的消息内容
|
||||
*/
|
||||
public void sendMessage(String message) {
|
||||
for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
|
||||
sendMessage(userId, message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布SSE订阅消息
|
||||
*
|
||||
* @param sseMessageDto 要发布的SSE消息对象
|
||||
*/
|
||||
public void publishMessage(SseMessageDto sseMessageDto) {
|
||||
SseMessageDto broadcastMessage = new SseMessageDto();
|
||||
broadcastMessage.setMessage(sseMessageDto.getMessage());
|
||||
broadcastMessage.setUserIds(sseMessageDto.getUserIds());
|
||||
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
|
||||
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
|
||||
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 向所有的用户发布订阅的消息(群发)
|
||||
*
|
||||
* @param message 要发布的消息内容
|
||||
*/
|
||||
public void publishAll(String message) {
|
||||
SseMessageDto broadcastMessage = new SseMessageDto();
|
||||
broadcastMessage.setMessage(message);
|
||||
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
|
||||
log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package com.fuyuanshen.common.sse.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 消息的dto
|
||||
*
|
||||
* @author zendwang
|
||||
*/
|
||||
@Data
|
||||
public class SseMessageDto implements Serializable {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/**
|
||||
* 需要推送到的session key 列表
|
||||
*/
|
||||
private List<Long> userIds;
|
||||
|
||||
/**
|
||||
* 需要发送的消息
|
||||
*/
|
||||
private String message;
|
||||
}
|
@ -0,0 +1,48 @@
|
||||
package com.fuyuanshen.common.sse.listener;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.fuyuanshen.common.sse.core.SseEmitterManager;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.core.Ordered;
|
||||
|
||||
/**
|
||||
* SSE 主题订阅监听器
|
||||
*
|
||||
* @author Lion Li
|
||||
*/
|
||||
@Slf4j
|
||||
public class SseTopicListener implements ApplicationRunner, Ordered {
|
||||
|
||||
@Autowired
|
||||
private SseEmitterManager sseEmitterManager;
|
||||
|
||||
/**
|
||||
* 在Spring Boot应用程序启动时初始化SSE主题订阅监听器
|
||||
*
|
||||
* @param args 应用程序参数
|
||||
* @throws Exception 初始化过程中可能抛出的异常
|
||||
*/
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
sseEmitterManager.subscribeMessage((message) -> {
|
||||
log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());
|
||||
// 如果key不为空就按照key发消息 如果为空就群发
|
||||
if (CollUtil.isNotEmpty(message.getUserIds())) {
|
||||
message.getUserIds().forEach(key -> {
|
||||
sseEmitterManager.sendMessage(key, message.getMessage());
|
||||
});
|
||||
} else {
|
||||
sseEmitterManager.sendMessage(message.getMessage());
|
||||
}
|
||||
});
|
||||
log.info("初始化SSE主题订阅监听器成功");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return -1;
|
||||
}
|
||||
}
|
@ -0,0 +1,84 @@
|
||||
package com.fuyuanshen.common.sse.utils;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.fuyuanshen.common.core.utils.SpringUtils;
|
||||
import com.fuyuanshen.common.sse.core.SseEmitterManager;
|
||||
import com.fuyuanshen.common.sse.dto.SseMessageDto;
|
||||
|
||||
/**
|
||||
* SSE工具类
|
||||
*
|
||||
* @author Lion Li
|
||||
*/
|
||||
@Slf4j
|
||||
@NoArgsConstructor(access = AccessLevel.PRIVATE)
|
||||
public class SseMessageUtils {
|
||||
|
||||
private final static Boolean SSE_ENABLE = SpringUtils.getProperty("sse.enabled", Boolean.class, true);
|
||||
private static SseEmitterManager MANAGER;
|
||||
|
||||
static {
|
||||
if (isEnable() && MANAGER == null) {
|
||||
MANAGER = SpringUtils.getBean(SseEmitterManager.class);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 向指定的SSE会话发送消息
|
||||
*
|
||||
* @param userId 要发送消息的用户id
|
||||
* @param message 要发送的消息内容
|
||||
*/
|
||||
public static void sendMessage(Long userId, String message) {
|
||||
if (!isEnable()) {
|
||||
return;
|
||||
}
|
||||
MANAGER.sendMessage(userId, message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 本机全用户会话发送消息
|
||||
*
|
||||
* @param message 要发送的消息内容
|
||||
*/
|
||||
public static void sendMessage(String message) {
|
||||
if (!isEnable()) {
|
||||
return;
|
||||
}
|
||||
MANAGER.sendMessage(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布SSE订阅消息
|
||||
*
|
||||
* @param sseMessageDto 要发布的SSE消息对象
|
||||
*/
|
||||
public static void publishMessage(SseMessageDto sseMessageDto) {
|
||||
if (!isEnable()) {
|
||||
return;
|
||||
}
|
||||
MANAGER.publishMessage(sseMessageDto);
|
||||
}
|
||||
|
||||
/**
|
||||
* 向所有的用户发布订阅的消息(群发)
|
||||
*
|
||||
* @param message 要发布的消息内容
|
||||
*/
|
||||
public static void publishAll(String message) {
|
||||
if (!isEnable()) {
|
||||
return;
|
||||
}
|
||||
MANAGER.publishAll(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
public static Boolean isEnable() {
|
||||
return SSE_ENABLE;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1 @@
|
||||
com.fuyuanshen.common.sse.config.SseAutoConfiguration
|
Reference in New Issue
Block a user