7 changed files with 154 additions and 59 deletions
@ -0,0 +1,136 @@ |
|||
package yq.common.webSocket; |
|||
|
|||
import cn.hutool.core.convert.Convert; |
|||
|
|||
import com.alibaba.fastjson2.JSON; |
|||
|
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.web.socket.*; |
|||
import yq.common.core.redis.RedisCache; |
|||
|
|||
import yq.common.utils.spring.SpringUtils; |
|||
|
|||
import java.util.Map; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
/** |
|||
* @auther yq |
|||
* @data 2025/1/13 |
|||
*/ |
|||
@Component |
|||
public class MyWebSocketHandler implements WebSocketHandler { |
|||
|
|||
|
|||
private RedisCache redisCache=(RedisCache)SpringUtils.getBean(RedisCache.class); |
|||
// 存储所有连接的会话
|
|||
private Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>(); |
|||
|
|||
// 记录每个会话的重连次数
|
|||
private Map<String, Integer> reconnectAttempts = new ConcurrentHashMap<>(); |
|||
private static final int MAX_RECONNECT_ATTEMPTS = 5; |
|||
|
|||
// WebSocket连接建立后回调的方法
|
|||
@Override |
|||
public void afterConnectionEstablished(WebSocketSession session) throws Exception { |
|||
System.out.println("客户端(" + session.getAttributes().get("id") + ")上线连接"); |
|||
// 当连接建立时,将会话添加到会话映射中
|
|||
sessionMap.put(session.getId(), session); |
|||
reconnectAttempts.put(session.getId(), 0); // 初始化重连次数
|
|||
System.out.println("新的连接:" + session.getId()); |
|||
System.out.println("当前在线人数:" + sessionMap.size()); |
|||
} |
|||
|
|||
// 接收客户端发送的消息
|
|||
@Override |
|||
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { |
|||
// 连接发送的消息
|
|||
String msg = message.getPayload().toString(); |
|||
String id = Convert.toStr(session.getAttributes().get("id")); |
|||
System.out.println("客户端(" + id + ")消息:" + msg); |
|||
|
|||
// // 通过session向连接发送消息
|
|||
// session.sendMessage(new TextMessage("我收到了你的消息,感谢你的来信"));
|
|||
if(id.contains("dk")){ |
|||
Map object = JSON.toJavaObject(msg, Map.class); |
|||
if(redisCache.hasKey(id)){ |
|||
Double temp = Convert.toDouble(object.get("temp")); |
|||
Map<String, Object> cacheMap = redisCache.getCacheMap(id); |
|||
Double temp1 = Convert.toDouble(cacheMap.get("temp")); |
|||
if(temp>temp1){ |
|||
redisCache.setCacheMap(id,object); |
|||
} |
|||
}else { |
|||
redisCache.setCacheMap(id,object); |
|||
} |
|||
|
|||
} |
|||
} |
|||
|
|||
// 向特定客户端发送消息
|
|||
public void sendMessageToClient(String sessionId, String message) throws Exception { |
|||
WebSocketSession session = sessionMap.get(sessionId); |
|||
if (session != null && session.isOpen()) { |
|||
session.sendMessage(new TextMessage(message)); |
|||
System.out.println("已发送消息到客户端 " + sessionId); |
|||
} else { |
|||
System.out.println("客户端连接不存在或已关闭"); |
|||
} |
|||
} |
|||
|
|||
// 连接出错时,回调的方法
|
|||
@Override |
|||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { |
|||
System.out.println("收到的错误信息:" + exception); |
|||
// 处理重连逻辑
|
|||
String sessionId = session.getId(); |
|||
int attempts = reconnectAttempts.getOrDefault(sessionId, 0); |
|||
|
|||
if (attempts < MAX_RECONNECT_ATTEMPTS) { |
|||
// 重连次数加1
|
|||
reconnectAttempts.put(sessionId, attempts + 1); |
|||
System.out.println("尝试重连客户端:" + sessionId + ",已尝试 " + (attempts + 1) + " 次"); |
|||
|
|||
// 可以在这里调用某些方法来尝试重新建立 WebSocket 连接
|
|||
// 如需要,使用 ScheduledExecutorService 等工具来定时重连
|
|||
|
|||
TimeUnit.SECONDS.sleep(2); // 假设每次重连等待2秒
|
|||
// 重连逻辑可以实现,但这里依赖外部的 WebSocket 客户端来支持重连
|
|||
|
|||
} else { |
|||
System.out.println("客户端:" + sessionId + " 达到最大重连次数,停止重连"); |
|||
reconnectAttempts.remove(sessionId); |
|||
} |
|||
} |
|||
|
|||
// 连接关闭时,回调的方法
|
|||
@Override |
|||
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { |
|||
System.out.println("客户端(" + session.getAttributes().get("id") + ")断开连接"); |
|||
|
|||
// 在连接关闭时移除session并统计
|
|||
sessionMap.remove(session.getId()); |
|||
reconnectAttempts.remove(session.getId()); |
|||
|
|||
System.out.println("当前在线人数:" + getOnlineCount()); |
|||
|
|||
// 尝试重连
|
|||
// 可以加入重连的业务逻辑
|
|||
// 不过这里的实现假设是客户端主动处理断开和重连
|
|||
} |
|||
|
|||
// 获取当前在线人数
|
|||
public int getOnlineCount() { |
|||
return sessionMap.size(); |
|||
} |
|||
|
|||
// WebSocketHandler 是否处理部分消息 默认返回false即可
|
|||
@Override |
|||
public boolean supportsPartialMessages() { |
|||
return false; |
|||
} |
|||
|
|||
public RedisCache getRedisCache() { |
|||
return redisCache; |
|||
} |
|||
} |
@ -1,4 +1,4 @@ |
|||
package yq.framework.web.webSocket; |
|||
package yq.common.webSocket; |
|||
|
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.web.socket.config.annotation.EnableWebSocket; |
@ -1,48 +0,0 @@ |
|||
package yq.framework.web.webSocket; |
|||
|
|||
import org.springframework.web.socket.*; |
|||
|
|||
/** |
|||
* @auther yq |
|||
* @data 2025/1/13 |
|||
*/ |
|||
public class MyWebSocketHandler implements WebSocketHandler { |
|||
|
|||
//建立新的 socket 连接后回调的方法
|
|||
@Override |
|||
public void afterConnectionEstablished(WebSocketSession session) throws Exception { |
|||
System.out.println("新连接"); |
|||
//从session中获取存放的参数
|
|||
session.getAttributes().get("id"); |
|||
} |
|||
|
|||
// 接收客户端发送的 Socket
|
|||
@Override |
|||
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { |
|||
// 连接发送的消息
|
|||
String msg = message.getPayload().toString(); |
|||
System.out.println("收到的消息:" + msg+"来自客户端:"+session.getId()); |
|||
// 通过session向连接发送消息
|
|||
session.sendMessage(new TextMessage("我收到了你的消息,感谢你的来信")); |
|||
} |
|||
|
|||
//连接出错时,回调的方法
|
|||
@Override |
|||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { |
|||
System.out.println("收到的错误信息:" + exception); |
|||
} |
|||
|
|||
//连接关闭时,回调的方法
|
|||
@Override |
|||
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { |
|||
System.out.println("断开连接"); |
|||
} |
|||
|
|||
|
|||
// WebSocketHandler 是否处理部分消息 默认返回false即可
|
|||
@Override |
|||
public boolean supportsPartialMessages() { |
|||
return false; |
|||
} |
|||
|
|||
} |
Loading…
Reference in new issue