Browse Source

喊话器

master
李克 3 months ago
parent
commit
21666cf350
  1. 7
      dk-api/api-system/src/main/java/org/dromara/system/api/RemoteConfigService.java
  2. 79
      dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/MqttGatewayPublish.java
  3. 12
      dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/property/PropertySetPublish.java
  4. 10
      dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteConfigServiceImpl.java
  5. 2
      dk-modules/system/src/main/java/org/dromara/system/service/ISysConfigService.java
  6. 7
      dk-modules/system/src/main/java/org/dromara/system/service/impl/SysConfigServiceImpl.java

7
dk-api/api-system/src/main/java/org/dromara/system/api/RemoteConfigService.java

@ -14,4 +14,11 @@ public interface RemoteConfigService {
*/
boolean selectRegisterEnabled(String tenantId);
/**
* 获取流媒体ip
*
* @return true开启false关闭
*/
String selectStreamIp();
}

79
dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/MqttGatewayPublish.java

@ -101,5 +101,84 @@ public class MqttGatewayPublish {
throw new CloudSDKException(CloudSDKErrorEnum.MQTT_PUBLISH_ABNORMAL, "No message reply received.");
}
public void publishMegaphone(String topic, int qos, MegaphoneTopicRequest request) {
try {
log.debug("send topic: {}, payload: {}", topic, request.toString());
byte[] payload = Common.getObjectMapper().writeValueAsBytes(request);
messageGateway.publish(topic, payload, qos);
} catch (JsonProcessingException e) {
log.error("Failed to publish the message. {}", request.toString());
e.printStackTrace();
}
}
public <T> CommonTopicResponse<T> publishPlayTextWithReply(Class<T> clazz, String topic, MegaphoneTopicRequest request, int retryCount, long timeout) {
AtomicInteger time = new AtomicInteger(0);
boolean hasBid = StringUtils.hasText(request.getBid());
request.setBid(hasBid ? request.getBid() : UUID.randomUUID().toString());
// RetryServicesReplyReceiver
while (time.getAndIncrement() <= retryCount) {
this.publishMegaphone(topic, DEFAULT_RETRY_COUNT,request);
// If the message is not received in 3 seconds then resend it again.
CommonTopicResponse<T> receiver = Chan.getInstance(request.getTid(), true).get(request.getTid(), timeout);
// Need to match tid and bid.
if (Objects.nonNull(receiver)
&& receiver.getTid().equals(request.getTid())
&& receiver.getBid().equals(request.getBid())) {
if (clazz.isAssignableFrom(receiver.getData().getClass())) {
return receiver;
}
throw new TypeMismatchException(receiver.getData(), clazz);
}
// It must be guaranteed that the tid and bid of each message are different.
if (!hasBid) {
request.setBid(UUID.randomUUID().toString());
}
request.setTid(UUID.randomUUID().toString());
}
throw new CloudSDKException(CloudSDKErrorEnum.MQTT_PUBLISH_ABNORMAL, "No message reply received.");
}
public void publishPlayText(String topic, String reply, int qos, CommonTopicRequest request) {
try {
log.debug("send topic: {}, payload: {}", topic, request.toString());
byte[] payload = Common.getObjectMapper().writeValueAsBytes(request);
messageGateway.publish(topic, payload, qos);
} catch (JsonProcessingException e) {
log.error("Failed to publish the message. {}", request.toString());
e.printStackTrace();
}
}
public void publishPlayText(String topic, int qos, CommonTopicResponse response) {
try {
log.debug("send topic: {}, payload: {}", topic, response.toString());
byte[] payload = Common.getObjectMapper().writeValueAsBytes(response);
messageGateway.publish(topic, payload, qos);
} catch (JsonProcessingException e) {
log.error("Failed to publish the message. {}", response.toString());
e.printStackTrace();
}
}
public void publishPlayText(String topic, String reply, CommonTopicRequest request, int publishCount) {
AtomicInteger time = new AtomicInteger(0);
while (time.getAndIncrement() < publishCount) {
this.publishPlayText(topic, reply, DEFAULT_QOS, request);
}
}
public void publishPlayText(String topic, String reply, CommonTopicRequest request) {
this.publishPlayText(topic, reply, DEFAULT_QOS, request);
}
public void publishPlayTextReply(CommonTopicResponse response, String reply, MessageHeaders headers) {
this.publishPlayText(headers.get(MqttHeaders.RECEIVED_TOPIC) + reply, 2, response);
}
}

12
dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/property/PropertySetPublish.java

@ -1,5 +1,6 @@
package org.dromara.common.sdk.mqtt.property;
import org.dromara.common.sdk.mqtt.MegaphoneTopicRequest;
import org.dromara.common.sdk.mqtt.MqttGatewayPublish;
import org.dromara.common.sdk.mqtt.TopicConst;
import jakarta.annotation.Resource;
@ -37,4 +38,15 @@ public class PropertySetPublish {
.setData(Objects.requireNonNull(data)), retryCount, timeout).getData();
}
public PropertySetReplyResultEnum publishPlayText(String topic, Object data, String method) {
return gatewayPublish.publishPlayTextWithReply(
PropertySetReplyResultEnum.class, topic, new MegaphoneTopicRequest<>()
.setTid(UUID.randomUUID().toString())
.setBid(null)
.setMethod(method)
.setTimestamp(System.currentTimeMillis())
.setData(Objects.requireNonNull(data)), MqttGatewayPublish.DEFAULT_RETRY_COUNT, MqttGatewayPublish.DEFAULT_RETRY_TIMEOUT).getData();
}
}

10
dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteConfigServiceImpl.java

@ -1,8 +1,10 @@
package org.dromara.system.dubbo;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.apache.dubbo.config.annotation.DubboService;
import org.dromara.system.api.RemoteConfigService;
import org.dromara.system.domain.SysConfig;
import org.dromara.system.service.ISysConfigService;
import org.springframework.stereotype.Service;
@ -18,6 +20,8 @@ public class RemoteConfigServiceImpl implements RemoteConfigService {
private final ISysConfigService configService;
private final ISysConfigService sysConfigService;
/**
* 获取注册开关
*/
@ -26,4 +30,10 @@ public class RemoteConfigServiceImpl implements RemoteConfigService {
return configService.selectRegisterEnabled(tenantId);
}
@Override
public String selectStreamIp() {
String ip =sysConfigService.selectStreamIp();
return ip;
}
}

2
dk-modules/system/src/main/java/org/dromara/system/service/ISysConfigService.java

@ -84,4 +84,6 @@ public interface ISysConfigService {
*/
boolean checkConfigKeyUnique(SysConfigBo config);
String selectStreamIp();
}

7
dk-modules/system/src/main/java/org/dromara/system/service/impl/SysConfigServiceImpl.java

@ -203,4 +203,11 @@ public class SysConfigServiceImpl implements ISysConfigService {
return true;
}
@Override
public String selectStreamIp() {
SysConfig sysConfig = baseMapper.selectOne(new LambdaQueryWrapper<SysConfig>()
.eq(SysConfig::getConfigKey, "stream"));
return sysConfig.getConfigValue();
}
}

Loading…
Cancel
Save