From 793a5bca70f156a97cdcf99953d6e9af308f9873 Mon Sep 17 00:00:00 2001 From: wuyuan <15505152113@163.com> Date: Sun, 30 Mar 2025 18:19:57 +0800 Subject: [PATCH] 11 --- .../sdk/mqtt/state/DockStateDataKeyEnum.java | 3 + dk-common/common-websocket/pom.xml | 18 ++ .../websocket/ConcurrentWebSocketSession.java | 25 +++ .../websocket/WebSocketConfiguration.java | 40 ++++ .../websocket/WebSocketDefaultFactory.java | 20 ++ .../websocket/WebSocketDefaultHandler.java | 40 ++++ .../websocket/WebSocketMessageResponse.java | 84 ++++++++ .../websocket/api/WebSocketMessageSend.java | 64 ++++++ .../websocket/config/WebSocketConfig.java | 71 ------- .../properties/WebSocketProperties.java | 26 --- .../constant/WebSocketConstants.java | 29 --- .../handler/PlusWebSocketHandler.java | 135 ------------- .../holder/WebSocketSessionHolder.java | 70 ------- .../interceptor/PlusWebSocketInterceptor.java | 58 ------ .../listener/WebSocketTopicListener.java | 54 ----- .../websocket/utils/WebSocketUtils.java | 185 ------------------ .../GlobalThreadPoolConfiguration.java | 2 +- .../service/impl/SDKControlService.java | 38 ++-- .../control/service/impl/SDKRemoteDebug.java | 6 +- .../service/impl/DeviceHmsServiceImpl.java | 7 +- .../service/impl/DeviceLogsServiceImpl.java | 7 +- .../service/impl/FlightAreaServiceImpl.java | 15 +- .../impl/WorkspaceElementServiceImpl.java | 15 +- .../media/service/impl/MediaServiceImpl.java | 9 +- .../service/impl/SDKWaylineService.java | 8 +- .../service/impl/WaylineFileServiceImpl.java | 1 - .../config/MyConcurrentWebSocketSession.java | 2 +- .../websocket/config/MyWebSocketFactory.java | 27 +++ .../websocket/config/MyWebSocketHandler.java | 58 ++++++ .../sample/websocket/model/BizCodeEnum.java | 93 +++++++++ .../service/IWebSocketManageService.java | 26 +++ .../service/IWebSocketMessageService.java | 33 ++++ .../impl/WebSocketManageServiceImpl.java | 86 ++++++++ .../impl/WebSocketMessageServiceImpl.java | 99 ++++++++++ pom.xml | 4 +- 35 files changed, 783 insertions(+), 675 deletions(-) create mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/ConcurrentWebSocketSession.java create mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketConfiguration.java create mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketDefaultFactory.java create mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketDefaultHandler.java create mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketMessageResponse.java create mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/api/WebSocketMessageSend.java delete mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java delete mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java delete mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java delete mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java delete mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java delete mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java delete mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java delete mode 100644 dk-common/common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java rename {dk-common/common-websocket/src/main/java/org/dromara/common => dk-modules/sample/src/main/java/org/dromara/sample}/websocket/config/MyConcurrentWebSocketSession.java (94%) create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketFactory.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketHandler.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/websocket/model/BizCodeEnum.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/IWebSocketManageService.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/IWebSocketMessageService.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketManageServiceImpl.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketMessageServiceImpl.java diff --git a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/state/DockStateDataKeyEnum.java b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/state/DockStateDataKeyEnum.java index 39a2ad2..34aa24b 100644 --- a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/state/DockStateDataKeyEnum.java +++ b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/state/DockStateDataKeyEnum.java @@ -55,6 +55,9 @@ public enum DockStateDataKeyEnum { PSDK_WIDGE_VALUES(Set.of("psdk_widget_values"), PsdkWidgetValues.class), PSDK_UI_RESOURCE(Set.of("psdk_ui_resource"), PsdkUiResource.class), + + + //AI_SPOTLIGHT_ZOOM_STATE(Set.of("ai_spotlight_zoom_state"), PsdkUiResource.class), ; private final Set keys; diff --git a/dk-common/common-websocket/pom.xml b/dk-common/common-websocket/pom.xml index f2e6e4e..ac54281 100644 --- a/dk-common/common-websocket/pom.xml +++ b/dk-common/common-websocket/pom.xml @@ -7,6 +7,18 @@ dk-common ${revision} + + + + org.apache.maven.plugins + maven-compiler-plugin + + 21 + 21 + + + + 4.0.0 common-websocket @@ -36,5 +48,11 @@ org.springframework.boot spring-boot-starter-websocket + + org.dromara + common-cloudsdk + 2.2.2 + compile + diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/ConcurrentWebSocketSession.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/ConcurrentWebSocketSession.java new file mode 100644 index 0000000..340481e --- /dev/null +++ b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/ConcurrentWebSocketSession.java @@ -0,0 +1,25 @@ +package org.dromara.common.websocket; + +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; + +/** + * @author sean.zhou + * @version 0.1 + * @date 2021/11/24 + */ +public class ConcurrentWebSocketSession extends ConcurrentWebSocketSessionDecorator { + + private static final int SEND_BUFFER_SIZE_LIMIT = 1024 * 1024; + + private static final int SEND_TIME_LIMIT = 1000; + + private ConcurrentWebSocketSession(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit) { + super(delegate, sendTimeLimit, bufferSizeLimit); + } + + ConcurrentWebSocketSession(WebSocketSession delegate) { + this(delegate, SEND_TIME_LIMIT, SEND_BUFFER_SIZE_LIMIT); + } + +} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketConfiguration.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketConfiguration.java new file mode 100644 index 0000000..7eb4af7 --- /dev/null +++ b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketConfiguration.java @@ -0,0 +1,40 @@ +package org.dromara.common.websocket; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; +import org.springframework.web.socket.config.annotation.StompEndpointRegistry; +import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; +import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory; +import org.springframework.web.socket.server.HandshakeHandler; + +/** + * + * @author sean.zhou + * @date 2021/11/17 + * @version 0.1 + */ +@EnableWebSocketMessageBroker +@Configuration +public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer { + + @Autowired(required = false) + private HandshakeHandler handshakeHandler; + + @Autowired + private WebSocketHandlerDecoratorFactory webSocketHandlerDecoratorFactory; + + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + // Set the WebSocket connection address + registry.addEndpoint("/dkcyclog/sample/websocket").setAllowedOriginPatterns("*") + .setHandshakeHandler(handshakeHandler); + } + + @Override + public void configureWebSocketTransport(WebSocketTransportRegistration registry) { + registry.addDecoratorFactory(webSocketHandlerDecoratorFactory); + } + +} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketDefaultFactory.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketDefaultFactory.java new file mode 100644 index 0000000..c8ef0c4 --- /dev/null +++ b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketDefaultFactory.java @@ -0,0 +1,20 @@ +package org.dromara.common.websocket; + +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory; + +/** + * + * @author sean.zhou + * @date 2021/11/16 + * @version 0.1 + */ +@Component +public class WebSocketDefaultFactory implements WebSocketHandlerDecoratorFactory { + + @Override + public WebSocketHandler decorate(WebSocketHandler handler) { + return new WebSocketDefaultHandler(handler); + } +} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketDefaultHandler.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketDefaultHandler.java new file mode 100644 index 0000000..801571a --- /dev/null +++ b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketDefaultHandler.java @@ -0,0 +1,40 @@ +package org.dromara.common.websocket; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.WebSocketHandlerDecorator; + +/** + * + * @author sean.zhou + * @date 2021/11/16 + * @version 0.1 + */ +public class WebSocketDefaultHandler extends WebSocketHandlerDecorator { + + private static final Logger log = LoggerFactory.getLogger(WebSocketDefaultHandler.class); + + public WebSocketDefaultHandler(WebSocketHandler delegate) { + super(delegate); + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + log.debug("{} is connected.", session.getId()); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + log.debug("{} is disconnected.", session.getId()); + } + + @Override + public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + log.info("received message: {}, from: {}", message.getPayload(), session.getId()); + } + +} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketMessageResponse.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketMessageResponse.java new file mode 100644 index 0000000..072a888 --- /dev/null +++ b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketMessageResponse.java @@ -0,0 +1,84 @@ +package org.dromara.common.websocket; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import org.dromara.common.websocket.dto.BizCodeEnum; + + +/** + * The format of WebSocket messages that the pilot can receive. + * @author sean.zhou + * @date 2021/11/17 + * @version 0.1 + */ +@Schema(description = "The format of WebSocket messages that the pilot can receive.") +public class WebSocketMessageResponse { + + @JsonProperty("biz_code") + @NotNull + @Schema(description = "webSocket messages identity", implementation = BizCodeEnum.class) + private String bizCode; + + @Schema(description = "webSocket messages version") + private String version = "1.0"; + + @NotNull + @Min(123456789012L) + @Schema(description = "timestamp (milliseconds)") + private Long timestamp; + + @NotNull + @Schema(description = "Data corresponding to business functions") + private T data; + + public WebSocketMessageResponse() { + } + + @Override + public String toString() { + return "WebSocketMessageResponse{" + + "bizCode=" + bizCode + + ", version='" + version + '\'' + + ", timestamp=" + timestamp + + ", data=" + data + + '}'; + } + + public String getBizCode() { + return bizCode; + } + + public WebSocketMessageResponse setBizCode(String bizCode) { + this.bizCode = bizCode; + return this; + } + + public String getVersion() { + return version; + } + + public WebSocketMessageResponse setVersion(String version) { + this.version = version; + return this; + } + + public Long getTimestamp() { + return timestamp; + } + + public WebSocketMessageResponse setTimestamp(Long timestamp) { + this.timestamp = timestamp; + return this; + } + + public T getData() { + return data; + } + + public WebSocketMessageResponse setData(T data) { + this.data = data; + return this; + } +} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/api/WebSocketMessageSend.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/api/WebSocketMessageSend.java new file mode 100644 index 0000000..da59306 --- /dev/null +++ b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/api/WebSocketMessageSend.java @@ -0,0 +1,64 @@ +package org.dromara.common.websocket.api; + +import org.dromara.common.sdk.common.Common; +import org.dromara.common.sdk.exception.CloudSDKErrorEnum; +import org.dromara.common.sdk.exception.CloudSDKException; +import org.dromara.common.websocket.ConcurrentWebSocketSession; +import org.dromara.common.websocket.dto.WebSocketMessageResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.socket.TextMessage; + +import java.io.IOException; +import java.util.Collection; + +/** + * @author sean.zhou + * @version 0.1 + * @date 2021/11/24 + */ +public class WebSocketMessageSend { + + private static final Logger log = LoggerFactory.getLogger(WebSocketMessageSend.class); + + public void sendMessage(ConcurrentWebSocketSession session, WebSocketMessageResponse message) { + if (session == null) { + return; + } + + try { + if (!session.isOpen()) { + session.close(); + log.info("This session is closed."); + return; + } + + session.sendMessage(new TextMessage(Common.getObjectMapper().writeValueAsBytes(message))); + } catch (IOException e) { + throw new CloudSDKException(CloudSDKErrorEnum.WEBSOCKET_PUBLISH_ABNORMAL, e.getLocalizedMessage()); + } + } + + public void sendBatch(Collection sessions, WebSocketMessageResponse message) { + if (sessions.isEmpty()) { + return; + } + + try { + + TextMessage data = new TextMessage(Common.getObjectMapper().writeValueAsBytes(message)); + + for (ConcurrentWebSocketSession session : sessions) { + if (!session.isOpen()) { + session.close(); + log.info("This session is closed."); + return; + } + session.sendMessage(data); + } + + } catch (IOException e) { + throw new CloudSDKException(CloudSDKErrorEnum.WEBSOCKET_PUBLISH_ABNORMAL, e.getLocalizedMessage()); + } + } +} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java deleted file mode 100644 index 5d77db5..0000000 --- a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java +++ /dev/null @@ -1,71 +0,0 @@ -package org.dromara.common.websocket.config; - -import cn.hutool.core.util.StrUtil; -import org.dromara.common.websocket.config.properties.WebSocketProperties; -import org.dromara.common.websocket.handler.PlusWebSocketHandler; -import org.dromara.common.websocket.interceptor.PlusWebSocketInterceptor; -import org.dromara.common.websocket.listener.WebSocketTopicListener; -import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.WebSocketSession; -import org.springframework.web.socket.config.annotation.EnableWebSocket; -import org.springframework.web.socket.config.annotation.WebSocketConfigurer; -import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; -import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; -import org.springframework.web.socket.server.HandshakeInterceptor; - -/** - * WebSocket 配置 - * - * @author zendwang - */ -@AutoConfiguration -@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true") -@EnableConfigurationProperties(WebSocketProperties.class) -@EnableWebSocket -public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { - @Override - public void configureWebSocketTransport(WebSocketTransportRegistration registration) { - registration.setSendTimeLimit(15 * 1000); // 消息发送最大超时 - registration.setSendBufferSizeLimit(1024 * 1024); // 发送缓冲区大小 - registration.setMessageSizeLimit(128 * 1024); // 消息大小限制 - } - @Bean - public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, - WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) { - // 如果WebSocket的路径为空,则设置默认路径为 "/websocket" - if (StrUtil.isBlank(webSocketProperties.getPath())) { - webSocketProperties.setPath("/websocket"); - } - - // 如果允许跨域访问的地址为空,则设置为 "*",表示允许所有来源的跨域请求 - if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) { - webSocketProperties.setAllowedOrigins("*"); - } - - // 返回一个WebSocketConfigurer对象,用于配置WebSocket - return registry -> registry - // 添加WebSocket处理程序和拦截器到指定路径,设置允许的跨域来源 - .addHandler(webSocketHandler, webSocketProperties.getPath()) - .addInterceptors(handshakeInterceptor) - .setAllowedOrigins(webSocketProperties.getAllowedOrigins()); - } - - @Bean - public HandshakeInterceptor handshakeInterceptor() { - return new PlusWebSocketInterceptor(); - } - - @Bean - public WebSocketHandler webSocketHandler() { - return new PlusWebSocketHandler(); - } - - @Bean - public WebSocketTopicListener topicListener() { - return new WebSocketTopicListener(); - } -} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java deleted file mode 100644 index d629fe5..0000000 --- a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/properties/WebSocketProperties.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.dromara.common.websocket.config.properties; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; - -/** - * WebSocket 配置项 - * - * @author zendwang - */ -@ConfigurationProperties("websocket") -@Data -public class WebSocketProperties { - - private Boolean enabled; - - /** - * 路径 - */ - private String path; - - /** - * 设置访问源地址 - */ - private String allowedOrigins; -} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java deleted file mode 100644 index e243279..0000000 --- a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.dromara.common.websocket.constant; - -/** - * websocket的常量配置 - * - * @author zendwang - */ -public interface WebSocketConstants { - - /** - * websocketSession中的参数的key - */ - String LOGIN_USER_KEY = "loginUser"; - - /** - * 订阅的频道 - */ - String WEB_SOCKET_TOPIC = "global:websocket"; - - /** - * 前端心跳检查的命令 - */ - String PING = "ping"; - - /** - * 服务端心跳恢复的字符串 - */ - String PONG = "pong"; -} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java deleted file mode 100644 index a65fd1f..0000000 --- a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java +++ /dev/null @@ -1,135 +0,0 @@ -package org.dromara.common.websocket.handler; - -import cn.hutool.core.util.ObjectUtil; -import lombok.extern.slf4j.Slf4j; -import org.dromara.common.websocket.dto.WebSocketMessageDto; -import org.dromara.common.websocket.holder.WebSocketSessionHolder; -import org.dromara.common.websocket.utils.WebSocketUtils; -import org.dromara.system.api.model.LoginUser; -import org.springframework.web.socket.*; -import org.springframework.web.socket.handler.AbstractWebSocketHandler; - -import java.io.IOException; -import java.util.List; - -import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; - -/** - * WebSocketHandler 实现类 - * - * @author zendwang - */ -@Slf4j -public class PlusWebSocketHandler extends AbstractWebSocketHandler { - - /** - * 连接成功后 - */ - @Override - public void afterConnectionEstablished(WebSocketSession session) throws IOException { - LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); - if (ObjectUtil.isNull(loginUser)) { - session.close(CloseStatus.BAD_DATA); - log.info("[connect] invalid token received. sessionId: {}", session.getId()); - return; - } - WebSocketSessionHolder.addSession(loginUser.getUserId(), session); - log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); - } - - /** - * 处理接收到的文本消息 - * - * @param session WebSocket会话 - * @param message 接收到的文本消息 - * @throws Exception 处理消息过程中可能抛出的异常 - */ - @Override - protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - // 从WebSocket会话中获取登录用户信息 - LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); - // 检查loginUser是否为空 - if (ObjectUtil.isNull(loginUser)) { - log.error("[handleTextMessage] loginUser is null. sessionId: {}", session.getId()); - session.close(CloseStatus.BAD_DATA); - return; - } - // 创建WebSocket消息DTO对象 - /*WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto(); - webSocketMessageDto.setSessionKeys(List.of(loginUser.getUserId())); - webSocketMessageDto.setMessage(message.getPayload());*/ - WebSocketUtils.sendMessage(loginUser.getUserId(),"pong"); - WebSocketUtils.sendPongMessage(session); - } - - /** - * 处理接收到的二进制消息 - * - * @param session WebSocket会话 - * @param message 接收到的二进制消息 - * @throws Exception 处理消息过程中可能抛出的异常 - */ - @Override - protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { - super.handleBinaryMessage(session, message); - } - - /** - * 处理接收到的Pong消息(心跳监测) - * - * @param session WebSocket会话 - * @param message 接收到的Pong消息 - * @throws Exception 处理消息过程中可能抛出的异常 - */ - @Override - protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { - WebSocketUtils.sendPongMessage(session); - } - - /** - * 处理WebSocket传输错误 - * - * @param session WebSocket会话 - * @param exception 发生的异常 - * @throws Exception 处理过程中可能抛出的异常 - */ - @Override - public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); - // 检查loginUser是否为空 - if (ObjectUtil.isNull(loginUser)) { - log.error("[handleTextMessage] loginUser is null. sessionId: {}", session.getId()); - session.close(CloseStatus.BAD_DATA); - return; - } - log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage()); - } - - /** - * 在WebSocket连接关闭后执行清理操作 - * - * @param session WebSocket会话 - * @param status 关闭状态信息 - */ - @Override - public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { - LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); - if (ObjectUtil.isNull(loginUser)) { - log.info("[disconnect] invalid token received. sessionId: {}", session.getId()); - return; - } - WebSocketSessionHolder.removeSession(loginUser.getUserId()); - log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); - } - - /** - * 指示处理程序是否支持接收部分消息 - * - * @return 如果支持接收部分消息,则返回true;否则返回false - */ - @Override - public boolean supportsPartialMessages() { - return false; - } - -} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java deleted file mode 100644 index 368801c..0000000 --- a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.dromara.common.websocket.holder; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import org.springframework.web.socket.WebSocketSession; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * WebSocketSession 用于保存当前所有在线的会话信息 - * - * @author zendwang - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class WebSocketSessionHolder { - - private static final Map USER_SESSION_MAP = new ConcurrentHashMap<>(); - - /** - * 将WebSocket会话添加到用户会话Map中 - * - * @param sessionKey 会话键,用于检索会话 - * @param session 要添加的WebSocket会话 - */ - public static void addSession(Long sessionKey, WebSocketSession session) { - USER_SESSION_MAP.put(sessionKey, session); - } - - /** - * 从用户会话Map中移除指定会话键对应的WebSocket会话 - * - * @param sessionKey 要移除的会话键 - */ - public static void removeSession(Long sessionKey) { - if (USER_SESSION_MAP.containsKey(sessionKey)) { - USER_SESSION_MAP.remove(sessionKey); - } - } - - /** - * 根据会话键从用户会话Map中获取WebSocket会话 - * - * @param sessionKey 要获取的会话键 - * @return 与给定会话键对应的WebSocket会话,如果不存在则返回null - */ - public static WebSocketSession getSessions(Long sessionKey) { - return USER_SESSION_MAP.get(sessionKey); - } - - /** - * 获取存储在用户会话Map中所有WebSocket会话的会话键集合 - * - * @return 所有WebSocket会话的会话键集合 - */ - public static Set getSessionsAll() { - return USER_SESSION_MAP.keySet(); - } - - /** - * 检查给定的会话键是否存在于用户会话Map中 - * - * @param sessionKey 要检查的会话键 - * @return 如果存在对应的会话键,则返回true;否则返回false - */ - public static Boolean existSession(Long sessionKey) { - return USER_SESSION_MAP.containsKey(sessionKey); - } -} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java deleted file mode 100644 index fafe699..0000000 --- a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.dromara.common.websocket.interceptor; - -import cn.dev33.satoken.exception.NotLoginException; -import lombok.extern.slf4j.Slf4j; -import org.dromara.common.satoken.utils.LoginHelper; -import org.dromara.system.api.model.LoginUser; -import org.springframework.http.server.ServerHttpRequest; -import org.springframework.http.server.ServerHttpResponse; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.server.HandshakeInterceptor; - -import java.util.Map; - -import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; - -/** - * WebSocket握手请求的拦截器 - * - * @author zendwang - */ -@Slf4j -public class PlusWebSocketInterceptor implements HandshakeInterceptor { - - /** - * WebSocket握手之前执行的前置处理方法 - * - * @param request WebSocket握手请求 - * @param response WebSocket握手响应 - * @param wsHandler WebSocket处理程序 - * @param attributes 与WebSocket会话关联的属性 - * @return 如果允许握手继续进行,则返回true;否则返回false - */ - @Override - public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) { - try { - LoginUser loginUser = LoginHelper.getLoginUser(); - attributes.put(LOGIN_USER_KEY, loginUser); - return true; - } catch (NotLoginException e) { - log.error("WebSocket 认证失败'{}',无法访问系统资源", e.getMessage()); - return false; - } - } - - /** - * WebSocket握手成功后执行的后置处理方法 - * - * @param request WebSocket握手请求 - * @param response WebSocket握手响应 - * @param wsHandler WebSocket处理程序 - * @param exception 握手过程中可能出现的异常 - */ - @Override - public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { - // 在这个方法中可以执行一些握手成功后的后续处理逻辑,比如记录日志或者其他操作 - } - -} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java deleted file mode 100644 index 31617df..0000000 --- a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.dromara.common.websocket.listener; - -import cn.hutool.core.collection.CollUtil; -import lombok.extern.slf4j.Slf4j; -import org.dromara.common.websocket.holder.WebSocketSessionHolder; -import org.dromara.common.websocket.utils.WebSocketUtils; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.core.Ordered; - -/** - * WebSocket 主题订阅监听器 - * - * @author zendwang - */ -@Slf4j -public class WebSocketTopicListener implements ApplicationRunner, Ordered { - - /** - * 在Spring Boot应用程序启动时初始化WebSocket主题订阅监听器 - * - * @param args 应用程序参数 - * @throws Exception 初始化过程中可能抛出的异常 - */ - @Override - public void run(ApplicationArguments args) throws Exception { - // 订阅WebSocket消息 - WebSocketUtils.subscribeMessage((message) -> { - try { - log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage()); - // 如果key不为空就按照key发消息 如果为空就群发 - if (CollUtil.isNotEmpty(message.getSessionKeys())) { - message.getSessionKeys().forEach(key -> { - if (WebSocketSessionHolder.existSession(key)) { - WebSocketUtils.sendMessage(key, message.getMessage()); - } - }); - } else { - WebSocketSessionHolder.getSessionsAll().forEach(key -> { - WebSocketUtils.sendMessage(key, message.getMessage()); - }); - } - } catch (Exception e) { - log.error("处理消息时发生异常", e); - } - }); - log.info("初始化WebSocket主题订阅监听器成功"); - } - - @Override - public int getOrder() { - return -1; - } -} diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java b/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java deleted file mode 100644 index bf6b127..0000000 --- a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java +++ /dev/null @@ -1,185 +0,0 @@ -package org.dromara.common.websocket.utils; - -import cn.hutool.core.collection.CollUtil; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategy; -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.units.qual.C; -import org.dromara.common.json.utils.JsonUtils; -import org.dromara.common.redis.utils.RedisUtils; -import org.dromara.common.websocket.dto.WebSocketMessageDto; -import org.dromara.common.websocket.dto.WebSocketMessageResponse; -import org.dromara.common.websocket.holder.WebSocketSessionHolder; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.socket.PongMessage; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketMessage; -import org.springframework.web.socket.WebSocketSession; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.function.Consumer; - -import static org.dromara.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC; - -/** - * WebSocket工具类 - * - * @author zendwang - */ -@Slf4j -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public class WebSocketUtils { - - /** - * 向指定的WebSocket会话发送消息 - * - * @param sessionKey 要发送消息的用户id - * @param message 要发送的消息内容 - */ - public static void sendMessage(Long sessionKey, String message) { - WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); - sendMessage(session, message); - } - - /** - * 订阅WebSocket消息主题,并提供一个消费者函数来处理接收到的消息 - * - * @param consumer 处理WebSocket消息的消费者函数 - */ - public static void subscribeMessage(Consumer consumer) { - RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer); - } - - /** - * 发布WebSocket订阅消息 - * - * @param webSocketMessage 要发布的WebSocket消息对象 - */ - public static void publishMessage(WebSocketMessageDto webSocketMessage) { - List unsentSessionKeys = new ArrayList<>(); - // 当前服务内session,直接发送消息 - for (Long sessionKey : webSocketMessage.getSessionKeys()) { - if (WebSocketSessionHolder.existSession(sessionKey)) { - WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage()); - continue; - } - unsentSessionKeys.add(sessionKey); - } - // 不在当前服务内session,发布订阅消息 - if (CollUtil.isNotEmpty(unsentSessionKeys)) { - WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); - broadcastMessage.setMessage(webSocketMessage.getMessage()); - broadcastMessage.setSessionKeys(unsentSessionKeys); - RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { - log.info("WebSocket发送主题订阅消息topic:{} session keys:{} message:{}", - WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage()); - }); - } - } - - /** - * 发布WebSocket订阅消息 - * - * @param - */ - public static void publishAll(Integer userType, String bizCode, Object data) { - WebSocketMessageResponse message = new WebSocketMessageResponse() - .setData(Objects.requireNonNullElse(data, "")) - .setTimestamp(System.currentTimeMillis()) - .setBizCode(bizCode); - try { - ObjectMapper mapper = new ObjectMapper(); - mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - String jsonString = mapper.writeValueAsString(message); - WebSocketSessionHolder.getSessionsAll().forEach(key -> { - WebSocketSession session = WebSocketSessionHolder.getSessions(key); - sendMessage(session, jsonString); - }); - }catch (Exception e) { - e.printStackTrace(); - } - } - /** - * 发布WebSocket订阅消息 - * - * @param - */ - public static void publishAll(String bizCode, Object data) { - WebSocketMessageResponse message = new WebSocketMessageResponse() - .setData(Objects.requireNonNullElse(data, "")) - .setTimestamp(System.currentTimeMillis()) - .setBizCode(bizCode); - try { - ObjectMapper mapper = new ObjectMapper(); - mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - String jsonString = mapper.writeValueAsString(message); - WebSocketSessionHolder.getSessionsAll().forEach(key -> { - WebSocketSession session = WebSocketSessionHolder.getSessions(key); - sendMessage(session, jsonString); - }); - }catch (Exception e) { - e.printStackTrace(); - } - } - - - - /** - * 向所有的WebSocket会话发布订阅的消息(群发) - * - * @param message 要发布的消息内容 - */ - public static void publishAll(String message) { - WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); - broadcastMessage.setMessage(message); - RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { - log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message); - }); - } - - /** - * 向指定的WebSocket会话发送Pong消息 - * - * @param session 要发送Pong消息的WebSocket会话 - */ - public static void sendPongMessage(WebSocketSession session) { - sendMessage(session, new PongMessage()); - } - - /** - * 向指定的WebSocket会话发送文本消息 - * - * @param session WebSocket会话 - * @param message 要发送的文本消息内容 - */ - public static void sendMessage(WebSocketSession session, String message) { - sendMessage(session, new TextMessage(message)); - } - - /** - * 向指定的WebSocket会话发送WebSocket消息对象 - * - * @param session WebSocket会话 - * @param message 要发送的WebSocket消息对象 - */ - private synchronized static void sendMessage(WebSocketSession session, WebSocketMessage message) { - if (session == null || !session.isOpen()) { - log.warn("[send] session会话已经关闭"); - } else { - try { - session.sendMessage(message); - } catch (IOException e) { - log.error("[send] session({}) 发送消息({}) 异常", session, message, e); - } - } - } -} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/configuration/GlobalThreadPoolConfiguration.java b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/GlobalThreadPoolConfiguration.java index d40db01..89fbf86 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/configuration/GlobalThreadPoolConfiguration.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/GlobalThreadPoolConfiguration.java @@ -18,7 +18,7 @@ public class GlobalThreadPoolConfiguration { @Value("${thread.pool.core-pool-size: 10}") private int corePoolSize; - @Value("${thread.pool.maximum-pool-size: 20}") + @Value("${thread.pool.maximum-pool-size: 30}") private int maximumPoolSize; @Value("${thread.pool.keep-alive-time: 60}") diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/control/service/impl/SDKControlService.java b/dk-modules/sample/src/main/java/org/dromara/sample/control/service/impl/SDKControlService.java index 8c8f38d..1473203 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/control/service/impl/SDKControlService.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/control/service/impl/SDKControlService.java @@ -2,7 +2,6 @@ package org.dromara.sample.control.service.impl; import org.dromara.common.websocket.dto.BizCodeEnum; -import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.sample.manage.model.dto.DeviceDTO; import org.dromara.sample.manage.model.enums.UserTypeEnum; import org.dromara.sample.manage.service.IDeviceRedisService; @@ -14,6 +13,7 @@ import org.dromara.common.sdk.mqtt.MqttReply; import org.dromara.common.sdk.mqtt.events.TopicEventsRequest; import org.dromara.common.sdk.mqtt.events.TopicEventsResponse; import org.dromara.sample.control.model.dto.ResultNotifyDTO; +import org.dromara.sample.websocket.service.IWebSocketMessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; @@ -29,6 +29,8 @@ import java.util.Optional; @Slf4j public class SDKControlService extends AbstractControlService { + @Autowired + private IWebSocketMessageService webSocketMessageService; @Autowired private IDeviceRedisService deviceRedisService; @@ -47,7 +49,7 @@ public class SDKControlService extends AbstractControlService { } FlyToPointProgress eventsReceiver = request.getData(); - WebSocketUtils.publishAll( UserTypeEnum.WEB.getVal(), + webSocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(), BizCodeEnum.FLY_TO_POINT_PROGRESS.getCode(), ResultNotifyDTO.builder().sn(dockSn) .message(eventsReceiver.getResult().toString()) @@ -67,12 +69,12 @@ public class SDKControlService extends AbstractControlService { } TakeoffToPointProgress eventsReceiver = request.getData(); - WebSocketUtils.publishAll(UserTypeEnum.WEB.getVal(), - BizCodeEnum.TAKE_OFF_TO_POINT_PROGRESS.getCode(), - ResultNotifyDTO.builder().sn(dockSn) - .message(eventsReceiver.getResult().toString()) - .result(eventsReceiver.getResult().getCode()) - .build()); + webSocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(), + BizCodeEnum.TAKE_OFF_TO_POINT_PROGRESS.getCode(), + ResultNotifyDTO.builder().sn(dockSn) + .message(eventsReceiver.getResult().toString()) + .result(eventsReceiver.getResult().getCode()) + .build()); return new TopicEventsResponse().setData(MqttReply.success()); } @@ -88,11 +90,11 @@ public class SDKControlService extends AbstractControlService { DrcStatusNotify eventsReceiver = request.getData(); if (DrcStatusErrorEnum.SUCCESS != eventsReceiver.getResult()) { - WebSocketUtils.publishAll( - UserTypeEnum.WEB.getVal(), BizCodeEnum.DRC_STATUS_NOTIFY.getCode(), - ResultNotifyDTO.builder().sn(dockSn) - .message(eventsReceiver.getResult().getMessage()) - .result(eventsReceiver.getResult().getCode()).build()); + webSocketMessageService.sendBatch( + deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(), BizCodeEnum.DRC_STATUS_NOTIFY.getCode(), + ResultNotifyDTO.builder().sn(dockSn) + .message(eventsReceiver.getResult().getMessage()) + .result(eventsReceiver.getResult().getCode()).build()); } return new TopicEventsResponse().setData(MqttReply.success()); } @@ -107,11 +109,11 @@ public class SDKControlService extends AbstractControlService { } JoystickInvalidNotify eventsReceiver = request.getData(); - WebSocketUtils.publishAll( - UserTypeEnum.WEB.getVal(), BizCodeEnum.JOYSTICK_INVALID_NOTIFY.getCode(), - ResultNotifyDTO.builder().sn(dockSn) - .message(eventsReceiver.getReason().getMessage()) - .result(eventsReceiver.getReason().getVal()).build()); + webSocketMessageService.sendBatch( + deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(), BizCodeEnum.JOYSTICK_INVALID_NOTIFY.getCode(), + ResultNotifyDTO.builder().sn(dockSn) + .message(eventsReceiver.getReason().getMessage()) + .result(eventsReceiver.getReason().getVal()).build()); return new TopicEventsResponse().setData(MqttReply.success()); } } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/control/service/impl/SDKRemoteDebug.java b/dk-modules/sample/src/main/java/org/dromara/sample/control/service/impl/SDKRemoteDebug.java index d173628..ee4f8d4 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/control/service/impl/SDKRemoteDebug.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/control/service/impl/SDKRemoteDebug.java @@ -1,6 +1,5 @@ package org.dromara.sample.control.service.impl; -import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.sample.manage.model.dto.DeviceDTO; import org.dromara.sample.manage.model.enums.UserTypeEnum; import org.dromara.sample.manage.service.IDeviceRedisService; @@ -12,6 +11,7 @@ import org.dromara.common.sdk.mqtt.events.EventsDataRequest; import org.dromara.common.sdk.mqtt.events.TopicEventsRequest; import org.dromara.common.sdk.mqtt.events.TopicEventsResponse; import org.dromara.sample.component.mqtt.model.EventsReceiver; +import org.dromara.sample.websocket.service.IWebSocketMessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; @@ -27,6 +27,8 @@ import java.util.Optional; @Slf4j public class SDKRemoteDebug extends AbstractDebugService { + @Autowired + private IWebSocketMessageService webSocketMessageService; @Autowired private IDeviceRedisService deviceRedisService; @@ -53,7 +55,7 @@ public class SDKRemoteDebug extends AbstractDebugService { } DeviceDTO device = deviceOpt.get(); - WebSocketUtils.publishAll( UserTypeEnum.WEB.getVal(), + webSocketMessageService.sendBatch(device.getWorkspaceId(),UserTypeEnum.WEB.getVal(), request.getMethod(), eventsReceiver); return new TopicEventsResponse().setData(MqttReply.success()); diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceHmsServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceHmsServiceImpl.java index 924bd64..a860e35 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceHmsServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceHmsServiceImpl.java @@ -12,7 +12,6 @@ import org.dromara.common.sdk.common.Pagination; import org.dromara.common.sdk.common.PaginationData; import org.dromara.common.sdk.mqtt.events.TopicEventsRequest; import org.dromara.common.websocket.dto.BizCodeEnum; -import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.sample.manage.mapper.IDeviceHmsMapper; import org.dromara.sample.manage.model.common.HmsJsonUtil; import org.dromara.sample.manage.model.common.HmsMessage; @@ -26,6 +25,7 @@ import org.dromara.sample.manage.service.IDeviceHmsService; import org.dromara.sample.manage.service.IDeviceRedisService; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import org.dromara.sample.websocket.service.IWebSocketMessageService; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageHeaders; @@ -60,6 +60,9 @@ public class DeviceHmsServiceImpl extends AbstractHmsService implements IDeviceH @Autowired private IDeviceRedisService deviceRedisService; + @Autowired + private IWebSocketMessageService sendMessageService; + private static final Pattern PATTERN_KEY = Pattern.compile( "(" + Arrays.stream(HmsFormatKeyEnum.values()) @@ -102,7 +105,7 @@ public class DeviceHmsServiceImpl extends AbstractHmsService implements IDeviceH if (deviceOpt.isEmpty()) { return; } - WebSocketUtils.publishAll(UserTypeEnum.WEB.getVal(), + sendMessageService.sendBatch(deviceOpt.get().getWorkspaceId(),UserTypeEnum.WEB.getVal(), BizCodeEnum.DEVICE_HMS.getCode(), TelemetryDTO.>builder().sn(sn).host(unReadList).build()); } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceLogsServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceLogsServiceImpl.java index ebcef21..4df70a8 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceLogsServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceLogsServiceImpl.java @@ -19,7 +19,6 @@ import org.dromara.common.sdk.mqtt.events.TopicEventsResponse; import org.dromara.common.sdk.mqtt.services.ServicesReplyData; import org.dromara.common.sdk.mqtt.services.TopicServicesResponse; import org.dromara.common.websocket.dto.BizCodeEnum; -import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.sample.component.mqtt.model.EventsReceiver; import org.dromara.sample.manage.mapper.IDeviceLogsMapper; @@ -36,6 +35,7 @@ import org.dromara.sample.manage.service.ITopologyService; import org.dromara.sample.storage.service.IStorageService; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import org.dromara.sample.websocket.service.IWebSocketMessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; @@ -83,6 +83,9 @@ public class DeviceLogsServiceImpl extends AbstractLogService implements IDevice @Autowired private AbstractLogService abstractLogService; + @Autowired + private IWebSocketMessageService webSocketMessageService; + @Override public PaginationData getUploadedLogs(String deviceSn, DeviceLogsQueryParam param) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() @@ -265,7 +268,7 @@ public class DeviceLogsServiceImpl extends AbstractLogService implements IDevice RedisOpsUtils.del(key); } - WebSocketUtils.publishAll(UserTypeEnum.WEB.getVal(), + webSocketMessageService.sendBatch(device.getWorkspaceId(),UserTypeEnum.WEB.getVal(), BizCodeEnum.FILE_UPLOAD_PROGRESS.getCode(), webSocketData); return new TopicEventsResponse().setData(MqttReply.success()); diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaServiceImpl.java index d974a41..184d09c 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaServiceImpl.java @@ -17,7 +17,6 @@ import org.dromara.common.sdk.mqtt.requests.TopicRequestsResponse; import org.dromara.common.sdk.mqtt.services.ServicesReplyData; import org.dromara.common.sdk.mqtt.services.TopicServicesResponse; import org.dromara.common.websocket.dto.BizCodeEnum; -import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.sample.common.error.CommonErrorEnum; import org.dromara.sample.manage.model.dto.DeviceDTO; import org.dromara.sample.manage.model.dto.TelemetryDTO; @@ -30,6 +29,7 @@ import org.dromara.sample.map.model.param.PutFlightAreaParam; import org.dromara.sample.map.service.*; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import org.dromara.sample.websocket.service.IWebSocketMessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; @@ -70,7 +70,8 @@ public class FlightAreaServiceImpl extends AbstractFlightAreaService implements @Autowired private IDeviceFlightAreaService deviceFlightAreaService; - + @Autowired + private IWebSocketMessageService webSocketMessageService; @Autowired private ObjectMapper objectMapper; @@ -139,7 +140,7 @@ public class FlightAreaServiceImpl extends AbstractFlightAreaService implements } flightAreaFileService.setNonLatestByWorkspaceId(workspaceId); - WebSocketUtils.publishAll( BizCodeEnum.FLIGHT_AREAS_UPDATE.getCode(), + webSocketMessageService.sendBatch(workspaceId, BizCodeEnum.FLIGHT_AREAS_UPDATE.getCode(), FlightAreaWs.builder() .operation(FlightAreaOpertaionEnum.ADD) .areaId(param.getId()) @@ -188,7 +189,7 @@ public class FlightAreaServiceImpl extends AbstractFlightAreaService implements throw new RuntimeException("无法删除飞行区域属性。"); } flightAreaFileService.setNonLatestByWorkspaceId(workspaceId); - WebSocketUtils.publishAll( BizCodeEnum.FLIGHT_AREAS_UPDATE.getCode(), + webSocketMessageService.sendBatch(workspaceId,BizCodeEnum.FLIGHT_AREAS_UPDATE.getCode(), FlightAreaWs.builder() .operation(FlightAreaOpertaionEnum.DELETE) .areaId(areaId) @@ -225,7 +226,7 @@ public class FlightAreaServiceImpl extends AbstractFlightAreaService implements } flightAreaFileService.setNonLatestByWorkspaceId(workspaceId); Optional areaOpt = getFlightAreaByAreaId(areaId); - areaOpt.ifPresent(area -> WebSocketUtils.publishAll( + areaOpt.ifPresent(area -> webSocketMessageService.sendBatch(workspaceId, BizCodeEnum.FLIGHT_AREAS_UPDATE.getCode(), FlightAreaWs.builder() .operation(FlightAreaOpertaionEnum.UPDATE) @@ -259,7 +260,7 @@ public class FlightAreaServiceImpl extends AbstractFlightAreaService implements .syncCode(data.getReason()) .build(); deviceFlightAreaService.updateOrSaveDeviceFile(deviceFlightArea); - WebSocketUtils.publishAll( BizCodeEnum.FLIGHT_AREAS_SYNC_PROGRESS.getCode(), + webSocketMessageService.sendBatch(workspaceId, BizCodeEnum.FLIGHT_AREAS_SYNC_PROGRESS.getCode(), FlightAreaNotifyDTO.builder() .sn(request.getGateway()) .result(data.getReason().getReason()) @@ -279,7 +280,7 @@ public class FlightAreaServiceImpl extends AbstractFlightAreaService implements if (request.getData().getDroneLocations().isEmpty()) { return new TopicEventsResponse<>(); } - WebSocketUtils.publishAll( BizCodeEnum.FLIGHT_AREAS_DRONE_LOCATION.getCode(), + webSocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(),BizCodeEnum.FLIGHT_AREAS_DRONE_LOCATION.getCode(), TelemetryDTO.builder().sn(deviceOpt.get().getChildDeviceSn()).host(request.getData()).build()); return new TopicEventsResponse<>(); } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/WorkspaceElementServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/WorkspaceElementServiceImpl.java index 3529f9e..dd94c70 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/WorkspaceElementServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/WorkspaceElementServiceImpl.java @@ -3,12 +3,12 @@ package org.dromara.sample.map.service.impl; import org.dromara.common.sdk.cloudapi.map.*; import org.dromara.common.sdk.common.HttpResultResponse; import org.dromara.common.websocket.dto.BizCodeEnum; -import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.sample.map.model.dto.GroupElementDTO; import org.dromara.sample.map.service.IElementCoordinateService; import org.dromara.sample.map.service.IGroupElementService; import org.dromara.sample.map.service.IGroupService; import org.dromara.sample.map.service.IWorkspaceElementService; +import org.dromara.sample.websocket.service.IWebSocketMessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -34,6 +34,8 @@ public class WorkspaceElementServiceImpl implements IWorkspaceElementService { @Autowired private IElementCoordinateService elementCoordinateService; + @Autowired + private IWebSocketMessageService webSocketMessageService; @Override public List getAllGroupsByWorkspaceId(String workspaceId, String groupId, Boolean isDistributed) { @@ -51,8 +53,8 @@ public class WorkspaceElementServiceImpl implements IWorkspaceElementService { if (notify) { // Notify all WebSocket connections in this workspace to be updated when an element is created. getElementByElementId(elementCreate.getId()) - .ifPresent(groupElement -> WebSocketUtils.publishAll( - BizCodeEnum.MAP_ELEMENT_CREATE.getCode(), + .ifPresent(groupElement -> webSocketMessageService.sendBatch( + workspaceId,BizCodeEnum.MAP_ELEMENT_CREATE.getCode(), element2CreateWsElement(groupElement))); } return HttpResultResponse.success(); @@ -68,8 +70,8 @@ public class WorkspaceElementServiceImpl implements IWorkspaceElementService { if (notify) { // Notify all WebSocket connections in this workspace to update when there is an element update. getElementByElementId(elementId) - .ifPresent(groupElement -> WebSocketUtils.publishAll( - BizCodeEnum.MAP_ELEMENT_UPDATE.getCode(), + .ifPresent(groupElement -> webSocketMessageService.sendBatch( + workspaceId,BizCodeEnum.MAP_ELEMENT_UPDATE.getCode(), element2UpdateWsElement(groupElement))); } return HttpResultResponse.success(); @@ -92,7 +94,8 @@ public class WorkspaceElementServiceImpl implements IWorkspaceElementService { if (notify) { // Notify all WebSocket connections in this workspace to update when an element is deleted. elementOpt.ifPresent(element -> - WebSocketUtils.publishAll( BizCodeEnum.MAP_ELEMENT_DELETE.getCode(), + webSocketMessageService.sendBatch( + workspaceId,BizCodeEnum.MAP_ELEMENT_DELETE.getCode(), new MapElementDeleteWsResponse() .setGroupId(element.getGroupId()) .setId(elementId))); diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/media/service/impl/MediaServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/media/service/impl/MediaServiceImpl.java index 81880ee..2b25936 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/media/service/impl/MediaServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/media/service/impl/MediaServiceImpl.java @@ -6,7 +6,6 @@ import org.dromara.common.sdk.mqtt.MqttReply; import org.dromara.common.sdk.mqtt.events.TopicEventsRequest; import org.dromara.common.sdk.mqtt.events.TopicEventsResponse; import org.dromara.common.websocket.dto.BizCodeEnum; -import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.sample.manage.model.dto.DeviceDTO; import org.dromara.sample.manage.model.enums.UserTypeEnum; import org.dromara.sample.manage.service.IDeviceRedisService; @@ -19,6 +18,7 @@ import org.dromara.sample.media.service.IMediaService; import org.dromara.sample.wayline.service.IWaylineJobService; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import org.dromara.sample.websocket.service.IWebSocketMessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; @@ -57,6 +57,9 @@ public class MediaServiceImpl extends AbstractMediaService implements IMediaServ @Autowired private IMediaRedisService mediaRedisService; + @Autowired + private IWebSocketMessageService webSocketMessageService; + @Override public Boolean fastUpload(String workspaceId, String fingerprint) { return fileService.checkExist(workspaceId, fingerprint); @@ -140,7 +143,7 @@ public class MediaServiceImpl extends AbstractMediaService implements IMediaServ return null; } - WebSocketUtils.publishAll(UserTypeEnum.WEB.getVal(), + webSocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(),UserTypeEnum.WEB.getVal(), BizCodeEnum.HIGHEST_PRIORITY_UPLOAD_FLIGHT_TASK_MEDIA.getCode(), countDTO); return new TopicEventsResponse().setData(MqttReply.success()); @@ -183,7 +186,7 @@ public class MediaServiceImpl extends AbstractMediaService implements IMediaServ mediaRedisService.setMediaCount(request.getGateway(), jobId, mediaFileCount); } - WebSocketUtils.publishAll( UserTypeEnum.WEB.getVal(), + webSocketMessageService.sendBatch(dock.getWorkspaceId(), UserTypeEnum.WEB.getVal(), BizCodeEnum.FILE_UPLOAD_CALLBACK.getCode(), mediaFileCount); } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/SDKWaylineService.java b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/SDKWaylineService.java index 62d07d1..aefb7ed 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/SDKWaylineService.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/SDKWaylineService.java @@ -9,7 +9,6 @@ import org.dromara.common.sdk.mqtt.events.TopicEventsResponse; import org.dromara.common.sdk.mqtt.requests.TopicRequestsRequest; import org.dromara.common.sdk.mqtt.requests.TopicRequestsResponse; import org.dromara.common.websocket.dto.BizCodeEnum; -import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.sample.common.error.CommonErrorEnum; import org.dromara.sample.component.mqtt.model.EventsReceiver; import org.dromara.sample.manage.model.dto.DeviceDTO; @@ -23,6 +22,7 @@ import org.dromara.sample.wayline.service.IWaylineFileService; import org.dromara.sample.wayline.service.IWaylineJobService; import org.dromara.sample.wayline.service.IWaylineRedisService; import lombok.extern.slf4j.Slf4j; +import org.dromara.sample.websocket.service.IWebSocketMessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; @@ -59,6 +59,10 @@ public class SDKWaylineService extends AbstractWaylineService { @Autowired private IWaylineFileService waylineFileService; + + @Autowired + private IWebSocketMessageService webSocketMessageService; + @Override public TopicEventsResponse deviceExitHomingNotify(TopicEventsRequest request, MessageHeaders headers) { return super.deviceExitHomingNotify(request, headers); @@ -109,7 +113,7 @@ public class SDKWaylineService extends AbstractWaylineService { waylineRedisService.delRunningWaylineJob(response.getGateway()); waylineRedisService.delPausedWaylineJob(response.getBid()); } - WebSocketUtils.publishAll(UserTypeEnum.WEB.getVal(), + webSocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(),UserTypeEnum.WEB.getVal(), BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver); return new TopicEventsResponse<>(); diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineFileServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineFileServiceImpl.java index 0ed105b..d228af7 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineFileServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineFileServiceImpl.java @@ -198,7 +198,6 @@ public class WaylineFileServiceImpl implements IWaylineFileService { OssClient storage = OssFactory.instance("waylinefile"); String originalfileName = file.getOriginalFilename(); String suffix = org.apache.commons.lang3.StringUtils.substring(originalfileName, originalfileName.lastIndexOf("."), originalfileName.length()); - suffix = "waylinefile/"+suffix; UploadResult uploadResult = storage.uploadSuffix(file.getBytes(), suffix, file.getContentType()); waylineFile.setObjectKey(uploadResult.getFilename()); List deviceSns = new ArrayList<>(); diff --git a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/MyConcurrentWebSocketSession.java b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyConcurrentWebSocketSession.java similarity index 94% rename from dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/MyConcurrentWebSocketSession.java rename to dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyConcurrentWebSocketSession.java index 03a40bd..1b07002 100644 --- a/dk-common/common-websocket/src/main/java/org/dromara/common/websocket/config/MyConcurrentWebSocketSession.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyConcurrentWebSocketSession.java @@ -1,4 +1,4 @@ -package org.dromara.common.websocket.config; +package org.dromara.sample.websocket.config; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketFactory.java b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketFactory.java new file mode 100644 index 0000000..23a04ab --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketFactory.java @@ -0,0 +1,27 @@ +package org.dromara.sample.websocket.config; + +import org.dromara.common.websocket.WebSocketDefaultFactory; +import org.dromara.sample.websocket.service.IWebSocketManageService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketHandler; + +/** + * + * @author sean.zhou + * @date 2021/11/16 + * @version 0.1 + */ +@Component +@Primary +public class MyWebSocketFactory extends WebSocketDefaultFactory { + + @Autowired + private IWebSocketManageService webSocketManageService; + + @Override + public WebSocketHandler decorate(WebSocketHandler handler) { + return new MyWebSocketHandler(handler, webSocketManageService); + } +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketHandler.java b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketHandler.java new file mode 100644 index 0000000..35f1e38 --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketHandler.java @@ -0,0 +1,58 @@ +package org.dromara.sample.websocket.config; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.websocket.WebSocketDefaultHandler; +import org.dromara.sample.websocket.service.IWebSocketManageService; +import org.springframework.util.StringUtils; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.security.Principal; + +/** + * + * @author sean.zhou + * @date 2021/11/16 + * @version 0.1 + */ +@Slf4j +public class MyWebSocketHandler extends WebSocketDefaultHandler { + + private IWebSocketManageService webSocketManageService; + + MyWebSocketHandler(WebSocketHandler delegate, IWebSocketManageService webSocketManageService) { + super(delegate); + this.webSocketManageService = webSocketManageService; + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + Principal principal = session.getPrincipal(); + if (StringUtils.hasText(principal.getName())) { + webSocketManageService.put(principal.getName(), new MyConcurrentWebSocketSession(session)); + log.debug("{} is connected. ID: {}. WebSocketSession[current count: {}]", + principal.getName(), session.getId(), webSocketManageService.getConnectedCount()); + return; + } + session.close(); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + Principal principal = session.getPrincipal(); + if (StringUtils.hasText(principal.getName())) { + webSocketManageService.remove(principal.getName(), session.getId()); + log.debug("{} is disconnected. ID: {}. WebSocketSession[current count: {}]", + principal.getName(), session.getId(), webSocketManageService.getConnectedCount()); + } + + } + + @Override + public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + log.debug("received message: {}", message.getPayload()); + } + +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/websocket/model/BizCodeEnum.java b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/model/BizCodeEnum.java new file mode 100644 index 0000000..cb39f28 --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/model/BizCodeEnum.java @@ -0,0 +1,93 @@ +package org.dromara.sample.websocket.model; + +/** + * @author sean + * @version 0.1 + * @date 2021/11/26 + */ +public enum BizCodeEnum { + + DEVICE_ONLINE("device_online"), + + DEVICE_OFFLINE("device_offline"), + + DEVICE_UPDATE_TOPO("device_update_topo"), + + DEVICE_OSD("device_osd"), + + RC_OSD("gateway_osd"), + + DOCK_OSD("dock_osd"), + + MAP_ELEMENT_CREATE("map_element_create"), + + MAP_ELEMENT_UPDATE("map_element_update"), + + MAP_ELEMENT_DELETE("map_element_delete"), + + MAP_GROUP_REFRESH("map_group_refresh"), + + FLIGHT_TASK_PROGRESS("flighttask_progress"), + + DEVICE_HMS("device_hms"), + + DEVICE_REBOOT("device_reboot"), + + DRONE_OPEN("drone_open"), + + DRONE_CLOSE("drone_close"), + + DEVICE_CHECK("device_check"), + + DRONE_FORMAT("drone_format"), + + DEVICE_FORMAT("device_format"), + + COVER_OPEN("cover_open"), + + COVER_CLOSE("cover_close"), + + PUTTER_OPEN("putter_open"), + + PUTTER_CLOSE("putter_close"), + + CHARGE_OPEN("charge_open"), + + CHARGE_CLOSE("charge_close"), + + FILE_UPLOAD_CALLBACK("file_upload_callback"), + + FILE_UPLOAD_PROGRESS("fileupload_progress"), + + OTA_PROGRESS("ota_progress"), + + HIGHEST_PRIORITY_UPLOAD_FLIGHT_TASK_MEDIA("highest_priority_upload_flighttask_media"), + + CONTROL_SOURCE_CHANGE("control_source_change"), + + FLY_TO_POINT_PROGRESS("fly_to_point_progress"), + + TAKE_OFF_TO_POINT_PROGRESS("takeoff_to_point_progress"), + + DRC_STATUS_NOTIFY("drc_status_notify"), + + JOYSTICK_INVALID_NOTIFY("joystick_invalid_notify"), + + FLIGHT_AREAS_SYNC_PROGRESS("flight_areas_sync_progress"), + + FLIGHT_AREAS_DRONE_LOCATION("flight_areas_drone_location"), + + FLIGHT_AREAS_UPDATE("flight_areas_update"), + + ; + + private String code; + + BizCodeEnum(String code) { + this.code = code; + } + + public String getCode() { + return code; + } +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/IWebSocketManageService.java b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/IWebSocketManageService.java new file mode 100644 index 0000000..5030c4a --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/IWebSocketManageService.java @@ -0,0 +1,26 @@ +package org.dromara.sample.websocket.service; + + + +import org.dromara.sample.websocket.config.MyConcurrentWebSocketSession; + +import java.util.Collection; + +/** + * @author sean + * @version 1.0 + * @date 2022/4/25 + */ +public interface IWebSocketManageService { + + void put(String key, MyConcurrentWebSocketSession val); + + void remove(String key, String sessionId); + + Collection getValueWithWorkspace(String workspaceId); + + Collection getValueWithWorkspaceAndUserType(String workspaceId, Integer userType); + + Long getConnectedCount(); + +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/IWebSocketMessageService.java b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/IWebSocketMessageService.java new file mode 100644 index 0000000..6811db1 --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/IWebSocketMessageService.java @@ -0,0 +1,33 @@ +package org.dromara.sample.websocket.service; + + +import org.dromara.common.websocket.dto.WebSocketMessageResponse; +import org.dromara.sample.websocket.config.MyConcurrentWebSocketSession; + +import java.util.Collection; + +/** + * @author sean.zhou + * @date 2021/11/24 + * @version 0.1 + */ +public interface IWebSocketMessageService { + + /** + * Send a message to the specific connection. + * @param session A WebSocket connection object + * @param message message + */ + void sendMessage(MyConcurrentWebSocketSession session, WebSocketMessageResponse message); + + /** + * Send the same message to specific connection. + * @param sessions A collection of WebSocket connection objects. + * @param message message + */ + void sendBatch(Collection sessions, WebSocketMessageResponse message); + + void sendBatch(String workspaceId, Integer userType, String bizCode, Object data); + + void sendBatch(String workspaceId, String bizCode, Object data); +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketManageServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketManageServiceImpl.java new file mode 100644 index 0000000..17bfaaa --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketManageServiceImpl.java @@ -0,0 +1,86 @@ +package org.dromara.sample.websocket.service.impl; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.redis.config.RedisConst; +import org.dromara.common.redis.utils.RedisOpsUtils; +import org.dromara.sample.manage.model.enums.UserTypeEnum; +import org.dromara.sample.websocket.config.MyConcurrentWebSocketSession; +import org.dromara.sample.websocket.service.IWebSocketManageService; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * @author sean + * @version 1.0 + * @date 2022/4/25 + */ +@Slf4j +@Service +public class WebSocketManageServiceImpl implements IWebSocketManageService { + + private static final ConcurrentHashMap SESSIONS = new ConcurrentHashMap<>(16); + + @Override + public void put(String key, MyConcurrentWebSocketSession val) { + String[] name = key.split("/"); + if (name.length != 3) { + log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]"); + return; + } + String sessionId = val.getId(); + String workspaceKey = RedisConst.WEBSOCKET_PREFIX + name[0]; + String userTypeKey = RedisConst.WEBSOCKET_PREFIX + UserTypeEnum.find(Integer.parseInt(name[1])).getDesc(); + RedisOpsUtils.hashSet(workspaceKey, sessionId, name[2]); + RedisOpsUtils.hashSet(userTypeKey, sessionId, name[2]); + SESSIONS.put(sessionId, val); + RedisOpsUtils.expireKey(workspaceKey, RedisConst.WEBSOCKET_ALIVE_SECOND); + RedisOpsUtils.expireKey(userTypeKey, RedisConst.WEBSOCKET_ALIVE_SECOND); + } + + @Override + public void remove(String key, String sessionId) { + String[] name = key.split("/"); + if (name.length != 3) { + log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]"); + return; + } + RedisOpsUtils.hashDel(RedisConst.WEBSOCKET_PREFIX + name[0], new String[] {sessionId}); + RedisOpsUtils.hashDel(RedisConst.WEBSOCKET_PREFIX + UserTypeEnum.find(Integer.parseInt(name[1])).getDesc(), new String[] {sessionId}); + SESSIONS.remove(sessionId); + } + + @Override + public Collection getValueWithWorkspace(String workspaceId) { + if (!StringUtils.hasText(workspaceId)) { + return Collections.emptySet(); + } + String key = RedisConst.WEBSOCKET_PREFIX + workspaceId; + + return RedisOpsUtils.hashKeys(key) + .stream() + .map(SESSIONS::get) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } + + @Override + public Collection getValueWithWorkspaceAndUserType(String workspaceId, Integer userType) { + String key = RedisConst.WEBSOCKET_PREFIX + UserTypeEnum.find(userType).getDesc(); + return RedisOpsUtils.hashKeys(key) + .stream() + .map(SESSIONS::get) + .filter(getValueWithWorkspace(workspaceId)::contains) + .collect(Collectors.toSet()); + } + + @Override + public Long getConnectedCount() { + return SESSIONS.mappingCount(); + } +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketMessageServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketMessageServiceImpl.java new file mode 100644 index 0000000..437d579 --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketMessageServiceImpl.java @@ -0,0 +1,99 @@ +package org.dromara.sample.websocket.service.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.websocket.dto.WebSocketMessageResponse; +import org.dromara.sample.websocket.config.MyConcurrentWebSocketSession; +import org.dromara.sample.websocket.service.IWebSocketManageService; +import org.dromara.sample.websocket.service.IWebSocketMessageService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; +import org.springframework.web.socket.TextMessage; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; + +/** + * @author sean.zhou + * @version 0.1 + * @date 2021/11/24 + */ +@Service +@Slf4j +public class WebSocketMessageServiceImpl implements IWebSocketMessageService { + + @Autowired + private ObjectMapper mapper; + + @Autowired + private IWebSocketManageService webSocketManageService; + + @Override + public void sendMessage(MyConcurrentWebSocketSession session, WebSocketMessageResponse message) { + if (session == null) { + return; + } + + try { + if (!session.isOpen()) { + session.close(); + log.debug("This session is closed."); + return; + } + + + session.sendMessage(new TextMessage(mapper.writeValueAsBytes(message))); + } catch (IOException e) { + log.info("Failed to publish the message. {}", message.toString()); + e.printStackTrace(); + } + } + + @Override + public void sendBatch(Collection sessions, WebSocketMessageResponse message) { + if (sessions.isEmpty()) { + return; + } + + try { + + TextMessage data = new TextMessage(mapper.writeValueAsBytes(message)); + + for (MyConcurrentWebSocketSession session : sessions) { + if (!session.isOpen()) { + session.close(); + log.debug("This session is closed."); + return; + } + session.sendMessage(data); + } + + } catch (IOException e) { + log.info("Failed to publish the message. {}", message.toString()); + + e.printStackTrace(); + } + } + + @Override + public void sendBatch(String workspaceId, Integer userType, String bizCode, Object data) { + if (!StringUtils.hasText(workspaceId)) { + throw new RuntimeException("工作区ID不存在。"); + } + Collection sessions = Objects.isNull(userType) ? + webSocketManageService.getValueWithWorkspace(workspaceId) : + webSocketManageService.getValueWithWorkspaceAndUserType(workspaceId, userType); + + this.sendBatch(sessions, new WebSocketMessageResponse() + .setData(Objects.requireNonNullElse(data, "")) + .setTimestamp(System.currentTimeMillis()) + .setBizCode(bizCode)); + } + + @Override + public void sendBatch(String workspaceId, String bizCode, Object data) { + this.sendBatch(workspaceId, null, bizCode, data); + } +} diff --git a/pom.xml b/pom.xml index 7555b9f..eca3ddd 100644 --- a/pom.xml +++ b/pom.xml @@ -82,10 +82,10 @@ - dev + wuyuan - dev + wuyuan 127.0.0.1:8848 DEFAULT_GROUP DEFAULT_GROUP