diff --git a/dk-api/api-system/src/main/java/org/dromara/system/api/RemoteConfigService.java b/dk-api/api-system/src/main/java/org/dromara/system/api/RemoteConfigService.java index 4c4502e..4be1ec1 100644 --- a/dk-api/api-system/src/main/java/org/dromara/system/api/RemoteConfigService.java +++ b/dk-api/api-system/src/main/java/org/dromara/system/api/RemoteConfigService.java @@ -14,4 +14,11 @@ public interface RemoteConfigService { */ boolean selectRegisterEnabled(String tenantId); + /** + * 获取流媒体ip + * + * @return true开启,false关闭 + */ + String selectStreamIp(); + } diff --git a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/MqttGatewayPublish.java b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/MqttGatewayPublish.java index 794a636..b0f3c80 100644 --- a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/MqttGatewayPublish.java +++ b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/MqttGatewayPublish.java @@ -101,5 +101,84 @@ public class MqttGatewayPublish { throw new CloudSDKException(CloudSDKErrorEnum.MQTT_PUBLISH_ABNORMAL, "No message reply received."); } + public void publishMegaphone(String topic, int qos, MegaphoneTopicRequest request) { + try { + log.debug("send topic: {}, payload: {}", topic, request.toString()); + byte[] payload = Common.getObjectMapper().writeValueAsBytes(request); + messageGateway.publish(topic, payload, qos); + } catch (JsonProcessingException e) { + log.error("Failed to publish the message. {}", request.toString()); + e.printStackTrace(); + } + } + + + public CommonTopicResponse publishPlayTextWithReply(Class clazz, String topic, MegaphoneTopicRequest request, int retryCount, long timeout) { + AtomicInteger time = new AtomicInteger(0); + boolean hasBid = StringUtils.hasText(request.getBid()); + request.setBid(hasBid ? request.getBid() : UUID.randomUUID().toString()); + // RetryServicesReplyReceiver + while (time.getAndIncrement() <= retryCount) { + this.publishMegaphone(topic, DEFAULT_RETRY_COUNT,request); + + // If the message is not received in 3 seconds then resend it again. + CommonTopicResponse receiver = Chan.getInstance(request.getTid(), true).get(request.getTid(), timeout); + // Need to match tid and bid. + if (Objects.nonNull(receiver) + && receiver.getTid().equals(request.getTid()) + && receiver.getBid().equals(request.getBid())) { + if (clazz.isAssignableFrom(receiver.getData().getClass())) { + return receiver; + } + throw new TypeMismatchException(receiver.getData(), clazz); + } + // It must be guaranteed that the tid and bid of each message are different. + if (!hasBid) { + request.setBid(UUID.randomUUID().toString()); + } + request.setTid(UUID.randomUUID().toString()); + } + throw new CloudSDKException(CloudSDKErrorEnum.MQTT_PUBLISH_ABNORMAL, "No message reply received."); + } + + + + + public void publishPlayText(String topic, String reply, int qos, CommonTopicRequest request) { + try { + log.debug("send topic: {}, payload: {}", topic, request.toString()); + byte[] payload = Common.getObjectMapper().writeValueAsBytes(request); + messageGateway.publish(topic, payload, qos); + } catch (JsonProcessingException e) { + log.error("Failed to publish the message. {}", request.toString()); + e.printStackTrace(); + } + } + + public void publishPlayText(String topic, int qos, CommonTopicResponse response) { + try { + log.debug("send topic: {}, payload: {}", topic, response.toString()); + byte[] payload = Common.getObjectMapper().writeValueAsBytes(response); + messageGateway.publish(topic, payload, qos); + } catch (JsonProcessingException e) { + log.error("Failed to publish the message. {}", response.toString()); + e.printStackTrace(); + } + } + + public void publishPlayText(String topic, String reply, CommonTopicRequest request, int publishCount) { + AtomicInteger time = new AtomicInteger(0); + while (time.getAndIncrement() < publishCount) { + this.publishPlayText(topic, reply, DEFAULT_QOS, request); + } + } + + public void publishPlayText(String topic, String reply, CommonTopicRequest request) { + this.publishPlayText(topic, reply, DEFAULT_QOS, request); + } + + public void publishPlayTextReply(CommonTopicResponse response, String reply, MessageHeaders headers) { + this.publishPlayText(headers.get(MqttHeaders.RECEIVED_TOPIC) + reply, 2, response); + } } diff --git a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/property/PropertySetPublish.java b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/property/PropertySetPublish.java index b4299fc..f240de8 100644 --- a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/property/PropertySetPublish.java +++ b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/property/PropertySetPublish.java @@ -1,5 +1,6 @@ package org.dromara.common.sdk.mqtt.property; +import org.dromara.common.sdk.mqtt.MegaphoneTopicRequest; import org.dromara.common.sdk.mqtt.MqttGatewayPublish; import org.dromara.common.sdk.mqtt.TopicConst; import jakarta.annotation.Resource; @@ -37,4 +38,15 @@ public class PropertySetPublish { .setData(Objects.requireNonNull(data)), retryCount, timeout).getData(); } + + public PropertySetReplyResultEnum publishPlayText(String topic, Object data, String method) { + return gatewayPublish.publishPlayTextWithReply( + PropertySetReplyResultEnum.class, topic, new MegaphoneTopicRequest<>() + .setTid(UUID.randomUUID().toString()) + .setBid(null) + .setMethod(method) + .setTimestamp(System.currentTimeMillis()) + .setData(Objects.requireNonNull(data)), MqttGatewayPublish.DEFAULT_RETRY_COUNT, MqttGatewayPublish.DEFAULT_RETRY_TIMEOUT).getData(); + } + } diff --git a/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteConfigServiceImpl.java b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteConfigServiceImpl.java index 6a43c0b..21be450 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteConfigServiceImpl.java +++ b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteConfigServiceImpl.java @@ -1,8 +1,10 @@ package org.dromara.system.dubbo; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import org.apache.dubbo.config.annotation.DubboService; import org.dromara.system.api.RemoteConfigService; +import org.dromara.system.domain.SysConfig; import org.dromara.system.service.ISysConfigService; import org.springframework.stereotype.Service; @@ -18,6 +20,8 @@ public class RemoteConfigServiceImpl implements RemoteConfigService { private final ISysConfigService configService; + private final ISysConfigService sysConfigService; + /** * 获取注册开关 */ @@ -26,4 +30,10 @@ public class RemoteConfigServiceImpl implements RemoteConfigService { return configService.selectRegisterEnabled(tenantId); } + @Override + public String selectStreamIp() { + String ip =sysConfigService.selectStreamIp(); + return ip; + } + } diff --git a/dk-modules/system/src/main/java/org/dromara/system/service/ISysConfigService.java b/dk-modules/system/src/main/java/org/dromara/system/service/ISysConfigService.java index f7efda7..e8dda5f 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/service/ISysConfigService.java +++ b/dk-modules/system/src/main/java/org/dromara/system/service/ISysConfigService.java @@ -84,4 +84,6 @@ public interface ISysConfigService { */ boolean checkConfigKeyUnique(SysConfigBo config); + String selectStreamIp(); + } diff --git a/dk-modules/system/src/main/java/org/dromara/system/service/impl/SysConfigServiceImpl.java b/dk-modules/system/src/main/java/org/dromara/system/service/impl/SysConfigServiceImpl.java index c3917e0..0899e3c 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/service/impl/SysConfigServiceImpl.java +++ b/dk-modules/system/src/main/java/org/dromara/system/service/impl/SysConfigServiceImpl.java @@ -203,4 +203,11 @@ public class SysConfigServiceImpl implements ISysConfigService { return true; } + @Override + public String selectStreamIp() { + SysConfig sysConfig = baseMapper.selectOne(new LambdaQueryWrapper() + .eq(SysConfig::getConfigKey, "stream")); + return sysConfig.getConfigValue(); + } + }