吴远 4 months ago
parent
commit
afa4b6c492
  1. 11
      dk-common/common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java
  2. 4
      dk-common/common-oss/src/main/java/org/dromara/common/oss/factory/Minio.java
  3. 2
      dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketConfiguration.java
  4. 1
      dk-common/common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
  5. 7
      dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceFirmwareServiceImpl.java
  6. 7
      dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DevicePayloadServiceImpl.java
  7. 15
      dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceServiceImpl.java
  8. 2
      dk-modules/sample/src/main/java/org/dromara/sample/storage/service/impl/StorageServiceImpl.java
  9. 3
      dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineFileServiceImpl.java
  10. 27
      dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketHandler.java
  11. 27
      dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketManageServiceImpl.java

11
dk-common/common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java

@ -388,6 +388,10 @@ public class OssClient {
return upload(inputStream, getPath(properties.getPrefix(), suffix), length, contentType); return upload(inputStream, getPath(properties.getPrefix(), suffix), length, contentType);
} }
public UploadResult uploadSuffix(InputStream inputStream, String suffix, String fileName,Long length, String contentType) {
return upload(inputStream, getPath(properties.getPrefix(), suffix,fileName), length, contentType);
}
/** /**
* 上传文件到 Amazon S3使用指定的后缀构造对象键 * 上传文件到 Amazon S3使用指定的后缀构造对象键
* *
@ -502,13 +506,18 @@ public class OssClient {
* @return 文件路径 * @return 文件路径
*/ */
public String getPath(String prefix, String suffix) { public String getPath(String prefix, String suffix) {
// 生成日期路径
return prefix + StringUtils.SLASH + DateUtils.datePath() + StringUtils.SLASH + suffix ;
}
public String getPath(String prefix,String fileName, String suffix) {
// 生成uuid // 生成uuid
String uuid = IdUtil.fastSimpleUUID(); String uuid = IdUtil.fastSimpleUUID();
// 生成日期路径 // 生成日期路径
String datePath = DateUtils.datePath(); String datePath = DateUtils.datePath();
// 拼接路径 // 拼接路径
String path = StringUtils.isNotEmpty(prefix) ? String path = StringUtils.isNotEmpty(prefix) ?
prefix + StringUtils.SLASH + datePath + StringUtils.SLASH + uuid : datePath + StringUtils.SLASH + uuid; prefix + StringUtils.SLASH + datePath + StringUtils.SLASH + uuid : datePath + StringUtils.SLASH + fileName;
return path + suffix; return path + suffix;
} }

4
dk-common/common-oss/src/main/java/org/dromara/common/oss/factory/Minio.java

@ -32,8 +32,8 @@ public class Minio {
AssumeRoleProvider provider = new AssumeRoleProvider(client.getIsHttps()+client.getOssProperties().getEndpoint(), client.getOssProperties().getAccessKey(), AssumeRoleProvider provider = new AssumeRoleProvider(client.getIsHttps()+client.getOssProperties().getEndpoint(), client.getOssProperties().getAccessKey(),
client.getOssProperties().getSecretKey(), Math.toIntExact(client.getOssProperties().getExpire()), client.getOssProperties().getSecretKey(), Math.toIntExact(client.getOssProperties().getExpire()),
null, client.getOssProperties().getRegion(), null, null, null, null); null, client.getOssProperties().getRegion(), null, null, null, null);
Credentials credential = provider.fetch(); //Credentials credential = provider.fetch();
return new CredentialsToken(credential.accessKey(), credential.secretKey(), credential.sessionToken(), client.getOssProperties().getExpire()); return new CredentialsToken(client.getOssProperties().getAccessKey(),client.getOssProperties().getSecretKey(), "", client.getOssProperties().getExpire());
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
log.debug("Failed to obtain sts."); log.debug("Failed to obtain sts.");
e.printStackTrace(); e.printStackTrace();

2
dk-common/common-websocket/src/main/java/org/dromara/common/websocket/WebSocketConfiguration.java

@ -28,7 +28,7 @@ public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer
@Override @Override
public void registerStompEndpoints(StompEndpointRegistry registry) { public void registerStompEndpoints(StompEndpointRegistry registry) {
// Set the WebSocket connection address // Set the WebSocket connection address
registry.addEndpoint("/dkcyclog/sample/websocket").setAllowedOriginPatterns("*") registry.addEndpoint("/websocket").setAllowedOriginPatterns("*")
.setHandshakeHandler(handshakeHandler); .setHandshakeHandler(handshakeHandler);
} }

1
dk-common/common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

@ -1 +0,0 @@
org.dromara.common.websocket.config.WebSocketConfig

7
dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceFirmwareServiceImpl.java

@ -21,7 +21,6 @@ import org.dromara.common.sdk.mqtt.events.EventsDataRequest;
import org.dromara.common.sdk.mqtt.events.TopicEventsRequest; import org.dromara.common.sdk.mqtt.events.TopicEventsRequest;
import org.dromara.common.sdk.mqtt.events.TopicEventsResponse; import org.dromara.common.sdk.mqtt.events.TopicEventsResponse;
import org.dromara.common.websocket.dto.BizCodeEnum; import org.dromara.common.websocket.dto.BizCodeEnum;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.sample.component.mqtt.model.EventsReceiver; import org.dromara.sample.component.mqtt.model.EventsReceiver;
import org.dromara.sample.manage.mapper.IDeviceFirmwareMapper; import org.dromara.sample.manage.mapper.IDeviceFirmwareMapper;
import org.dromara.sample.manage.model.dto.*; import org.dromara.sample.manage.model.dto.*;
@ -35,6 +34,7 @@ import org.dromara.sample.manage.service.IFirmwareModelService;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dromara.sample.websocket.service.IWebSocketMessageService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -76,6 +76,9 @@ public class DeviceFirmwareServiceImpl extends AbstractFirmwareService implement
@Autowired @Autowired
private IDeviceRedisService deviceRedisService; private IDeviceRedisService deviceRedisService;
@Autowired
private IWebSocketMessageService webSocketMessageService;
@Override @Override
public Optional<DeviceFirmwareDTO> getFirmware(String workspaceId, String deviceName, String version) { public Optional<DeviceFirmwareDTO> getFirmware(String workspaceId, String deviceName, String version) {
return Optional.ofNullable(entity2Dto(mapper.selectOne( return Optional.ofNullable(entity2Dto(mapper.selectOne(
@ -159,7 +162,7 @@ public class DeviceFirmwareServiceImpl extends AbstractFirmwareService implement
deviceRedisService.setFirmwareUpgrading(sn, events); deviceRedisService.setFirmwareUpgrading(sn, events);
} }
events.setSn(sn); events.setSn(sn);
WebSocketUtils.publishAll(UserTypeEnum.WEB.getVal(), BizCodeEnum.OTA_PROGRESS.getCode(), events); webSocketMessageService.sendBatch(workspaceId,UserTypeEnum.WEB.getVal(), BizCodeEnum.OTA_PROGRESS.getCode(), events);
} }
@Override @Override

7
dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DevicePayloadServiceImpl.java

@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import org.dromara.common.sdk.cloudapi.device.*; import org.dromara.common.sdk.cloudapi.device.*;
import org.dromara.common.websocket.dto.BizCodeEnum; import org.dromara.common.websocket.dto.BizCodeEnum;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.sample.control.model.enums.DroneAuthorityEnum; import org.dromara.sample.control.model.enums.DroneAuthorityEnum;
import org.dromara.sample.manage.mapper.IDevicePayloadMapper; import org.dromara.sample.manage.mapper.IDevicePayloadMapper;
import org.dromara.sample.manage.model.dto.*; import org.dromara.sample.manage.model.dto.*;
@ -16,6 +15,7 @@ import org.dromara.sample.manage.service.IDeviceDictionaryService;
import org.dromara.sample.manage.service.IDevicePayloadService; import org.dromara.sample.manage.service.IDevicePayloadService;
import org.dromara.sample.manage.service.IDeviceRedisService; import org.dromara.sample.manage.service.IDeviceRedisService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dromara.sample.websocket.service.IWebSocketMessageService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -46,6 +46,9 @@ public class DevicePayloadServiceImpl implements IDevicePayloadService {
@Autowired @Autowired
private IDeviceRedisService deviceRedisService; private IDeviceRedisService deviceRedisService;
@Autowired
private IWebSocketMessageService webSocketMessageService;
@Override @Override
public Integer checkPayloadExist(String payloadSn) { public Integer checkPayloadExist(String payloadSn) {
DevicePayloadEntity devicePayload = mapper.selectOne( DevicePayloadEntity devicePayload = mapper.selectOne(
@ -81,7 +84,7 @@ public class DevicePayloadServiceImpl implements IDevicePayloadService {
return false; return false;
} }
if (controlMap.get(payloadReceiver.getSn()) != payloadReceiver.getControlSource()) { if (controlMap.get(payloadReceiver.getSn()) != payloadReceiver.getControlSource()) {
WebSocketUtils.publishAll( UserTypeEnum.WEB.getVal(), webSocketMessageService.sendBatch(device.getWorkspaceId(), UserTypeEnum.WEB.getVal(),
BizCodeEnum.CONTROL_SOURCE_CHANGE.getCode(), BizCodeEnum.CONTROL_SOURCE_CHANGE.getCode(),
DeviceAuthorityDTO.builder() DeviceAuthorityDTO.builder()
.controlSource(payloadReceiver.getControlSource()) .controlSource(payloadReceiver.getControlSource())

15
dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceServiceImpl.java

@ -25,7 +25,6 @@ import org.dromara.common.sdk.mqtt.services.TopicServicesResponse;
import org.dromara.common.sdk.mqtt.state.StateSubscribe; import org.dromara.common.sdk.mqtt.state.StateSubscribe;
import org.dromara.common.sdk.mqtt.status.StatusSubscribe; import org.dromara.common.sdk.mqtt.status.StatusSubscribe;
import org.dromara.common.websocket.dto.BizCodeEnum; import org.dromara.common.websocket.dto.BizCodeEnum;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.sample.common.error.CommonErrorEnum; import org.dromara.sample.common.error.CommonErrorEnum;
import org.dromara.sample.component.mqtt.model.EventsReceiver; import org.dromara.sample.component.mqtt.model.EventsReceiver;
import org.dromara.sample.control.model.enums.DroneAuthorityEnum; import org.dromara.sample.control.model.enums.DroneAuthorityEnum;
@ -41,6 +40,7 @@ import org.dromara.sample.manage.service.*;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dromara.sample.websocket.service.IWebSocketMessageService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -122,6 +122,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired @Autowired
private AbstractFirmwareService abstractFirmwareService; private AbstractFirmwareService abstractFirmwareService;
@Autowired
private IWebSocketMessageService webSocketMessageService;
@Override @Override
public void subDeviceOffline(String deviceSn) { public void subDeviceOffline(String deviceSn) {
// If no information about this device exists in the cache, the drone is considered to be offline. // If no information about this device exists in the cache, the drone is considered to be offline.
@ -302,20 +305,20 @@ public class DeviceServiceImpl implements IDeviceService {
@Override @Override
public void pushDeviceOfflineTopo(String workspaceId, String deviceSn) { public void pushDeviceOfflineTopo(String workspaceId, String deviceSn) {
WebSocketUtils.publishAll( webSocketMessageService.sendBatch(workspaceId,
null,BizCodeEnum.DEVICE_OFFLINE.getCode(), null,BizCodeEnum.DEVICE_OFFLINE.getCode(),
new TopologyDeviceDTO().setSn(deviceSn).setOnlineStatus(false)); new TopologyDeviceDTO().setSn(deviceSn).setOnlineStatus(false));
} }
@Override @Override
public void pushDeviceOnlineTopo(String workspaceId, String gatewaySn, String deviceSn) { public void pushDeviceOnlineTopo(String workspaceId, String gatewaySn, String deviceSn) {
WebSocketUtils.publishAll(null,BizCodeEnum.DEVICE_ONLINE.getCode(), webSocketMessageService.sendBatch(null,BizCodeEnum.DEVICE_ONLINE.getCode(),
getDeviceTopoForPilot(deviceSn).orElseGet(TopologyDeviceDTO::new).setGatewaySn(gatewaySn)); getDeviceTopoForPilot(deviceSn).orElseGet(TopologyDeviceDTO::new).setGatewaySn(gatewaySn));
} }
@Override @Override
public void pushOsdDataToPilot(String workspaceId, String sn, DeviceOsdHost data) { public void pushOsdDataToPilot(String workspaceId, String sn, DeviceOsdHost data) {
WebSocketUtils.publishAll( UserTypeEnum.PILOT.getVal(), BizCodeEnum.DEVICE_OSD.getCode(), webSocketMessageService.sendBatch(workspaceId, UserTypeEnum.PILOT.getVal(), BizCodeEnum.DEVICE_OSD.getCode(),
new DeviceOsdWsResponse() new DeviceOsdWsResponse()
.setSn(sn) .setSn(sn)
.setHost(data)); .setHost(data));
@ -323,7 +326,7 @@ public class DeviceServiceImpl implements IDeviceService {
@Override @Override
public void pushOsdDataToWeb(String workspaceId, BizCodeEnum codeEnum, String sn, Object data) { public void pushOsdDataToWeb(String workspaceId, BizCodeEnum codeEnum, String sn, Object data) {
WebSocketUtils.publishAll(UserTypeEnum.WEB.getVal(), codeEnum.getCode(), TelemetryDTO.builder().sn(sn).host(data).build()); webSocketMessageService.sendBatch(workspaceId,UserTypeEnum.WEB.getVal(), codeEnum.getCode(), TelemetryDTO.builder().sn(sn).host(data).build());
} }
/** /**
@ -655,7 +658,7 @@ public class DeviceServiceImpl implements IDeviceService {
gateway.setControlSource(controlSource); gateway.setControlSource(controlSource);
deviceRedisService.setDeviceOnline(gateway); deviceRedisService.setDeviceOnline(gateway);
WebSocketUtils.publishAll(UserTypeEnum.WEB.getVal(), webSocketMessageService.sendBatch(gateway.getWorkspaceId(),UserTypeEnum.WEB.getVal(),
BizCodeEnum.CONTROL_SOURCE_CHANGE.getCode(), BizCodeEnum.CONTROL_SOURCE_CHANGE.getCode(),
DeviceAuthorityDTO.builder() DeviceAuthorityDTO.builder()
.controlSource(gateway.getControlSource()) .controlSource(gateway.getControlSource())

2
dk-modules/sample/src/main/java/org/dromara/sample/storage/service/impl/StorageServiceImpl.java

@ -39,7 +39,7 @@ public class StorageServiceImpl extends AbstractMediaService implements IStorage
OssClient ossClient = OssFactory.instance("mediafile"); OssClient ossClient = OssFactory.instance("mediafile");
return new StsCredentialsResponse() return new StsCredentialsResponse()
.setEndpoint(ossClient.getIsHttps()+ossClient.getOssProperties().getEndpoint()) .setEndpoint(ossClient.getIsHttps()+ossClient.getOssProperties().getEndpoint())
.setBucket("mediafile") .setBucket(ossClient.getOssProperties().getBucketName())
.setCredentials(OssFactory.getCredentials(ossClient)) .setCredentials(OssFactory.getCredentials(ossClient))
.setProvider(OssTypeEnum.fromType(ossClient.getOssProperties().getSThreeType())) .setProvider(OssTypeEnum.fromType(ossClient.getOssProperties().getSThreeType()))
.setObjectKeyPrefix(ossClient.getOssProperties().getPrefix()) .setObjectKeyPrefix(ossClient.getOssProperties().getPrefix())

3
dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineFileServiceImpl.java

@ -197,8 +197,7 @@ public class WaylineFileServiceImpl implements IWaylineFileService {
waylineFile.setUsername(creator); waylineFile.setUsername(creator);
OssClient storage = OssFactory.instance("waylinefile"); OssClient storage = OssFactory.instance("waylinefile");
String originalfileName = file.getOriginalFilename(); String originalfileName = file.getOriginalFilename();
String suffix = org.apache.commons.lang3.StringUtils.substring(originalfileName, originalfileName.lastIndexOf("."), originalfileName.length()); UploadResult uploadResult = storage.uploadSuffix(file.getBytes(), originalfileName, file.getContentType());
UploadResult uploadResult = storage.uploadSuffix(file.getBytes(), suffix, file.getContentType());
waylineFile.setObjectKey(uploadResult.getFilename()); waylineFile.setObjectKey(uploadResult.getFilename());
List<String> deviceSns = new ArrayList<>(); List<String> deviceSns = new ArrayList<>();
deviceSns.add(deviceSn); deviceSns.add(deviceSn);

27
dk-modules/sample/src/main/java/org/dromara/sample/websocket/config/MyWebSocketHandler.java

@ -1,8 +1,14 @@
package org.dromara.sample.websocket.config; package org.dromara.sample.websocket.config;
import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.constant.CacheConstants;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.satoken.utils.LoginHelper;
import org.dromara.common.websocket.WebSocketDefaultHandler; import org.dromara.common.websocket.WebSocketDefaultHandler;
import org.dromara.sample.websocket.service.IWebSocketManageService; import org.dromara.sample.websocket.service.IWebSocketManageService;
import org.dromara.system.api.domain.SysUserOnline;
import org.dromara.system.api.model.LoginUser;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
@ -11,6 +17,8 @@ import org.springframework.web.socket.WebSocketSession;
import java.security.Principal; import java.security.Principal;
import static org.dromara.common.satoken.utils.LoginHelper.LOGIN_USER_KEY;
/** /**
* *
* @author sean.zhou * @author sean.zhou
@ -29,11 +37,13 @@ public class MyWebSocketHandler extends WebSocketDefaultHandler {
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Principal principal = session.getPrincipal();
if (StringUtils.hasText(principal.getName())) { Object cacheObject = RedisUtils.getCacheObject("000000:online_tokens:" + session.getUri().toString().substring(session.getUri().toString().indexOf("%20") + 3, session.getUri().toString().indexOf("&")));
webSocketManageService.put(principal.getName(), new MyConcurrentWebSocketSession(session)); SysUserOnline sysUserOnline = BeanUtil.copyProperties(cacheObject, SysUserOnline.class);
if (StringUtils.hasText(sysUserOnline.getUserName())) {
webSocketManageService.put(sysUserOnline.getUserName(), new MyConcurrentWebSocketSession(session));
log.debug("{} is connected. ID: {}. WebSocketSession[current count: {}]", log.debug("{} is connected. ID: {}. WebSocketSession[current count: {}]",
principal.getName(), session.getId(), webSocketManageService.getConnectedCount()); sysUserOnline.getUserName(), session.getId(), webSocketManageService.getConnectedCount());
return; return;
} }
session.close(); session.close();
@ -41,11 +51,12 @@ public class MyWebSocketHandler extends WebSocketDefaultHandler {
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
Principal principal = session.getPrincipal(); Object cacheObject = RedisUtils.getCacheObject("000000:online_tokens:" + session.getUri().toString().substring(session.getUri().toString().indexOf("%20") + 3, session.getUri().toString().indexOf("&")));
if (StringUtils.hasText(principal.getName())) { SysUserOnline sysUserOnline = BeanUtil.copyProperties(cacheObject, SysUserOnline.class);
webSocketManageService.remove(principal.getName(), session.getId()); if (StringUtils.hasText(sysUserOnline.getUserName())) {
webSocketManageService.remove(sysUserOnline.getUserName(), session.getId());
log.debug("{} is disconnected. ID: {}. WebSocketSession[current count: {}]", log.debug("{} is disconnected. ID: {}. WebSocketSession[current count: {}]",
principal.getName(), session.getId(), webSocketManageService.getConnectedCount()); sysUserOnline.getUserName(), session.getId(), webSocketManageService.getConnectedCount());
} }
} }

27
dk-modules/sample/src/main/java/org/dromara/sample/websocket/service/impl/WebSocketManageServiceImpl.java

@ -29,30 +29,17 @@ public class WebSocketManageServiceImpl implements IWebSocketManageService {
@Override @Override
public void put(String key, MyConcurrentWebSocketSession val) { public void put(String key, MyConcurrentWebSocketSession val) {
String[] name = key.split("/"); String[] name = key.split("/");
if (name.length != 3) { String workspaceKey = RedisConst.WEBSOCKET_PREFIX;
log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]"); RedisOpsUtils.hashSet(workspaceKey, name[0], name[0]);
return; SESSIONS.put(name[0], val);
}
String sessionId = val.getId();
String workspaceKey = RedisConst.WEBSOCKET_PREFIX + name[0];
String userTypeKey = RedisConst.WEBSOCKET_PREFIX + UserTypeEnum.find(Integer.parseInt(name[1])).getDesc();
RedisOpsUtils.hashSet(workspaceKey, sessionId, name[2]);
RedisOpsUtils.hashSet(userTypeKey, sessionId, name[2]);
SESSIONS.put(sessionId, val);
RedisOpsUtils.expireKey(workspaceKey, RedisConst.WEBSOCKET_ALIVE_SECOND); RedisOpsUtils.expireKey(workspaceKey, RedisConst.WEBSOCKET_ALIVE_SECOND);
RedisOpsUtils.expireKey(userTypeKey, RedisConst.WEBSOCKET_ALIVE_SECOND);
} }
@Override @Override
public void remove(String key, String sessionId) { public void remove(String key, String sessionId) {
String[] name = key.split("/"); String[] name = key.split("/");
if (name.length != 3) { RedisOpsUtils.hashDel(RedisConst.WEBSOCKET_PREFIX , name);
log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]"); SESSIONS.remove(name[0]);
return;
}
RedisOpsUtils.hashDel(RedisConst.WEBSOCKET_PREFIX + name[0], new String[] {sessionId});
RedisOpsUtils.hashDel(RedisConst.WEBSOCKET_PREFIX + UserTypeEnum.find(Integer.parseInt(name[1])).getDesc(), new String[] {sessionId});
SESSIONS.remove(sessionId);
} }
@Override @Override
@ -60,7 +47,7 @@ public class WebSocketManageServiceImpl implements IWebSocketManageService {
if (!StringUtils.hasText(workspaceId)) { if (!StringUtils.hasText(workspaceId)) {
return Collections.emptySet(); return Collections.emptySet();
} }
String key = RedisConst.WEBSOCKET_PREFIX + workspaceId; String key = RedisConst.WEBSOCKET_PREFIX;
return RedisOpsUtils.hashKeys(key) return RedisOpsUtils.hashKeys(key)
.stream() .stream()
@ -71,7 +58,7 @@ public class WebSocketManageServiceImpl implements IWebSocketManageService {
@Override @Override
public Collection<MyConcurrentWebSocketSession> getValueWithWorkspaceAndUserType(String workspaceId, Integer userType) { public Collection<MyConcurrentWebSocketSession> getValueWithWorkspaceAndUserType(String workspaceId, Integer userType) {
String key = RedisConst.WEBSOCKET_PREFIX + UserTypeEnum.find(userType).getDesc(); String key = RedisConst.WEBSOCKET_PREFIX;
return RedisOpsUtils.hashKeys(key) return RedisOpsUtils.hashKeys(key)
.stream() .stream()
.map(SESSIONS::get) .map(SESSIONS::get)

Loading…
Cancel
Save