From a9a5c80c72a9099a187bf9b8265bfa899075477f Mon Sep 17 00:00:00 2001 From: yq183 <645046984@qq.com> Date: Tue, 14 Jan 2025 17:38:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yq/web/controller/nologin/NoLogin.java | 10 +- yq-common/pom.xml | 6 +- .../common/webSocket/MyWebSocketHandler.java | 136 ++++++++++++++++++ .../webSocket/MyWebSocketInterceptor.java | 5 +- .../webSocket/SpringWebSocketConfig.java | 2 +- yq-framework/pom.xml | 6 +- .../web/webSocket/MyWebSocketHandler.java | 48 ------- 7 files changed, 154 insertions(+), 59 deletions(-) create mode 100644 yq-common/src/main/java/yq/common/webSocket/MyWebSocketHandler.java rename {yq-framework/src/main/java/yq/framework/web => yq-common/src/main/java/yq/common}/webSocket/MyWebSocketInterceptor.java (94%) rename {yq-framework/src/main/java/yq/framework/web => yq-common/src/main/java/yq/common}/webSocket/SpringWebSocketConfig.java (96%) delete mode 100644 yq-framework/src/main/java/yq/framework/web/webSocket/MyWebSocketHandler.java diff --git a/yq-admin/src/main/java/yq/web/controller/nologin/NoLogin.java b/yq-admin/src/main/java/yq/web/controller/nologin/NoLogin.java index a161870..a5ed61c 100644 --- a/yq-admin/src/main/java/yq/web/controller/nologin/NoLogin.java +++ b/yq-admin/src/main/java/yq/web/controller/nologin/NoLogin.java @@ -17,6 +17,7 @@ import yq.common.core.domain.entity.SysDevice; import yq.common.core.domain.entity.SysTicket; import yq.common.core.domain.vo.SysDeviceVo; +import yq.common.core.redis.RedisCache; import yq.common.enums.BusinessType; import yq.common.exception.ServiceException; import yq.common.utils.DateUtils; @@ -57,6 +58,8 @@ public class NoLogin { private ISysTicketService ticketService; @Autowired private IMineWarningService mineWarningService; + @Autowired + private RedisCache redisCache; @Autowired @@ -92,7 +95,12 @@ public class NoLogin { return AjaxResult.success(mineWarningService.insertMineWarning(mineWarning)); } - + //获取 + @GetMapping("/temp") + public MaptempMap(String id) + { + return redisCache.getCacheMap(id); + } //通过萤石云进行取流 @GetMapping("/list") public List>list(Integer deviceState, Long depeId) diff --git a/yq-common/pom.xml b/yq-common/pom.xml index 16cc9c4..27d37dd 100644 --- a/yq-common/pom.xml +++ b/yq-common/pom.xml @@ -28,7 +28,11 @@ org.springframework spring-web - + + + org.springframework.boot + spring-boot-starter-websocket + org.springframework.boot diff --git a/yq-common/src/main/java/yq/common/webSocket/MyWebSocketHandler.java b/yq-common/src/main/java/yq/common/webSocket/MyWebSocketHandler.java new file mode 100644 index 0000000..e8022ad --- /dev/null +++ b/yq-common/src/main/java/yq/common/webSocket/MyWebSocketHandler.java @@ -0,0 +1,136 @@ +package yq.common.webSocket; + +import cn.hutool.core.convert.Convert; + +import com.alibaba.fastjson2.JSON; + +import org.springframework.stereotype.Component; +import org.springframework.web.socket.*; +import yq.common.core.redis.RedisCache; + +import yq.common.utils.spring.SpringUtils; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * @auther yq + * @data 2025/1/13 + */ +@Component +public class MyWebSocketHandler implements WebSocketHandler { + + + private RedisCache redisCache=(RedisCache)SpringUtils.getBean(RedisCache.class); + // 存储所有连接的会话 + private Map sessionMap = new ConcurrentHashMap<>(); + + // 记录每个会话的重连次数 + private Map reconnectAttempts = new ConcurrentHashMap<>(); + private static final int MAX_RECONNECT_ATTEMPTS = 5; + + // WebSocket连接建立后回调的方法 + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + System.out.println("客户端(" + session.getAttributes().get("id") + ")上线连接"); + // 当连接建立时,将会话添加到会话映射中 + sessionMap.put(session.getId(), session); + reconnectAttempts.put(session.getId(), 0); // 初始化重连次数 + System.out.println("新的连接:" + session.getId()); + System.out.println("当前在线人数:" + sessionMap.size()); + } + + // 接收客户端发送的消息 + @Override + public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + // 连接发送的消息 + String msg = message.getPayload().toString(); + String id = Convert.toStr(session.getAttributes().get("id")); + System.out.println("客户端(" + id + ")消息:" + msg); + + // // 通过session向连接发送消息 + // session.sendMessage(new TextMessage("我收到了你的消息,感谢你的来信")); + if(id.contains("dk")){ + Map object = JSON.toJavaObject(msg, Map.class); + if(redisCache.hasKey(id)){ + Double temp = Convert.toDouble(object.get("temp")); + Map cacheMap = redisCache.getCacheMap(id); + Double temp1 = Convert.toDouble(cacheMap.get("temp")); + if(temp>temp1){ + redisCache.setCacheMap(id,object); + } + }else { + redisCache.setCacheMap(id,object); + } + + } + } + + // 向特定客户端发送消息 + public void sendMessageToClient(String sessionId, String message) throws Exception { + WebSocketSession session = sessionMap.get(sessionId); + if (session != null && session.isOpen()) { + session.sendMessage(new TextMessage(message)); + System.out.println("已发送消息到客户端 " + sessionId); + } else { + System.out.println("客户端连接不存在或已关闭"); + } + } + + // 连接出错时,回调的方法 + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + System.out.println("收到的错误信息:" + exception); + // 处理重连逻辑 + String sessionId = session.getId(); + int attempts = reconnectAttempts.getOrDefault(sessionId, 0); + + if (attempts < MAX_RECONNECT_ATTEMPTS) { + // 重连次数加1 + reconnectAttempts.put(sessionId, attempts + 1); + System.out.println("尝试重连客户端:" + sessionId + ",已尝试 " + (attempts + 1) + " 次"); + + // 可以在这里调用某些方法来尝试重新建立 WebSocket 连接 + // 如需要,使用 ScheduledExecutorService 等工具来定时重连 + + TimeUnit.SECONDS.sleep(2); // 假设每次重连等待2秒 + // 重连逻辑可以实现,但这里依赖外部的 WebSocket 客户端来支持重连 + + } else { + System.out.println("客户端:" + sessionId + " 达到最大重连次数,停止重连"); + reconnectAttempts.remove(sessionId); + } + } + + // 连接关闭时,回调的方法 + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + System.out.println("客户端(" + session.getAttributes().get("id") + ")断开连接"); + + // 在连接关闭时移除session并统计 + sessionMap.remove(session.getId()); + reconnectAttempts.remove(session.getId()); + + System.out.println("当前在线人数:" + getOnlineCount()); + + // 尝试重连 + // 可以加入重连的业务逻辑 + // 不过这里的实现假设是客户端主动处理断开和重连 + } + + // 获取当前在线人数 + public int getOnlineCount() { + return sessionMap.size(); + } + + // WebSocketHandler 是否处理部分消息 默认返回false即可 + @Override + public boolean supportsPartialMessages() { + return false; + } + + public RedisCache getRedisCache() { + return redisCache; + } +} diff --git a/yq-framework/src/main/java/yq/framework/web/webSocket/MyWebSocketInterceptor.java b/yq-common/src/main/java/yq/common/webSocket/MyWebSocketInterceptor.java similarity index 94% rename from yq-framework/src/main/java/yq/framework/web/webSocket/MyWebSocketInterceptor.java rename to yq-common/src/main/java/yq/common/webSocket/MyWebSocketInterceptor.java index 16bacce..e0fdd42 100644 --- a/yq-framework/src/main/java/yq/framework/web/webSocket/MyWebSocketInterceptor.java +++ b/yq-common/src/main/java/yq/common/webSocket/MyWebSocketInterceptor.java @@ -1,4 +1,4 @@ -package yq.framework.web.webSocket; +package yq.common.webSocket; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; @@ -15,7 +15,6 @@ public class MyWebSocketInterceptor implements HandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest servletRequest, ServerHttpResponse servletResponse, WebSocketHandler wsHandler, Map attributes) throws Exception { System.out.println("websocket前置拦截"); - // //如果用到Sec-WebSocket-Protocol,可以采用getHeaders().get(key)的方法获取 // if (servletRequest.getHeaders().get("Sec-WebSocket-Protocol") == null) { // System.out.println("无Sec-WebSocket-Protocol,进行拦截!"); @@ -34,6 +33,6 @@ public class MyWebSocketInterceptor implements HandshakeInterceptor { @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { - System.out.println("websocket后置拦截"); + System.out.println("websocketh后置"); } } diff --git a/yq-framework/src/main/java/yq/framework/web/webSocket/SpringWebSocketConfig.java b/yq-common/src/main/java/yq/common/webSocket/SpringWebSocketConfig.java similarity index 96% rename from yq-framework/src/main/java/yq/framework/web/webSocket/SpringWebSocketConfig.java rename to yq-common/src/main/java/yq/common/webSocket/SpringWebSocketConfig.java index ff9adca..49cb156 100644 --- a/yq-framework/src/main/java/yq/framework/web/webSocket/SpringWebSocketConfig.java +++ b/yq-common/src/main/java/yq/common/webSocket/SpringWebSocketConfig.java @@ -1,4 +1,4 @@ -package yq.framework.web.webSocket; +package yq.common.webSocket; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; diff --git a/yq-framework/pom.xml b/yq-framework/pom.xml index 8429728..a584918 100644 --- a/yq-framework/pom.xml +++ b/yq-framework/pom.xml @@ -29,11 +29,7 @@ spring-boot-starter-aop - - - org.springframework.boot - spring-boot-starter-websocket - + diff --git a/yq-framework/src/main/java/yq/framework/web/webSocket/MyWebSocketHandler.java b/yq-framework/src/main/java/yq/framework/web/webSocket/MyWebSocketHandler.java deleted file mode 100644 index ae1dc24..0000000 --- a/yq-framework/src/main/java/yq/framework/web/webSocket/MyWebSocketHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -package yq.framework.web.webSocket; - -import org.springframework.web.socket.*; - -/** - * @auther yq - * @data 2025/1/13 - */ -public class MyWebSocketHandler implements WebSocketHandler { - - //建立新的 socket 连接后回调的方法 - @Override - public void afterConnectionEstablished(WebSocketSession session) throws Exception { - System.out.println("新连接"); - //从session中获取存放的参数 - session.getAttributes().get("id"); - } - - // 接收客户端发送的 Socket - @Override - public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { - // 连接发送的消息 - String msg = message.getPayload().toString(); - System.out.println("收到的消息:" + msg+"来自客户端:"+session.getId()); - // 通过session向连接发送消息 - session.sendMessage(new TextMessage("我收到了你的消息,感谢你的来信")); - } - - //连接出错时,回调的方法 - @Override - public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - System.out.println("收到的错误信息:" + exception); - } - - //连接关闭时,回调的方法 - @Override - public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { - System.out.println("断开连接"); - } - - - // WebSocketHandler 是否处理部分消息 默认返回false即可 - @Override - public boolean supportsPartialMessages() { - return false; - } - -}