35 changed files with 783 additions and 675 deletions
@ -0,0 +1,25 @@ |
|||
package org.dromara.common.websocket; |
|||
|
|||
import org.springframework.web.socket.WebSocketSession; |
|||
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; |
|||
|
|||
/** |
|||
* @author sean.zhou |
|||
* @version 0.1 |
|||
* @date 2021/11/24 |
|||
*/ |
|||
public class ConcurrentWebSocketSession extends ConcurrentWebSocketSessionDecorator { |
|||
|
|||
private static final int SEND_BUFFER_SIZE_LIMIT = 1024 * 1024; |
|||
|
|||
private static final int SEND_TIME_LIMIT = 1000; |
|||
|
|||
private ConcurrentWebSocketSession(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit) { |
|||
super(delegate, sendTimeLimit, bufferSizeLimit); |
|||
} |
|||
|
|||
ConcurrentWebSocketSession(WebSocketSession delegate) { |
|||
this(delegate, SEND_TIME_LIMIT, SEND_BUFFER_SIZE_LIMIT); |
|||
} |
|||
|
|||
} |
@ -0,0 +1,40 @@ |
|||
package org.dromara.common.websocket; |
|||
|
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; |
|||
import org.springframework.web.socket.config.annotation.StompEndpointRegistry; |
|||
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; |
|||
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; |
|||
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory; |
|||
import org.springframework.web.socket.server.HandshakeHandler; |
|||
|
|||
/** |
|||
* |
|||
* @author sean.zhou |
|||
* @date 2021/11/17 |
|||
* @version 0.1 |
|||
*/ |
|||
@EnableWebSocketMessageBroker |
|||
@Configuration |
|||
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer { |
|||
|
|||
@Autowired(required = false) |
|||
private HandshakeHandler handshakeHandler; |
|||
|
|||
@Autowired |
|||
private WebSocketHandlerDecoratorFactory webSocketHandlerDecoratorFactory; |
|||
|
|||
@Override |
|||
public void registerStompEndpoints(StompEndpointRegistry registry) { |
|||
// Set the WebSocket connection address
|
|||
registry.addEndpoint("/dkcyclog/sample/websocket").setAllowedOriginPatterns("*") |
|||
.setHandshakeHandler(handshakeHandler); |
|||
} |
|||
|
|||
@Override |
|||
public void configureWebSocketTransport(WebSocketTransportRegistration registry) { |
|||
registry.addDecoratorFactory(webSocketHandlerDecoratorFactory); |
|||
} |
|||
|
|||
} |
@ -0,0 +1,20 @@ |
|||
package org.dromara.common.websocket; |
|||
|
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.web.socket.WebSocketHandler; |
|||
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory; |
|||
|
|||
/** |
|||
* |
|||
* @author sean.zhou |
|||
* @date 2021/11/16 |
|||
* @version 0.1 |
|||
*/ |
|||
@Component |
|||
public class WebSocketDefaultFactory implements WebSocketHandlerDecoratorFactory { |
|||
|
|||
@Override |
|||
public WebSocketHandler decorate(WebSocketHandler handler) { |
|||
return new WebSocketDefaultHandler(handler); |
|||
} |
|||
} |
@ -0,0 +1,40 @@ |
|||
package org.dromara.common.websocket; |
|||
|
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.web.socket.CloseStatus; |
|||
import org.springframework.web.socket.WebSocketHandler; |
|||
import org.springframework.web.socket.WebSocketMessage; |
|||
import org.springframework.web.socket.WebSocketSession; |
|||
import org.springframework.web.socket.handler.WebSocketHandlerDecorator; |
|||
|
|||
/** |
|||
* |
|||
* @author sean.zhou |
|||
* @date 2021/11/16 |
|||
* @version 0.1 |
|||
*/ |
|||
public class WebSocketDefaultHandler extends WebSocketHandlerDecorator { |
|||
|
|||
private static final Logger log = LoggerFactory.getLogger(WebSocketDefaultHandler.class); |
|||
|
|||
public WebSocketDefaultHandler(WebSocketHandler delegate) { |
|||
super(delegate); |
|||
} |
|||
|
|||
@Override |
|||
public void afterConnectionEstablished(WebSocketSession session) throws Exception { |
|||
log.debug("{} is connected.", session.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { |
|||
log.debug("{} is disconnected.", session.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { |
|||
log.info("received message: {}, from: {}", message.getPayload(), session.getId()); |
|||
} |
|||
|
|||
} |
@ -0,0 +1,84 @@ |
|||
package org.dromara.common.websocket; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonProperty; |
|||
import io.swagger.v3.oas.annotations.media.Schema; |
|||
import jakarta.validation.constraints.Min; |
|||
import jakarta.validation.constraints.NotNull; |
|||
import org.dromara.common.websocket.dto.BizCodeEnum; |
|||
|
|||
|
|||
/** |
|||
* The format of WebSocket messages that the pilot can receive. |
|||
* @author sean.zhou |
|||
* @date 2021/11/17 |
|||
* @version 0.1 |
|||
*/ |
|||
@Schema(description = "The format of WebSocket messages that the pilot can receive.") |
|||
public class WebSocketMessageResponse<T> { |
|||
|
|||
@JsonProperty("biz_code") |
|||
@NotNull |
|||
@Schema(description = "webSocket messages identity", implementation = BizCodeEnum.class) |
|||
private String bizCode; |
|||
|
|||
@Schema(description = "webSocket messages version") |
|||
private String version = "1.0"; |
|||
|
|||
@NotNull |
|||
@Min(123456789012L) |
|||
@Schema(description = "timestamp (milliseconds)") |
|||
private Long timestamp; |
|||
|
|||
@NotNull |
|||
@Schema(description = "Data corresponding to business functions") |
|||
private T data; |
|||
|
|||
public WebSocketMessageResponse() { |
|||
} |
|||
|
|||
@Override |
|||
public String toString() { |
|||
return "WebSocketMessageResponse{" + |
|||
"bizCode=" + bizCode + |
|||
", version='" + version + '\'' + |
|||
", timestamp=" + timestamp + |
|||
", data=" + data + |
|||
'}'; |
|||
} |
|||
|
|||
public String getBizCode() { |
|||
return bizCode; |
|||
} |
|||
|
|||
public WebSocketMessageResponse<T> setBizCode(String bizCode) { |
|||
this.bizCode = bizCode; |
|||
return this; |
|||
} |
|||
|
|||
public String getVersion() { |
|||
return version; |
|||
} |
|||
|
|||
public WebSocketMessageResponse<T> setVersion(String version) { |
|||
this.version = version; |
|||
return this; |
|||
} |
|||
|
|||
public Long getTimestamp() { |
|||
return timestamp; |
|||
} |
|||
|
|||
public WebSocketMessageResponse<T> setTimestamp(Long timestamp) { |
|||
this.timestamp = timestamp; |
|||
return this; |
|||
} |
|||
|
|||
public T getData() { |
|||
return data; |
|||
} |
|||
|
|||
public WebSocketMessageResponse<T> setData(T data) { |
|||
this.data = data; |
|||
return this; |
|||
} |
|||
} |
@ -0,0 +1,64 @@ |
|||
package org.dromara.common.websocket.api; |
|||
|
|||
import org.dromara.common.sdk.common.Common; |
|||
import org.dromara.common.sdk.exception.CloudSDKErrorEnum; |
|||
import org.dromara.common.sdk.exception.CloudSDKException; |
|||
import org.dromara.common.websocket.ConcurrentWebSocketSession; |
|||
import org.dromara.common.websocket.dto.WebSocketMessageResponse; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.web.socket.TextMessage; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.Collection; |
|||
|
|||
/** |
|||
* @author sean.zhou |
|||
* @version 0.1 |
|||
* @date 2021/11/24 |
|||
*/ |
|||
public class WebSocketMessageSend { |
|||
|
|||
private static final Logger log = LoggerFactory.getLogger(WebSocketMessageSend.class); |
|||
|
|||
public void sendMessage(ConcurrentWebSocketSession session, WebSocketMessageResponse message) { |
|||
if (session == null) { |
|||
return; |
|||
} |
|||
|
|||
try { |
|||
if (!session.isOpen()) { |
|||
session.close(); |
|||
log.info("This session is closed."); |
|||
return; |
|||
} |
|||
|
|||
session.sendMessage(new TextMessage(Common.getObjectMapper().writeValueAsBytes(message))); |
|||
} catch (IOException e) { |
|||
throw new CloudSDKException(CloudSDKErrorEnum.WEBSOCKET_PUBLISH_ABNORMAL, e.getLocalizedMessage()); |
|||
} |
|||
} |
|||
|
|||
public void sendBatch(Collection<ConcurrentWebSocketSession> sessions, WebSocketMessageResponse message) { |
|||
if (sessions.isEmpty()) { |
|||
return; |
|||
} |
|||
|
|||
try { |
|||
|
|||
TextMessage data = new TextMessage(Common.getObjectMapper().writeValueAsBytes(message)); |
|||
|
|||
for (ConcurrentWebSocketSession session : sessions) { |
|||
if (!session.isOpen()) { |
|||
session.close(); |
|||
log.info("This session is closed."); |
|||
return; |
|||
} |
|||
session.sendMessage(data); |
|||
} |
|||
|
|||
} catch (IOException e) { |
|||
throw new CloudSDKException(CloudSDKErrorEnum.WEBSOCKET_PUBLISH_ABNORMAL, e.getLocalizedMessage()); |
|||
} |
|||
} |
|||
} |
@ -1,71 +0,0 @@ |
|||
package org.dromara.common.websocket.config; |
|||
|
|||
import cn.hutool.core.util.StrUtil; |
|||
import org.dromara.common.websocket.config.properties.WebSocketProperties; |
|||
import org.dromara.common.websocket.handler.PlusWebSocketHandler; |
|||
import org.dromara.common.websocket.interceptor.PlusWebSocketInterceptor; |
|||
import org.dromara.common.websocket.listener.WebSocketTopicListener; |
|||
import org.springframework.boot.autoconfigure.AutoConfiguration; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
|||
import org.springframework.boot.context.properties.EnableConfigurationProperties; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.web.socket.WebSocketHandler; |
|||
import org.springframework.web.socket.WebSocketSession; |
|||
import org.springframework.web.socket.config.annotation.EnableWebSocket; |
|||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer; |
|||
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; |
|||
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; |
|||
import org.springframework.web.socket.server.HandshakeInterceptor; |
|||
|
|||
/** |
|||
* WebSocket 配置 |
|||
* |
|||
* @author zendwang |
|||
*/ |
|||
@AutoConfiguration |
|||
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true") |
|||
@EnableConfigurationProperties(WebSocketProperties.class) |
|||
@EnableWebSocket |
|||
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { |
|||
@Override |
|||
public void configureWebSocketTransport(WebSocketTransportRegistration registration) { |
|||
registration.setSendTimeLimit(15 * 1000); // 消息发送最大超时
|
|||
registration.setSendBufferSizeLimit(1024 * 1024); // 发送缓冲区大小
|
|||
registration.setMessageSizeLimit(128 * 1024); // 消息大小限制
|
|||
} |
|||
@Bean |
|||
public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, |
|||
WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) { |
|||
// 如果WebSocket的路径为空,则设置默认路径为 "/websocket"
|
|||
if (StrUtil.isBlank(webSocketProperties.getPath())) { |
|||
webSocketProperties.setPath("/websocket"); |
|||
} |
|||
|
|||
// 如果允许跨域访问的地址为空,则设置为 "*",表示允许所有来源的跨域请求
|
|||
if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) { |
|||
webSocketProperties.setAllowedOrigins("*"); |
|||
} |
|||
|
|||
// 返回一个WebSocketConfigurer对象,用于配置WebSocket
|
|||
return registry -> registry |
|||
// 添加WebSocket处理程序和拦截器到指定路径,设置允许的跨域来源
|
|||
.addHandler(webSocketHandler, webSocketProperties.getPath()) |
|||
.addInterceptors(handshakeInterceptor) |
|||
.setAllowedOrigins(webSocketProperties.getAllowedOrigins()); |
|||
} |
|||
|
|||
@Bean |
|||
public HandshakeInterceptor handshakeInterceptor() { |
|||
return new PlusWebSocketInterceptor(); |
|||
} |
|||
|
|||
@Bean |
|||
public WebSocketHandler webSocketHandler() { |
|||
return new PlusWebSocketHandler(); |
|||
} |
|||
|
|||
@Bean |
|||
public WebSocketTopicListener topicListener() { |
|||
return new WebSocketTopicListener(); |
|||
} |
|||
} |
@ -1,26 +0,0 @@ |
|||
package org.dromara.common.websocket.config.properties; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|||
|
|||
/** |
|||
* WebSocket 配置项 |
|||
* |
|||
* @author zendwang |
|||
*/ |
|||
@ConfigurationProperties("websocket") |
|||
@Data |
|||
public class WebSocketProperties { |
|||
|
|||
private Boolean enabled; |
|||
|
|||
/** |
|||
* 路径 |
|||
*/ |
|||
private String path; |
|||
|
|||
/** |
|||
* 设置访问源地址 |
|||
*/ |
|||
private String allowedOrigins; |
|||
} |
@ -1,29 +0,0 @@ |
|||
package org.dromara.common.websocket.constant; |
|||
|
|||
/** |
|||
* websocket的常量配置 |
|||
* |
|||
* @author zendwang |
|||
*/ |
|||
public interface WebSocketConstants { |
|||
|
|||
/** |
|||
* websocketSession中的参数的key |
|||
*/ |
|||
String LOGIN_USER_KEY = "loginUser"; |
|||
|
|||
/** |
|||
* 订阅的频道 |
|||
*/ |
|||
String WEB_SOCKET_TOPIC = "global:websocket"; |
|||
|
|||
/** |
|||
* 前端心跳检查的命令 |
|||
*/ |
|||
String PING = "ping"; |
|||
|
|||
/** |
|||
* 服务端心跳恢复的字符串 |
|||
*/ |
|||
String PONG = "pong"; |
|||
} |
@ -1,135 +0,0 @@ |
|||
package org.dromara.common.websocket.handler; |
|||
|
|||
import cn.hutool.core.util.ObjectUtil; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.common.websocket.dto.WebSocketMessageDto; |
|||
import org.dromara.common.websocket.holder.WebSocketSessionHolder; |
|||
import org.dromara.common.websocket.utils.WebSocketUtils; |
|||
import org.dromara.system.api.model.LoginUser; |
|||
import org.springframework.web.socket.*; |
|||
import org.springframework.web.socket.handler.AbstractWebSocketHandler; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.List; |
|||
|
|||
import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; |
|||
|
|||
/** |
|||
* WebSocketHandler 实现类 |
|||
* |
|||
* @author zendwang |
|||
*/ |
|||
@Slf4j |
|||
public class PlusWebSocketHandler extends AbstractWebSocketHandler { |
|||
|
|||
/** |
|||
* 连接成功后 |
|||
*/ |
|||
@Override |
|||
public void afterConnectionEstablished(WebSocketSession session) throws IOException { |
|||
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); |
|||
if (ObjectUtil.isNull(loginUser)) { |
|||
session.close(CloseStatus.BAD_DATA); |
|||
log.info("[connect] invalid token received. sessionId: {}", session.getId()); |
|||
return; |
|||
} |
|||
WebSocketSessionHolder.addSession(loginUser.getUserId(), session); |
|||
log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); |
|||
} |
|||
|
|||
/** |
|||
* 处理接收到的文本消息 |
|||
* |
|||
* @param session WebSocket会话 |
|||
* @param message 接收到的文本消息 |
|||
* @throws Exception 处理消息过程中可能抛出的异常 |
|||
*/ |
|||
@Override |
|||
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { |
|||
// 从WebSocket会话中获取登录用户信息
|
|||
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); |
|||
// 检查loginUser是否为空
|
|||
if (ObjectUtil.isNull(loginUser)) { |
|||
log.error("[handleTextMessage] loginUser is null. sessionId: {}", session.getId()); |
|||
session.close(CloseStatus.BAD_DATA); |
|||
return; |
|||
} |
|||
// 创建WebSocket消息DTO对象
|
|||
/*WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto(); |
|||
webSocketMessageDto.setSessionKeys(List.of(loginUser.getUserId())); |
|||
webSocketMessageDto.setMessage(message.getPayload());*/ |
|||
WebSocketUtils.sendMessage(loginUser.getUserId(),"pong"); |
|||
WebSocketUtils.sendPongMessage(session); |
|||
} |
|||
|
|||
/** |
|||
* 处理接收到的二进制消息 |
|||
* |
|||
* @param session WebSocket会话 |
|||
* @param message 接收到的二进制消息 |
|||
* @throws Exception 处理消息过程中可能抛出的异常 |
|||
*/ |
|||
@Override |
|||
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { |
|||
super.handleBinaryMessage(session, message); |
|||
} |
|||
|
|||
/** |
|||
* 处理接收到的Pong消息(心跳监测) |
|||
* |
|||
* @param session WebSocket会话 |
|||
* @param message 接收到的Pong消息 |
|||
* @throws Exception 处理消息过程中可能抛出的异常 |
|||
*/ |
|||
@Override |
|||
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { |
|||
WebSocketUtils.sendPongMessage(session); |
|||
} |
|||
|
|||
/** |
|||
* 处理WebSocket传输错误 |
|||
* |
|||
* @param session WebSocket会话 |
|||
* @param exception 发生的异常 |
|||
* @throws Exception 处理过程中可能抛出的异常 |
|||
*/ |
|||
@Override |
|||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { |
|||
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); |
|||
// 检查loginUser是否为空
|
|||
if (ObjectUtil.isNull(loginUser)) { |
|||
log.error("[handleTextMessage] loginUser is null. sessionId: {}", session.getId()); |
|||
session.close(CloseStatus.BAD_DATA); |
|||
return; |
|||
} |
|||
log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage()); |
|||
} |
|||
|
|||
/** |
|||
* 在WebSocket连接关闭后执行清理操作 |
|||
* |
|||
* @param session WebSocket会话 |
|||
* @param status 关闭状态信息 |
|||
*/ |
|||
@Override |
|||
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { |
|||
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); |
|||
if (ObjectUtil.isNull(loginUser)) { |
|||
log.info("[disconnect] invalid token received. sessionId: {}", session.getId()); |
|||
return; |
|||
} |
|||
WebSocketSessionHolder.removeSession(loginUser.getUserId()); |
|||
log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); |
|||
} |
|||
|
|||
/** |
|||
* 指示处理程序是否支持接收部分消息 |
|||
* |
|||
* @return 如果支持接收部分消息,则返回true;否则返回false |
|||
*/ |
|||
@Override |
|||
public boolean supportsPartialMessages() { |
|||
return false; |
|||
} |
|||
|
|||
} |
@ -1,70 +0,0 @@ |
|||
package org.dromara.common.websocket.holder; |
|||
|
|||
import lombok.AccessLevel; |
|||
import lombok.NoArgsConstructor; |
|||
import org.springframework.web.socket.WebSocketSession; |
|||
|
|||
import java.util.Map; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
|
|||
/** |
|||
* WebSocketSession 用于保存当前所有在线的会话信息 |
|||
* |
|||
* @author zendwang |
|||
*/ |
|||
@NoArgsConstructor(access = AccessLevel.PRIVATE) |
|||
public class WebSocketSessionHolder { |
|||
|
|||
private static final Map<Long, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>(); |
|||
|
|||
/** |
|||
* 将WebSocket会话添加到用户会话Map中 |
|||
* |
|||
* @param sessionKey 会话键,用于检索会话 |
|||
* @param session 要添加的WebSocket会话 |
|||
*/ |
|||
public static void addSession(Long sessionKey, WebSocketSession session) { |
|||
USER_SESSION_MAP.put(sessionKey, session); |
|||
} |
|||
|
|||
/** |
|||
* 从用户会话Map中移除指定会话键对应的WebSocket会话 |
|||
* |
|||
* @param sessionKey 要移除的会话键 |
|||
*/ |
|||
public static void removeSession(Long sessionKey) { |
|||
if (USER_SESSION_MAP.containsKey(sessionKey)) { |
|||
USER_SESSION_MAP.remove(sessionKey); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 根据会话键从用户会话Map中获取WebSocket会话 |
|||
* |
|||
* @param sessionKey 要获取的会话键 |
|||
* @return 与给定会话键对应的WebSocket会话,如果不存在则返回null |
|||
*/ |
|||
public static WebSocketSession getSessions(Long sessionKey) { |
|||
return USER_SESSION_MAP.get(sessionKey); |
|||
} |
|||
|
|||
/** |
|||
* 获取存储在用户会话Map中所有WebSocket会话的会话键集合 |
|||
* |
|||
* @return 所有WebSocket会话的会话键集合 |
|||
*/ |
|||
public static Set<Long> getSessionsAll() { |
|||
return USER_SESSION_MAP.keySet(); |
|||
} |
|||
|
|||
/** |
|||
* 检查给定的会话键是否存在于用户会话Map中 |
|||
* |
|||
* @param sessionKey 要检查的会话键 |
|||
* @return 如果存在对应的会话键,则返回true;否则返回false |
|||
*/ |
|||
public static Boolean existSession(Long sessionKey) { |
|||
return USER_SESSION_MAP.containsKey(sessionKey); |
|||
} |
|||
} |
@ -1,58 +0,0 @@ |
|||
package org.dromara.common.websocket.interceptor; |
|||
|
|||
import cn.dev33.satoken.exception.NotLoginException; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.common.satoken.utils.LoginHelper; |
|||
import org.dromara.system.api.model.LoginUser; |
|||
import org.springframework.http.server.ServerHttpRequest; |
|||
import org.springframework.http.server.ServerHttpResponse; |
|||
import org.springframework.web.socket.WebSocketHandler; |
|||
import org.springframework.web.socket.server.HandshakeInterceptor; |
|||
|
|||
import java.util.Map; |
|||
|
|||
import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; |
|||
|
|||
/** |
|||
* WebSocket握手请求的拦截器 |
|||
* |
|||
* @author zendwang |
|||
*/ |
|||
@Slf4j |
|||
public class PlusWebSocketInterceptor implements HandshakeInterceptor { |
|||
|
|||
/** |
|||
* WebSocket握手之前执行的前置处理方法 |
|||
* |
|||
* @param request WebSocket握手请求 |
|||
* @param response WebSocket握手响应 |
|||
* @param wsHandler WebSocket处理程序 |
|||
* @param attributes 与WebSocket会话关联的属性 |
|||
* @return 如果允许握手继续进行,则返回true;否则返回false |
|||
*/ |
|||
@Override |
|||
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) { |
|||
try { |
|||
LoginUser loginUser = LoginHelper.getLoginUser(); |
|||
attributes.put(LOGIN_USER_KEY, loginUser); |
|||
return true; |
|||
} catch (NotLoginException e) { |
|||
log.error("WebSocket 认证失败'{}',无法访问系统资源", e.getMessage()); |
|||
return false; |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* WebSocket握手成功后执行的后置处理方法 |
|||
* |
|||
* @param request WebSocket握手请求 |
|||
* @param response WebSocket握手响应 |
|||
* @param wsHandler WebSocket处理程序 |
|||
* @param exception 握手过程中可能出现的异常 |
|||
*/ |
|||
@Override |
|||
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { |
|||
// 在这个方法中可以执行一些握手成功后的后续处理逻辑,比如记录日志或者其他操作
|
|||
} |
|||
|
|||
} |
@ -1,54 +0,0 @@ |
|||
package org.dromara.common.websocket.listener; |
|||
|
|||
import cn.hutool.core.collection.CollUtil; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.common.websocket.holder.WebSocketSessionHolder; |
|||
import org.dromara.common.websocket.utils.WebSocketUtils; |
|||
import org.springframework.boot.ApplicationArguments; |
|||
import org.springframework.boot.ApplicationRunner; |
|||
import org.springframework.core.Ordered; |
|||
|
|||
/** |
|||
* WebSocket 主题订阅监听器 |
|||
* |
|||
* @author zendwang |
|||
*/ |
|||
@Slf4j |
|||
public class WebSocketTopicListener implements ApplicationRunner, Ordered { |
|||
|
|||
/** |
|||
* 在Spring Boot应用程序启动时初始化WebSocket主题订阅监听器 |
|||
* |
|||
* @param args 应用程序参数 |
|||
* @throws Exception 初始化过程中可能抛出的异常 |
|||
*/ |
|||
@Override |
|||
public void run(ApplicationArguments args) throws Exception { |
|||
// 订阅WebSocket消息
|
|||
WebSocketUtils.subscribeMessage((message) -> { |
|||
try { |
|||
log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage()); |
|||
// 如果key不为空就按照key发消息 如果为空就群发
|
|||
if (CollUtil.isNotEmpty(message.getSessionKeys())) { |
|||
message.getSessionKeys().forEach(key -> { |
|||
if (WebSocketSessionHolder.existSession(key)) { |
|||
WebSocketUtils.sendMessage(key, message.getMessage()); |
|||
} |
|||
}); |
|||
} else { |
|||
WebSocketSessionHolder.getSessionsAll().forEach(key -> { |
|||
WebSocketUtils.sendMessage(key, message.getMessage()); |
|||
}); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("处理消息时发生异常", e); |
|||
} |
|||
}); |
|||
log.info("初始化WebSocket主题订阅监听器成功"); |
|||
} |
|||
|
|||
@Override |
|||
public int getOrder() { |
|||
return -1; |
|||
} |
|||
} |
@ -1,185 +0,0 @@ |
|||
package org.dromara.common.websocket.utils; |
|||
|
|||
import cn.hutool.core.collection.CollUtil; |
|||
import com.fasterxml.jackson.annotation.JsonInclude; |
|||
import com.fasterxml.jackson.core.JsonProcessingException; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.fasterxml.jackson.databind.PropertyNamingStrategy; |
|||
import lombok.AccessLevel; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.checkerframework.checker.units.qual.C; |
|||
import org.dromara.common.json.utils.JsonUtils; |
|||
import org.dromara.common.redis.utils.RedisUtils; |
|||
import org.dromara.common.websocket.dto.WebSocketMessageDto; |
|||
import org.dromara.common.websocket.dto.WebSocketMessageResponse; |
|||
import org.dromara.common.websocket.holder.WebSocketSessionHolder; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.web.socket.PongMessage; |
|||
import org.springframework.web.socket.TextMessage; |
|||
import org.springframework.web.socket.WebSocketMessage; |
|||
import org.springframework.web.socket.WebSocketSession; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Objects; |
|||
import java.util.function.Consumer; |
|||
|
|||
import static org.dromara.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC; |
|||
|
|||
/** |
|||
* WebSocket工具类 |
|||
* |
|||
* @author zendwang |
|||
*/ |
|||
@Slf4j |
|||
@NoArgsConstructor(access = AccessLevel.PRIVATE) |
|||
public class WebSocketUtils { |
|||
|
|||
/** |
|||
* 向指定的WebSocket会话发送消息 |
|||
* |
|||
* @param sessionKey 要发送消息的用户id |
|||
* @param message 要发送的消息内容 |
|||
*/ |
|||
public static void sendMessage(Long sessionKey, String message) { |
|||
WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); |
|||
sendMessage(session, message); |
|||
} |
|||
|
|||
/** |
|||
* 订阅WebSocket消息主题,并提供一个消费者函数来处理接收到的消息 |
|||
* |
|||
* @param consumer 处理WebSocket消息的消费者函数 |
|||
*/ |
|||
public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) { |
|||
RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer); |
|||
} |
|||
|
|||
/** |
|||
* 发布WebSocket订阅消息 |
|||
* |
|||
* @param webSocketMessage 要发布的WebSocket消息对象 |
|||
*/ |
|||
public static void publishMessage(WebSocketMessageDto webSocketMessage) { |
|||
List<Long> unsentSessionKeys = new ArrayList<>(); |
|||
// 当前服务内session,直接发送消息
|
|||
for (Long sessionKey : webSocketMessage.getSessionKeys()) { |
|||
if (WebSocketSessionHolder.existSession(sessionKey)) { |
|||
WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage()); |
|||
continue; |
|||
} |
|||
unsentSessionKeys.add(sessionKey); |
|||
} |
|||
// 不在当前服务内session,发布订阅消息
|
|||
if (CollUtil.isNotEmpty(unsentSessionKeys)) { |
|||
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); |
|||
broadcastMessage.setMessage(webSocketMessage.getMessage()); |
|||
broadcastMessage.setSessionKeys(unsentSessionKeys); |
|||
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { |
|||
log.info("WebSocket发送主题订阅消息topic:{} session keys:{} message:{}", |
|||
WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage()); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 发布WebSocket订阅消息 |
|||
* |
|||
* @param |
|||
*/ |
|||
public static void publishAll(Integer userType, String bizCode, Object data) { |
|||
WebSocketMessageResponse message = new WebSocketMessageResponse() |
|||
.setData(Objects.requireNonNullElse(data, "")) |
|||
.setTimestamp(System.currentTimeMillis()) |
|||
.setBizCode(bizCode); |
|||
try { |
|||
ObjectMapper mapper = new ObjectMapper(); |
|||
mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); |
|||
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); |
|||
String jsonString = mapper.writeValueAsString(message); |
|||
WebSocketSessionHolder.getSessionsAll().forEach(key -> { |
|||
WebSocketSession session = WebSocketSessionHolder.getSessions(key); |
|||
sendMessage(session, jsonString); |
|||
}); |
|||
}catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
/** |
|||
* 发布WebSocket订阅消息 |
|||
* |
|||
* @param |
|||
*/ |
|||
public static void publishAll(String bizCode, Object data) { |
|||
WebSocketMessageResponse message = new WebSocketMessageResponse() |
|||
.setData(Objects.requireNonNullElse(data, "")) |
|||
.setTimestamp(System.currentTimeMillis()) |
|||
.setBizCode(bizCode); |
|||
try { |
|||
ObjectMapper mapper = new ObjectMapper(); |
|||
mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); |
|||
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); |
|||
String jsonString = mapper.writeValueAsString(message); |
|||
WebSocketSessionHolder.getSessionsAll().forEach(key -> { |
|||
WebSocketSession session = WebSocketSessionHolder.getSessions(key); |
|||
sendMessage(session, jsonString); |
|||
}); |
|||
}catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
|
|||
|
|||
|
|||
/** |
|||
* 向所有的WebSocket会话发布订阅的消息(群发) |
|||
* |
|||
* @param message 要发布的消息内容 |
|||
*/ |
|||
public static void publishAll(String message) { |
|||
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); |
|||
broadcastMessage.setMessage(message); |
|||
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { |
|||
log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message); |
|||
}); |
|||
} |
|||
|
|||
/** |
|||
* 向指定的WebSocket会话发送Pong消息 |
|||
* |
|||
* @param session 要发送Pong消息的WebSocket会话 |
|||
*/ |
|||
public static void sendPongMessage(WebSocketSession session) { |
|||
sendMessage(session, new PongMessage()); |
|||
} |
|||
|
|||
/** |
|||
* 向指定的WebSocket会话发送文本消息 |
|||
* |
|||
* @param session WebSocket会话 |
|||
* @param message 要发送的文本消息内容 |
|||
*/ |
|||
public static void sendMessage(WebSocketSession session, String message) { |
|||
sendMessage(session, new TextMessage(message)); |
|||
} |
|||
|
|||
/** |
|||
* 向指定的WebSocket会话发送WebSocket消息对象 |
|||
* |
|||
* @param session WebSocket会话 |
|||
* @param message 要发送的WebSocket消息对象 |
|||
*/ |
|||
private synchronized static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) { |
|||
if (session == null || !session.isOpen()) { |
|||
log.warn("[send] session会话已经关闭"); |
|||
} else { |
|||
try { |
|||
session.sendMessage(message); |
|||
} catch (IOException e) { |
|||
log.error("[send] session({}) 发送消息({}) 异常", session, message, e); |
|||
} |
|||
} |
|||
} |
|||
} |
@ -1,4 +1,4 @@ |
|||
package org.dromara.common.websocket.config; |
|||
package org.dromara.sample.websocket.config; |
|||
|
|||
import org.springframework.web.socket.WebSocketSession; |
|||
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; |
@ -0,0 +1,27 @@ |
|||
package org.dromara.sample.websocket.config; |
|||
|
|||
import org.dromara.common.websocket.WebSocketDefaultFactory; |
|||
import org.dromara.sample.websocket.service.IWebSocketManageService; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Primary; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.web.socket.WebSocketHandler; |
|||
|
|||
/** |
|||
* |
|||
* @author sean.zhou |
|||
* @date 2021/11/16 |
|||
* @version 0.1 |
|||
*/ |
|||
@Component |
|||
@Primary |
|||
public class MyWebSocketFactory extends WebSocketDefaultFactory { |
|||
|
|||
@Autowired |
|||
private IWebSocketManageService webSocketManageService; |
|||
|
|||
@Override |
|||
public WebSocketHandler decorate(WebSocketHandler handler) { |
|||
return new MyWebSocketHandler(handler, webSocketManageService); |
|||
} |
|||
} |
@ -0,0 +1,58 @@ |
|||
package org.dromara.sample.websocket.config; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.common.websocket.WebSocketDefaultHandler; |
|||
import org.dromara.sample.websocket.service.IWebSocketManageService; |
|||
import org.springframework.util.StringUtils; |
|||
import org.springframework.web.socket.CloseStatus; |
|||
import org.springframework.web.socket.WebSocketHandler; |
|||
import org.springframework.web.socket.WebSocketMessage; |
|||
import org.springframework.web.socket.WebSocketSession; |
|||
|
|||
import java.security.Principal; |
|||
|
|||
/** |
|||
* |
|||
* @author sean.zhou |
|||
* @date 2021/11/16 |
|||
* @version 0.1 |
|||
*/ |
|||
@Slf4j |
|||
public class MyWebSocketHandler extends WebSocketDefaultHandler { |
|||
|
|||
private IWebSocketManageService webSocketManageService; |
|||
|
|||
MyWebSocketHandler(WebSocketHandler delegate, IWebSocketManageService webSocketManageService) { |
|||
super(delegate); |
|||
this.webSocketManageService = webSocketManageService; |
|||
} |
|||
|
|||
@Override |
|||
public void afterConnectionEstablished(WebSocketSession session) throws Exception { |
|||
Principal principal = session.getPrincipal(); |
|||
if (StringUtils.hasText(principal.getName())) { |
|||
webSocketManageService.put(principal.getName(), new MyConcurrentWebSocketSession(session)); |
|||
log.debug("{} is connected. ID: {}. WebSocketSession[current count: {}]", |
|||
principal.getName(), session.getId(), webSocketManageService.getConnectedCount()); |
|||
return; |
|||
} |
|||
session.close(); |
|||
} |
|||
|
|||
@Override |
|||
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { |
|||
Principal principal = session.getPrincipal(); |
|||
if (StringUtils.hasText(principal.getName())) { |
|||
webSocketManageService.remove(principal.getName(), session.getId()); |
|||
log.debug("{} is disconnected. ID: {}. WebSocketSession[current count: {}]", |
|||
principal.getName(), session.getId(), webSocketManageService.getConnectedCount()); |
|||
} |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { |
|||
log.debug("received message: {}", message.getPayload()); |
|||
} |
|||
|
|||
} |
@ -0,0 +1,93 @@ |
|||
package org.dromara.sample.websocket.model; |
|||
|
|||
/** |
|||
* @author sean |
|||
* @version 0.1 |
|||
* @date 2021/11/26 |
|||
*/ |
|||
public enum BizCodeEnum { |
|||
|
|||
DEVICE_ONLINE("device_online"), |
|||
|
|||
DEVICE_OFFLINE("device_offline"), |
|||
|
|||
DEVICE_UPDATE_TOPO("device_update_topo"), |
|||
|
|||
DEVICE_OSD("device_osd"), |
|||
|
|||
RC_OSD("gateway_osd"), |
|||
|
|||
DOCK_OSD("dock_osd"), |
|||
|
|||
MAP_ELEMENT_CREATE("map_element_create"), |
|||
|
|||
MAP_ELEMENT_UPDATE("map_element_update"), |
|||
|
|||
MAP_ELEMENT_DELETE("map_element_delete"), |
|||
|
|||
MAP_GROUP_REFRESH("map_group_refresh"), |
|||
|
|||
FLIGHT_TASK_PROGRESS("flighttask_progress"), |
|||
|
|||
DEVICE_HMS("device_hms"), |
|||
|
|||
DEVICE_REBOOT("device_reboot"), |
|||
|
|||
DRONE_OPEN("drone_open"), |
|||
|
|||
DRONE_CLOSE("drone_close"), |
|||
|
|||
DEVICE_CHECK("device_check"), |
|||
|
|||
DRONE_FORMAT("drone_format"), |
|||
|
|||
DEVICE_FORMAT("device_format"), |
|||
|
|||
COVER_OPEN("cover_open"), |
|||
|
|||
COVER_CLOSE("cover_close"), |
|||
|
|||
PUTTER_OPEN("putter_open"), |
|||
|
|||
PUTTER_CLOSE("putter_close"), |
|||
|
|||
CHARGE_OPEN("charge_open"), |
|||
|
|||
CHARGE_CLOSE("charge_close"), |
|||
|
|||
FILE_UPLOAD_CALLBACK("file_upload_callback"), |
|||
|
|||
FILE_UPLOAD_PROGRESS("fileupload_progress"), |
|||
|
|||
OTA_PROGRESS("ota_progress"), |
|||
|
|||
HIGHEST_PRIORITY_UPLOAD_FLIGHT_TASK_MEDIA("highest_priority_upload_flighttask_media"), |
|||
|
|||
CONTROL_SOURCE_CHANGE("control_source_change"), |
|||
|
|||
FLY_TO_POINT_PROGRESS("fly_to_point_progress"), |
|||
|
|||
TAKE_OFF_TO_POINT_PROGRESS("takeoff_to_point_progress"), |
|||
|
|||
DRC_STATUS_NOTIFY("drc_status_notify"), |
|||
|
|||
JOYSTICK_INVALID_NOTIFY("joystick_invalid_notify"), |
|||
|
|||
FLIGHT_AREAS_SYNC_PROGRESS("flight_areas_sync_progress"), |
|||
|
|||
FLIGHT_AREAS_DRONE_LOCATION("flight_areas_drone_location"), |
|||
|
|||
FLIGHT_AREAS_UPDATE("flight_areas_update"), |
|||
|
|||
; |
|||
|
|||
private String code; |
|||
|
|||
BizCodeEnum(String code) { |
|||
this.code = code; |
|||
} |
|||
|
|||
public String getCode() { |
|||
return code; |
|||
} |
|||
} |
@ -0,0 +1,26 @@ |
|||
package org.dromara.sample.websocket.service; |
|||
|
|||
|
|||
|
|||
import org.dromara.sample.websocket.config.MyConcurrentWebSocketSession; |
|||
|
|||
import java.util.Collection; |
|||
|
|||
/** |
|||
* @author sean |
|||
* @version 1.0 |
|||
* @date 2022/4/25 |
|||
*/ |
|||
public interface IWebSocketManageService { |
|||
|
|||
void put(String key, MyConcurrentWebSocketSession val); |
|||
|
|||
void remove(String key, String sessionId); |
|||
|
|||
Collection<MyConcurrentWebSocketSession> getValueWithWorkspace(String workspaceId); |
|||
|
|||
Collection<MyConcurrentWebSocketSession> getValueWithWorkspaceAndUserType(String workspaceId, Integer userType); |
|||
|
|||
Long getConnectedCount(); |
|||
|
|||
} |
@ -0,0 +1,33 @@ |
|||
package org.dromara.sample.websocket.service; |
|||
|
|||
|
|||
import org.dromara.common.websocket.dto.WebSocketMessageResponse; |
|||
import org.dromara.sample.websocket.config.MyConcurrentWebSocketSession; |
|||
|
|||
import java.util.Collection; |
|||
|
|||
/** |
|||
* @author sean.zhou |
|||
* @date 2021/11/24 |
|||
* @version 0.1 |
|||
*/ |
|||
public interface IWebSocketMessageService { |
|||
|
|||
/** |
|||
* Send a message to the specific connection. |
|||
* @param session A WebSocket connection object |
|||
* @param message message |
|||
*/ |
|||
void sendMessage(MyConcurrentWebSocketSession session, WebSocketMessageResponse message); |
|||
|
|||
/** |
|||
* Send the same message to specific connection. |
|||
* @param sessions A collection of WebSocket connection objects. |
|||
* @param message message |
|||
*/ |
|||
void sendBatch(Collection<MyConcurrentWebSocketSession> sessions, WebSocketMessageResponse message); |
|||
|
|||
void sendBatch(String workspaceId, Integer userType, String bizCode, Object data); |
|||
|
|||
void sendBatch(String workspaceId, String bizCode, Object data); |
|||
} |
@ -0,0 +1,86 @@ |
|||
package org.dromara.sample.websocket.service.impl; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.common.redis.config.RedisConst; |
|||
import org.dromara.common.redis.utils.RedisOpsUtils; |
|||
import org.dromara.sample.manage.model.enums.UserTypeEnum; |
|||
import org.dromara.sample.websocket.config.MyConcurrentWebSocketSession; |
|||
import org.dromara.sample.websocket.service.IWebSocketManageService; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.util.StringUtils; |
|||
|
|||
import java.util.Collection; |
|||
import java.util.Collections; |
|||
import java.util.Objects; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.stream.Collectors; |
|||
|
|||
/** |
|||
* @author sean |
|||
* @version 1.0 |
|||
* @date 2022/4/25 |
|||
*/ |
|||
@Slf4j |
|||
@Service |
|||
public class WebSocketManageServiceImpl implements IWebSocketManageService { |
|||
|
|||
private static final ConcurrentHashMap<String, MyConcurrentWebSocketSession> SESSIONS = new ConcurrentHashMap<>(16); |
|||
|
|||
@Override |
|||
public void put(String key, MyConcurrentWebSocketSession val) { |
|||
String[] name = key.split("/"); |
|||
if (name.length != 3) { |
|||
log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]"); |
|||
return; |
|||
} |
|||
String sessionId = val.getId(); |
|||
String workspaceKey = RedisConst.WEBSOCKET_PREFIX + name[0]; |
|||
String userTypeKey = RedisConst.WEBSOCKET_PREFIX + UserTypeEnum.find(Integer.parseInt(name[1])).getDesc(); |
|||
RedisOpsUtils.hashSet(workspaceKey, sessionId, name[2]); |
|||
RedisOpsUtils.hashSet(userTypeKey, sessionId, name[2]); |
|||
SESSIONS.put(sessionId, val); |
|||
RedisOpsUtils.expireKey(workspaceKey, RedisConst.WEBSOCKET_ALIVE_SECOND); |
|||
RedisOpsUtils.expireKey(userTypeKey, RedisConst.WEBSOCKET_ALIVE_SECOND); |
|||
} |
|||
|
|||
@Override |
|||
public void remove(String key, String sessionId) { |
|||
String[] name = key.split("/"); |
|||
if (name.length != 3) { |
|||
log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]"); |
|||
return; |
|||
} |
|||
RedisOpsUtils.hashDel(RedisConst.WEBSOCKET_PREFIX + name[0], new String[] {sessionId}); |
|||
RedisOpsUtils.hashDel(RedisConst.WEBSOCKET_PREFIX + UserTypeEnum.find(Integer.parseInt(name[1])).getDesc(), new String[] {sessionId}); |
|||
SESSIONS.remove(sessionId); |
|||
} |
|||
|
|||
@Override |
|||
public Collection<MyConcurrentWebSocketSession> getValueWithWorkspace(String workspaceId) { |
|||
if (!StringUtils.hasText(workspaceId)) { |
|||
return Collections.emptySet(); |
|||
} |
|||
String key = RedisConst.WEBSOCKET_PREFIX + workspaceId; |
|||
|
|||
return RedisOpsUtils.hashKeys(key) |
|||
.stream() |
|||
.map(SESSIONS::get) |
|||
.filter(Objects::nonNull) |
|||
.collect(Collectors.toSet()); |
|||
} |
|||
|
|||
@Override |
|||
public Collection<MyConcurrentWebSocketSession> getValueWithWorkspaceAndUserType(String workspaceId, Integer userType) { |
|||
String key = RedisConst.WEBSOCKET_PREFIX + UserTypeEnum.find(userType).getDesc(); |
|||
return RedisOpsUtils.hashKeys(key) |
|||
.stream() |
|||
.map(SESSIONS::get) |
|||
.filter(getValueWithWorkspace(workspaceId)::contains) |
|||
.collect(Collectors.toSet()); |
|||
} |
|||
|
|||
@Override |
|||
public Long getConnectedCount() { |
|||
return SESSIONS.mappingCount(); |
|||
} |
|||
} |
@ -0,0 +1,99 @@ |
|||
package org.dromara.sample.websocket.service.impl; |
|||
|
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.common.websocket.dto.WebSocketMessageResponse; |
|||
import org.dromara.sample.websocket.config.MyConcurrentWebSocketSession; |
|||
import org.dromara.sample.websocket.service.IWebSocketManageService; |
|||
import org.dromara.sample.websocket.service.IWebSocketMessageService; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.util.StringUtils; |
|||
import org.springframework.web.socket.TextMessage; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.Collection; |
|||
import java.util.Objects; |
|||
|
|||
/** |
|||
* @author sean.zhou |
|||
* @version 0.1 |
|||
* @date 2021/11/24 |
|||
*/ |
|||
@Service |
|||
@Slf4j |
|||
public class WebSocketMessageServiceImpl implements IWebSocketMessageService { |
|||
|
|||
@Autowired |
|||
private ObjectMapper mapper; |
|||
|
|||
@Autowired |
|||
private IWebSocketManageService webSocketManageService; |
|||
|
|||
@Override |
|||
public void sendMessage(MyConcurrentWebSocketSession session, WebSocketMessageResponse message) { |
|||
if (session == null) { |
|||
return; |
|||
} |
|||
|
|||
try { |
|||
if (!session.isOpen()) { |
|||
session.close(); |
|||
log.debug("This session is closed."); |
|||
return; |
|||
} |
|||
|
|||
|
|||
session.sendMessage(new TextMessage(mapper.writeValueAsBytes(message))); |
|||
} catch (IOException e) { |
|||
log.info("Failed to publish the message. {}", message.toString()); |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void sendBatch(Collection<MyConcurrentWebSocketSession> sessions, WebSocketMessageResponse message) { |
|||
if (sessions.isEmpty()) { |
|||
return; |
|||
} |
|||
|
|||
try { |
|||
|
|||
TextMessage data = new TextMessage(mapper.writeValueAsBytes(message)); |
|||
|
|||
for (MyConcurrentWebSocketSession session : sessions) { |
|||
if (!session.isOpen()) { |
|||
session.close(); |
|||
log.debug("This session is closed."); |
|||
return; |
|||
} |
|||
session.sendMessage(data); |
|||
} |
|||
|
|||
} catch (IOException e) { |
|||
log.info("Failed to publish the message. {}", message.toString()); |
|||
|
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void sendBatch(String workspaceId, Integer userType, String bizCode, Object data) { |
|||
if (!StringUtils.hasText(workspaceId)) { |
|||
throw new RuntimeException("工作区ID不存在。"); |
|||
} |
|||
Collection<MyConcurrentWebSocketSession> sessions = Objects.isNull(userType) ? |
|||
webSocketManageService.getValueWithWorkspace(workspaceId) : |
|||
webSocketManageService.getValueWithWorkspaceAndUserType(workspaceId, userType); |
|||
|
|||
this.sendBatch(sessions, new WebSocketMessageResponse() |
|||
.setData(Objects.requireNonNullElse(data, "")) |
|||
.setTimestamp(System.currentTimeMillis()) |
|||
.setBizCode(bizCode)); |
|||
} |
|||
|
|||
@Override |
|||
public void sendBatch(String workspaceId, String bizCode, Object data) { |
|||
this.sendBatch(workspaceId, null, bizCode, data); |
|||
} |
|||
} |
Loading…
Reference in new issue