diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/MegaphoneController.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/MegaphoneController.java index 12f5f39..7e9a8e8 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/MegaphoneController.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/MegaphoneController.java @@ -34,6 +34,7 @@ import org.dromara.common.sdk.mqtt.services.ServicesReplyReceiver; import org.dromara.common.sdk.mqtt.services.TopicServicesRequest; import org.dromara.common.translation.annotation.Translation; import org.dromara.sample.common.util.Md5Utils; +import org.dromara.sample.feign.RemoteSystemFeign; import org.dromara.sample.manage.mapper.IDeviceStreamMapper; import org.dromara.sample.manage.model.dto.*; import org.dromara.sample.manage.model.entity.DeviceStreamEntity; @@ -48,6 +49,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.*; +import java.util.stream.Collectors; import static org.apache.commons.compress.utils.ArchiveUtils.sanitize; import static org.dromara.common.sdk.mqtt.MqttGatewayPublish.DEFAULT_RETRY_COUNT; @@ -81,6 +83,8 @@ public class MegaphoneController { @Resource IDeviceStreamMapper deviceStreamMapper; + @Resource + private RemoteSystemFeign remoteSystemFeign; /** * 喊话器-开始播放文档(对应遥控器的录音广播模式) * Get the topology list of all online devices in one workspace. @@ -267,32 +271,76 @@ public class MegaphoneController { //发送mqtt String s = remoteConfigService.selectStreamIp(); DisobeyDTO param = Convert.convert(DisobeyDTO.class, objectMap); + List deviceStreamEntities = deviceStreamMapper.selectList(new QueryWrapper().eq("rtmp_url", param.getRtmpUrl())); + List types = deviceStreamEntities.stream() + .map(DeviceStreamEntity::getStreamType) + .collect(Collectors.toList()); if (param.getOpen() == 1){ - deviceStreamMapper.delete(new QueryWrapper().eq("rtmp_url", param.getRtmpUrl())); +// deviceStreamMapper.delete(new QueryWrapper().eq("rtmp_url", param.getRtmpUrl())); if (StringUtils.isNotEmpty(param.getModel())){ String[] split = param.getModel().split(","); - for (String type : split){ - DeviceStreamEntity deviceStreamEntity = new DeviceStreamEntity(); - deviceStreamEntity.setStreamType(type); - deviceStreamEntity.setCreateTime(new Date()); - deviceStreamEntity.setUpdateTime(new Date()); - deviceStreamEntity.setRtmpUrl(param.getRtmpUrl()); - deviceStreamMapper.insert(deviceStreamEntity); - List list = new ArrayList<>(); - list.add(type); - DisobeyDTO disobeyDTO = new DisobeyDTO(); - disobeyDTO.setUrl(s); - disobeyDTO.setOpen(param.getOpen()); - disobeyDTO.setType(list); - disobeyDTO.setRtmpUrl(param.getRtmpUrl()); - gatewayPublish.publish(top,new CommonTopicRequest<>() - .setData(Objects.requireNonNull(disobeyDTO)),1); - + if (CollectionUtils.isNotEmpty(types) && split.length > types.size()){ + //加数据 + for (String type : split){ + if (!types.contains(type)){ + DeviceStreamEntity deviceStreamEntity = new DeviceStreamEntity(); + deviceStreamEntity.setStreamType(type); + deviceStreamEntity.setCreateTime(new Date()); + deviceStreamEntity.setUpdateTime(new Date()); + deviceStreamEntity.setRtmpUrl(param.getRtmpUrl()); + deviceStreamMapper.insert(deviceStreamEntity); + List list = new ArrayList<>(); + list.add(type); + DisobeyDTO disobeyDTO = new DisobeyDTO(); + disobeyDTO.setUrl(s); + disobeyDTO.setOpen(param.getOpen()); + disobeyDTO.setType(list); + disobeyDTO.setRtmpUrl(param.getRtmpUrl()); + gatewayPublish.publish(top,new CommonTopicRequest<>() + .setData(Objects.requireNonNull(disobeyDTO)),1); + } + } + }else if (CollectionUtils.isNotEmpty(types) && split.length < types.size()){ + //删数据 + for (String type : types){ + List list1 = Arrays.asList(split); + if (!list1.contains(type)){ + deviceStreamMapper.delete(new QueryWrapper().eq("rtmp_url", param.getRtmpUrl()) + .eq("stream_type", type)); + List list = new ArrayList<>(); + list.add(type); + DisobeyDTO disobeyDTO = new DisobeyDTO(); + disobeyDTO.setUrl(s); + disobeyDTO.setOpen(2); + disobeyDTO.setType(list); + disobeyDTO.setRtmpUrl(param.getRtmpUrl()); + gatewayPublish.publish(top,new CommonTopicRequest<>() + .setData(Objects.requireNonNull(disobeyDTO)),1); + } + } + }else { + for (String type : split){ + DeviceStreamEntity deviceStreamEntity = new DeviceStreamEntity(); + deviceStreamEntity.setStreamType(type); + deviceStreamEntity.setCreateTime(new Date()); + deviceStreamEntity.setUpdateTime(new Date()); + deviceStreamEntity.setRtmpUrl(param.getRtmpUrl()); + deviceStreamMapper.insert(deviceStreamEntity); + List list = new ArrayList<>(); + list.add(type); + DisobeyDTO disobeyDTO = new DisobeyDTO(); + disobeyDTO.setUrl(s); + disobeyDTO.setOpen(param.getOpen()); + disobeyDTO.setType(list); + disobeyDTO.setRtmpUrl(param.getRtmpUrl()); + gatewayPublish.publish(top,new CommonTopicRequest<>() + .setData(Objects.requireNonNull(disobeyDTO)),1); + + } } } } if (param.getOpen() == 2){ - List deviceStreamEntities = deviceStreamMapper.selectList(new QueryWrapper().eq("rtmp_url", param.getRtmpUrl())); if (CollectionUtils.isNotEmpty(deviceStreamEntities)){ for (DeviceStreamEntity deviceStreamEntity: deviceStreamEntities){ deviceStreamMapper.delete(new QueryWrapper().eq("rtmp_url", deviceStreamEntity.getRtmpUrl()) @@ -468,4 +516,20 @@ public class MegaphoneController { // } + @GetMapping("/streams") + @Operation(summary = "获取媒体流ip。", description = "获取媒体流ip。") + @Transactional + public HttpResultResponse> streams (@RequestParam String deviceSn + ) { + List list = remoteSystemFeign.selectStreamType(deviceSn); + List list1 = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(list)){ + for (String s : list){ + String[] split = s.split(","); + list1.add(split[2]); + } + } + return HttpResultResponse.success(list1); + } + } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/PlayTextController.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/PlayTextController.java index f94cc91..5424bca 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/PlayTextController.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/PlayTextController.java @@ -91,4 +91,6 @@ public class PlayTextController { } + + } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/LiveStreamServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/LiveStreamServiceImpl.java index 034950e..b81b722 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/LiveStreamServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/LiveStreamServiceImpl.java @@ -1,5 +1,8 @@ package org.dromara.sample.manage.service.impl; +import cn.hutool.core.convert.Convert; +import jakarta.annotation.Resource; +import org.apache.commons.collections.CollectionUtils; import org.dromara.common.rocketmq.producer.MessageProducerUtil; import org.dromara.common.sdk.cloudapi.device.DeviceDomainEnum; import org.dromara.common.sdk.cloudapi.device.VideoId; @@ -8,9 +11,12 @@ import org.dromara.common.sdk.cloudapi.livestream.api.AbstractLivestreamService; import org.dromara.common.sdk.cloudapi.wayline.FlighttaskProgress; import org.dromara.common.sdk.common.HttpResultResponse; import org.dromara.common.sdk.common.SDKManager; +import org.dromara.common.sdk.mqtt.CommonTopicRequest; +import org.dromara.common.sdk.mqtt.MqttGatewayPublish; import org.dromara.common.sdk.mqtt.services.ServicesReplyData; import org.dromara.common.sdk.mqtt.services.TopicServicesResponse; import org.dromara.sample.component.mqtt.model.EventsReceiver; +import org.dromara.sample.feign.RemoteSystemFeign; import org.dromara.sample.manage.model.dto.*; import org.dromara.sample.manage.model.entity.DeviceEntity; import org.dromara.sample.manage.model.param.DeviceQueryParam; @@ -49,6 +55,8 @@ public class LiveStreamServiceImpl implements ILiveStreamService { @Autowired private AbstractLivestreamService abstractLivestreamService; + @Resource + private MqttGatewayPublish gatewayPublish; @Autowired private IPlayTextService playTextService; @@ -59,6 +67,9 @@ public class LiveStreamServiceImpl implements ILiveStreamService { @Autowired private IWaylineJobService waylineJobService; + @Resource + private RemoteSystemFeign remoteSystemFeign; + @Override public List getLiveCapacity(String workspaceId) { @@ -134,15 +145,32 @@ public class LiveStreamServiceImpl implements ILiveStreamService { Optional> runningWaylineJob = waylineRedisService.getRunningWaylineJob(deviceBySn.getDeviceSn()); Map reqMap = new HashMap<>(); if(runningWaylineJob.isPresent()) { - ILivestreamUrl iLivestreamUrl = LiveStreamProperty.get(UrlTypeEnum.RTMP); - String jobId = runningWaylineJob.get().getOutput().getExt().getFlightId(); - Optional waylineJobDTO = waylineJobService.getJobByJobInternalId(jobId); - reqMap.put("jobId",waylineJobDTO.get().getJobId()); - reqMap.put("jobName",waylineJobDTO.get().getJobName()); - reqMap.put("deviceSn",deviceBySn.getDeviceSn()); - reqMap.put("videoId",liveParam.getVideoId().toString()); - reqMap.put("url",iLivestreamUrl.toString()); - playTextService.streamType(reqMap); + //开启ai推流 + List deviceSn = remoteSystemFeign.selectStreamType(deviceBySn.getDeviceSn()); + if (CollectionUtils.isNotEmpty(deviceSn)){ + ILivestreamUrl iLivestreamUrl = LiveStreamProperty.get(UrlTypeEnum.RTMP); + String jobId = runningWaylineJob.get().getOutput().getExt().getFlightId(); + Optional waylineJobDTO = waylineJobService.getJobByJobInternalId(jobId); + reqMap.put("jobId",waylineJobDTO.get().getJobId()); + reqMap.put("jobName",waylineJobDTO.get().getJobName()); + reqMap.put("deviceSn",deviceBySn.getDeviceSn()); + reqMap.put("videoId",liveParam.getVideoId().getDroneSn().toString()+"-"+liveParam.getVideoId().getPayloadIndex().toString()); + reqMap.put("url",iLivestreamUrl.toString()); + playTextService.streamType(reqMap); + for (String s : deviceSn){ + String[] split = s.split(","); + List list = new ArrayList<>(); + list.add(split[2]); + DisobeyDTO disobeyDTO = new DisobeyDTO(); + disobeyDTO.setUrl("http://114.235.183.163"); + disobeyDTO.setOpen(1); + disobeyDTO.setType(list); + disobeyDTO.setRtmpUrl(liveParam.getVideoId().getDroneSn().toString()+"-"+liveParam.getVideoId().getPayloadIndex().toString()); + gatewayPublish.publish("task/image/disobey/smoke",new CommonTopicRequest<>() + .setData(Objects.requireNonNull(disobeyDTO)),1); + } + } + } } @@ -156,10 +184,25 @@ public class LiveStreamServiceImpl implements ILiveStreamService { if (HttpResultResponse.CODE_SUCCESS != responseResult.getCode()) { return responseResult; } - TopicServicesResponse response = abstractLivestreamService.liveStopPush( SDKManager.getDeviceSDK(responseResult.getData().getDeviceSn()), new LiveStopPushRequest() .setVideoId(videoId)); +// //关闭ai推流 +// List deviceSn = remoteSystemFeign.selectStreamType(responseResult.getData().getDeviceSn()); +// if (CollectionUtils.isNotEmpty(deviceSn)){ +// for (String s : deviceSn){ +// List list = new ArrayList<>(); +// list.add(s); +// DisobeyDTO disobeyDTO = new DisobeyDTO(); +// disobeyDTO.setUrl(s); +// disobeyDTO.setOpen(2); +// disobeyDTO.setType(list); +// disobeyDTO.setRtmpUrl(videoId.toString()); +// gatewayPublish.publish("task/image/disobey/smoke",new CommonTopicRequest<>() +// .setData(Objects.requireNonNull(disobeyDTO)),1); +// } +// } + if (!response.getData().getResult().isSuccess()) { return HttpResultResponse.error(response.getData().getResult()); }