diff --git a/dk-api/api-business/src/main/java/org/dromara/business/api/domain/vo/RemoteBusinessAlertVo.java b/dk-api/api-business/src/main/java/org/dromara/business/api/domain/vo/RemoteBusinessAlertVo.java index 6beda09..6ef5b3d 100644 --- a/dk-api/api-business/src/main/java/org/dromara/business/api/domain/vo/RemoteBusinessAlertVo.java +++ b/dk-api/api-business/src/main/java/org/dromara/business/api/domain/vo/RemoteBusinessAlertVo.java @@ -18,7 +18,7 @@ public class RemoteBusinessAlertVo implements Serializable { /** * */ - private String id; + private Long id; /** * job任务id diff --git a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/Dock2ThingVersionEnum.java b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/Dock2ThingVersionEnum.java index c3750b1..0ec0755 100644 --- a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/Dock2ThingVersionEnum.java +++ b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/Dock2ThingVersionEnum.java @@ -12,11 +12,22 @@ import java.util.Arrays; */ public enum Dock2ThingVersionEnum implements IThingVersion { - V1_1_2("1.1.2", CloudSDKVersionEnum.V1_0_1), + V1_0_0("1.0.0", CloudSDKVersionEnum.V0_0_1), + + V1_1_0("1.1.0", CloudSDKVersionEnum.V1_0_0), + + V1_1_2("1.1.2", CloudSDKVersionEnum.V1_0_0), + + V1_1_3("1.1.3", CloudSDKVersionEnum.V1_0_2), V1_2_0("1.2.0", CloudSDKVersionEnum.V1_0_3), + + V1_2_3("1.2.3", CloudSDKVersionEnum.V1_0_3), + V1_3_0("1.3.0", CloudSDKVersionEnum.V1_0_3), - V1_3_1("1.3.1", CloudSDKVersionEnum.V1_3_1), + + + V1_3_1("1.3.1", CloudSDKVersionEnum.V1_0_3), ; private final String thingVersion; diff --git a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/Dock3ThingVersionEnum.java b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/Dock3ThingVersionEnum.java index 9cfa70c..68c3fc4 100644 --- a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/Dock3ThingVersionEnum.java +++ b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/Dock3ThingVersionEnum.java @@ -12,14 +12,21 @@ import java.util.Arrays; */ public enum Dock3ThingVersionEnum implements IThingVersion { - V1_1_2("1.1.2", CloudSDKVersionEnum.V1_0_1), + V1_0_0("1.0.0", CloudSDKVersionEnum.V0_0_1), + + V1_1_0("1.1.0", CloudSDKVersionEnum.V1_0_0), + + V1_1_2("1.1.2", CloudSDKVersionEnum.V1_0_0), + + V1_1_3("1.1.3", CloudSDKVersionEnum.V1_0_2), V1_2_0("1.2.0", CloudSDKVersionEnum.V1_0_3), V1_2_3("1.2.3", CloudSDKVersionEnum.V1_0_3), V1_3_0("1.3.0", CloudSDKVersionEnum.V1_0_3), - V1_3_1("1.3.1", CloudSDKVersionEnum.V1_3_1), + + V1_3_1("1.3.1", CloudSDKVersionEnum.V1_0_3), ; private final String thingVersion; diff --git a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/DockThingVersionEnum.java b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/DockThingVersionEnum.java index 6c6cfb6..8b4b807 100644 --- a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/DockThingVersionEnum.java +++ b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/config/version/DockThingVersionEnum.java @@ -14,14 +14,20 @@ public enum DockThingVersionEnum implements IThingVersion { V1_0_0("1.0.0", CloudSDKVersionEnum.V0_0_1), - V1_1_0("1.1.0", CloudSDKVersionEnum.V0_0_1), + V1_1_0("1.1.0", CloudSDKVersionEnum.V1_0_0), V1_1_2("1.1.2", CloudSDKVersionEnum.V1_0_0), V1_1_3("1.1.3", CloudSDKVersionEnum.V1_0_2), + V1_2_0("1.2.0", CloudSDKVersionEnum.V1_0_3), + V1_2_3("1.2.3", CloudSDKVersionEnum.V1_0_3), + V1_3_0("1.3.0", CloudSDKVersionEnum.V1_0_3), + + V1_3_1("1.3.1", CloudSDKVersionEnum.V1_0_3), + ; private final String thingVersion; diff --git a/dk-modules/sample/pom.xml b/dk-modules/sample/pom.xml index 4548b5d..7249b87 100644 --- a/dk-modules/sample/pom.xml +++ b/dk-modules/sample/pom.xml @@ -132,5 +132,21 @@ 1.12.261 - + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/control/controller/DrcController.java b/dk-modules/sample/src/main/java/org/dromara/sample/control/controller/DrcController.java index e4d0934..7cd6547 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/control/controller/DrcController.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/control/controller/DrcController.java @@ -31,26 +31,26 @@ public class DrcController { @Autowired private IDrcService drcService; - @PostMapping("/workspaces/drc/connect") - public HttpResultResponse drcConnect(HttpServletRequest request, @Valid @RequestBody DrcConnectParam param) { + @PostMapping("/workspaces/{workspace_id}/drc/connect") + public HttpResultResponse drcConnect(@PathVariable("workspace_id") String workspaceId,HttpServletRequest request, @Valid @RequestBody DrcConnectParam param) { LoginUser loginUser = LoginHelper.getLoginUser(); - DrcModeMqttBroker brokerDTO = drcService.userDrcAuth(loginUser.getTenantId(), loginUser.getLoginId(), loginUser.getUsername(), param); + DrcModeMqttBroker brokerDTO = drcService.userDrcAuth(workspaceId, loginUser.getLoginId(), loginUser.getUsername(), param); return HttpResultResponse.success(brokerDTO); } - @PostMapping("/workspaces/drc/enter") - public HttpResultResponse drcEnter(@Valid @RequestBody DrcModeParam param) { + @PostMapping("/workspaces/{workspace_id}/drc/enter") + public HttpResultResponse drcEnter(@PathVariable("workspace_id") String workspaceId,@Valid @RequestBody DrcModeParam param) { LoginUser loginUser = LoginHelper.getLoginUser(); - JwtAclDTO acl = drcService.deviceDrcEnter(loginUser.getTenantId(), param); + JwtAclDTO acl = drcService.deviceDrcEnter(workspaceId, param); return HttpResultResponse.success(acl); } - @PostMapping("/workspaces/drc/exit") - public HttpResultResponse drcExit( @Valid @RequestBody DrcModeParam param) { + @PostMapping("/workspaces/{workspace_id}/drc/exit") + public HttpResultResponse drcExit(@PathVariable("workspace_id") String workspaceId, @Valid @RequestBody DrcModeParam param) { LoginUser loginUser = LoginHelper.getLoginUser(); - drcService.deviceDrcExit(loginUser.getTenantId(), param); + drcService.deviceDrcExit(workspaceId, param); return HttpResultResponse.success(); } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DevicePayloadEntity.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DevicePayloadEntity.java index 8794f19..3ffd4c5 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DevicePayloadEntity.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DevicePayloadEntity.java @@ -7,6 +7,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; +import java.util.Date; /** * @author sean.zhou @@ -45,10 +46,10 @@ public class DevicePayloadEntity implements Serializable { private String deviceSn; @TableField(value = "create_time", fill = FieldFill.INSERT) - private Long createTime; + private Date createTime; @TableField(value = "update_time", fill = FieldFill.INSERT_UPDATE) - private Long updateTime; + private Date updateTime; @TableField(value = "payload_desc") private String payloadDesc; diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DevicePayloadServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DevicePayloadServiceImpl.java index 210bd00..d833e64 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DevicePayloadServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DevicePayloadServiceImpl.java @@ -61,6 +61,7 @@ public class DevicePayloadServiceImpl implements IDevicePayloadService { entity.setId(id); // For the payload of the drone itself, there is no firmware version. entity.setFirmwareVersion(null); + entity.setCreateTime(new Date()); return mapper.updateById(entity) > 0 ? entity.getId() : 0; } return mapper.insert(entity) > 0 ? entity.getId() : 0; diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/SDKDeviceService.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/SDKDeviceService.java index 033ee0f..a6ad008 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/SDKDeviceService.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/SDKDeviceService.java @@ -401,6 +401,9 @@ public class SDKDeviceService extends AbstractDeviceService { .loginTime(LocalDateTime.now()) .deviceSn(gatewaySn) .childDeviceSn(deviceSn).build(); + if(!StringUtils.hasText(workspaceId)){ + return; + } deviceService.updateDevice(gateway); deviceService.updateDevice(device); gateway = deviceRedisService.getDeviceOnline(gatewaySn).map(g -> { diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaFileServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaFileServiceImpl.java index 5f4b909..3cd911b 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaFileServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaFileServiceImpl.java @@ -116,11 +116,11 @@ public class FlightAreaFileServiceImpl implements IFlightAreaFileService { length++; } is.reset(); - OssClient storage = OssFactory.instance("flight_area_file"); + OssClient storage = OssFactory.instance("flightareafile"); storage.uploadSuffix(is,name,length,"json"); return FlightAreaFileDTO.builder() .name(name) - .objectKey("flight_area_file/"+name) + .objectKey("flightareafile/"+name) .fileId(UUID.randomUUID().toString()) .size(os.size()) .workspaceId(workspaceId) diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaServiceImpl.java index e2a26be..3189b6a 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/map/service/impl/FlightAreaServiceImpl.java @@ -297,7 +297,7 @@ public class FlightAreaServiceImpl extends AbstractFlightAreaService implements if (flightAreaOpt.isEmpty() || fileOpt.isEmpty()) { file = packageFlightArea(device.getWorkspaceId()); } - OssClient storage = OssFactory.instance("flight_area_file"); + OssClient storage = OssFactory.instance("flightareafile"); return new TopicRequestsResponse>().setData( MqttReply.success(new FlightAreasGetResponse().setFiles( List.of(new FlightAreaGetFile() diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/controller/AiCompareController.java b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/controller/AiCompareController.java index 251489c..c6c89af 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/controller/AiCompareController.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/controller/AiCompareController.java @@ -4,6 +4,7 @@ import cn.dev33.satoken.annotation.SaCheckPermission; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.validation.constraints.NotNull; import lombok.RequiredArgsConstructor; +import org.dromara.business.api.domain.vo.RemoteBusinessAlertVo; import org.dromara.common.core.domain.R; import org.dromara.common.core.validate.AddGroup; import org.dromara.common.core.validate.EditGroup; @@ -121,7 +122,6 @@ public class AiCompareController extends BaseController { /** * 画框 - * 参数:jobId */ @SaCheckPermission("sample:compare:add") @Log(title = "预警画框", businessType = BusinessType.INSERT) @@ -132,7 +132,7 @@ public class AiCompareController extends BaseController { } /** - * 算法结果 + * 算法结果 算法使用 */ @PostMapping("/expose") public R expose(@RequestBody List> mapList) { @@ -140,11 +140,29 @@ public class AiCompareController extends BaseController { } /** - * 算法预警生成 + * 算法预警生成 算法使用 */ @PostMapping("/expose/alert") public R alert(@RequestBody List> mapList) { - return toAjax(aiCompareService.expose(mapList)); + return toAjax(aiCompareService.alert(mapList)); + } + + /** + * 验证预警列表 + */ + @PostMapping("/alertList") + public R> alertList(String jobId) { + return R.ok(aiCompareService.alertList(jobId)); } + + /** + * + * 删除验证列表预警 + * @param ids 主键串 + * * */ + @DeleteMapping("/delAlertList") + public R delAlertList(Listids) { + return toAjax(aiCompareService.delAlertList(ids)); + } } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/IAiCompareService.java b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/IAiCompareService.java index 499b94f..c994ff5 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/IAiCompareService.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/IAiCompareService.java @@ -1,5 +1,6 @@ package org.dromara.sample.wayline.service; +import org.dromara.business.api.domain.vo.RemoteBusinessAlertVo; import org.dromara.common.mybatis.core.page.PageQuery; import org.dromara.common.mybatis.core.page.TableDataInfo; import org.dromara.sample.wayline.model.dto.AiCompareDTO; @@ -28,4 +29,7 @@ public interface IAiCompareService { Boolean pictureFrame(FrameDTO frameDTO); Boolean expose(List> mapList); Boolean alert(List> mapList); + + List alertList(String jobId); + Boolean delAlertList(Listids); } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/AiCompareServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/AiCompareServiceImpl.java index 132a052..ec86bdb 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/AiCompareServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/AiCompareServiceImpl.java @@ -9,7 +9,11 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.dubbo.config.annotation.DubboReference; import org.apache.ibatis.executor.BatchResult; +import org.dromara.business.api.RemoteBusinessAlertService; +import org.dromara.business.api.domain.bo.RemoteBusinessAlertBo; +import org.dromara.business.api.domain.vo.RemoteBusinessAlertVo; import org.dromara.common.core.constant.AiCompareStatusConstants; import org.dromara.common.core.constant.BusinessConstants; @@ -63,6 +67,8 @@ public class AiCompareServiceImpl implements IAiCompareService { private final IFileService fileService; private final MqttGatewayPublish mqttGatewayPublish; private final IAiComparePlateMapper aiComparePlateMapper; + @DubboReference + private final RemoteBusinessAlertService businessAlertService; @Override public TableDataInfo queryPageList(AiCompareDTO bo, PageQuery pageQuery) { return TableDataInfo.build(aiCompareMapper.selectPage(pageQuery.build(),this.buildAiCompareDTOQueryWrapper(bo))); @@ -213,8 +219,22 @@ public class AiCompareServiceImpl implements IAiCompareService { @Override public Boolean alert(List> mapList) { + + return null; } + @Override + public List alertList(String jobId) { + RemoteBusinessAlertBo businessAlertBo = new RemoteBusinessAlertBo(); + businessAlertBo.setJobId(jobId); + return businessAlertService.listVerifyAlert(businessAlertBo); + } + + @Override + public Boolean delAlertList(List ids) { + return businessAlertService.deleteAlert(ids); + } + } diff --git a/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java index c94d62e..78a0ec9 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java +++ b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java @@ -2,20 +2,23 @@ package org.dromara.system.dubbo; import lombok.RequiredArgsConstructor; import org.apache.dubbo.config.annotation.DubboService; -import org.dromara.common.core.utils.MapstructUtils; + import org.dromara.system.api.RemotePostService; -import org.dromara.system.api.domain.vo.RemoteAiLabelPostVo; import org.dromara.system.api.domain.vo.RemotePostVo; import org.dromara.system.domain.bo.SysPostBo; import org.dromara.system.domain.vo.SysPostVo; import org.dromara.system.service.ISysPostService; -import org.springframework.beans.BeanUtils; + import org.springframework.stereotype.Service; -import java.util.ArrayList; + import java.util.List; import java.util.stream.Collectors; - +/** + * @author sean + * @version 1.1 + * @date 2022/6/1 + */ @RequiredArgsConstructor @Service @DubboService diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java deleted file mode 100644 index 4ec745a..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.ruoyi.testrocketmq; - -import com.ruoyi.common.security.annotation.EnableCustomConfig; -import com.ruoyi.common.security.annotation.EnableRyFeignClients; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.scheduling.annotation.EnableAsync; - -/** - * 平台管理模块 - * - * @author ruoyi - */ -@EnableCustomConfig -@EnableRyFeignClients -@SpringBootApplication -@EnableAsync -public class RocketMQApplication -{ - public static void main(String[] args) - { - SpringApplication.run(RocketMQApplication.class, args); - System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); - } -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java deleted file mode 100644 index 38ebbae..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.ruoyi.testrocketmq.config; - -import com.ruoyi.testrocketmq.consumer.RocketMsgListener; -import com.ruoyi.testrocketmq.enums.MessageCodeEnum; -import com.ruoyi.testrocketmq.model.ConsumerMode; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.context.config.annotation.RefreshScope; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * 消费者配置 - */ -@RefreshScope -@Configuration -@Slf4j -public class ConsumerConfig { - @Autowired - private ConsumerMode consumerMode; - - @Bean - public DefaultMQPushConsumer getRocketMQConsumer() throws MQClientException { -// ConsumerMode consumerMode = new ConsumerMode(); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); - consumer.setNamesrvAddr(consumerMode.getNamesrvAddr()); - consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); - consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); - consumer.registerMessageListener(new RocketMsgListener()); - /** - * 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 - * 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 - * 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 - * 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 - */ - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); - /** - * CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 - * 订阅topic整体,从而达到负载均衡的目的 - * BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 - * - */ - // consumer.setMessageModel(MessageModel.BROADCASTING); - - consumer.setVipChannelEnabled(false); - consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); - try { - /** - * 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 - */ - consumer.subscribe(MessageCodeEnum.ORDER_MESSAGE.getCode(),"*"); - consumer.subscribe(MessageCodeEnum.USER_MESSAGE.getCode(),"*"); - consumer.start(); - log.info("消费者初始化成功:{}", consumer.toString()); - } catch (MQClientException e) { - e.printStackTrace(); - log.error("消费者初始化失败:{}",e.getMessage()); - } - return consumer; - } -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java deleted file mode 100644 index 1d4b56e..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.ruoyi.testrocketmq.config; - -/** - * @author yz - */ -public class MessageConfig { - private Class messageClass; - private boolean orderlyMessage; - - public Class getMessageClass() { - return messageClass; - } - - public void setMessageClass(Class messageClass) { - this.messageClass = messageClass; - } - - public boolean isOrderlyMessage() { - return orderlyMessage; - } - - public void setOrderlyMessage(boolean orderlyMessage) { - this.orderlyMessage = orderlyMessage; - } - - -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java deleted file mode 100644 index b55917e..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.ruoyi.testrocketmq.config; - -import com.ruoyi.testrocketmq.model.ProducerMode; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - -@Configuration -@Slf4j -public class ProducerConfig { - - public static DefaultMQProducer producer; - - @Autowired - private ProducerMode producerMode; - - - - @Bean - public DefaultMQProducer getRocketMQProducer() { - producer = new DefaultMQProducer(producerMode.getGroupName()); - producer.setNamesrvAddr(producerMode.getNamesrvAddr()); - //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName - if(producerMode.getMaxMessageSize()!=null){ - producer.setMaxMessageSize(producerMode.getMaxMessageSize()); - } - if(producerMode.getSendMsgTimeout()!=null){ - producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); - } - //如果发送消息失败,设置重试次数,默认为2次 - if(producerMode.getRetryTimesWhenSendFailed()!=null){ - producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); - } - producer.setVipChannelEnabled(false); - try { - producer.start(); - log.info("生产者初始化成功:{}",producer.toString()); - } catch (MQClientException e) { - log.error("生产者初始化失败:{}",e.getMessage()); - } - return producer; - } - -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java deleted file mode 100644 index 88d2b05..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java +++ /dev/null @@ -1,100 +0,0 @@ -package com.ruoyi.testrocketmq.consumer; - -import com.ruoyi.testrocketmq.enums.MessageCodeEnum; -import com.ruoyi.testrocketmq.producer.ConsumeException; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.common.message.MessageExt; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import java.io.UnsupportedEncodingException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; - -/** - * 消息监听 - */ -@Slf4j -@Component -public class RocketMsgListener implements MessageListenerConcurrently { - - /** - * 消费消息 - * - * @param list msgs.size() >= 1 - * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here - * 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 - * @param consumeConcurrentlyContext 上下文信息 - * @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) - */ - @Override - public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { - - if (!CollectionUtils.isEmpty(list)) { - for (MessageExt messageExt : list) { - // 消息内容 - String body = new String(messageExt.getBody()); - log.info("接受到的消息为:{}", body); - String tags = messageExt.getTags(); - String topic = messageExt.getTopic(); - String msgId = messageExt.getMsgId(); - String keys = messageExt.getKeys(); - int reConsume = messageExt.getReconsumeTimes(); - // 消息已经重试了3次,如果不需要再次消费,则返回成功 - if (reConsume == 3) { - // TODO 补偿信息 - //smsLogService.insertLog(topic, tags, msgId, keys, body, "【" + EnumUtil.getStrMsgByCode(tags, TagsCodeEnum.class) + "】消费失败"); - log.error("消息重试超过3次,消费失败!"); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - // 订单超时处理 - if (MessageCodeEnum.ORDER_MESSAGE.getCode().equals(topic)) { - if (MessageCodeEnum.ORDER_TIMEOUT_TAG.getCode().equals(tags)) { -// //获取订单 -// DealUserOrder dealUserOrder = pcRemoteDealUserOrderService.selectDealUserOrderByOrderNumber(keys); -// if (dealUserOrder != null) { -// //订单状态超时未支付关闭订单 处理 -// if (dealUserOrder.getStatus().equals("1")) { -// DealUserOrder dealUserOrders = new DealUserOrder(); -// dealUserOrders.setOrderId(dealUserOrder.getOrderId()); -// dealUserOrders.setStatus("4"); -// pcRemoteDealUserOrderService.updateDealUserOrder(dealUserOrders); -// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; -// } -// log.info("Order does not exist."); -// } - log.info("Consumption success:" + body); - DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - log.info("Consumption time:{}", format.format(new Date())); - } else { - log.info("未匹配到Tag【{}】" + tags); - } - } - } - } - // 消息消费成功 - //ConsumeConcurrentlyStatus.RECONSUME_LATER broker会根据设置的messageDelayLevel发起重试,默认16次 - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - - /** - * 异常处理 - * - * @param e 捕获的异常 - * @return 消息消费结果 - */ - private static ConsumeConcurrentlyStatus handleException(final Exception e) { - Class exceptionClass = e.getClass(); - if (exceptionClass.equals(UnsupportedEncodingException.class)) { - log.error(e.getMessage()); - } else if (exceptionClass.equals(ConsumeException.class)) { - log.error(e.getMessage()); - } - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java deleted file mode 100644 index e6ff8c4..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.ruoyi.testrocketmq.enums; - - -import lombok.Getter; - -@Getter -public enum MessageCodeEnum { - /** - * 消息模块主题 - */ - MESSAGE_TOPIC("elink-message","消息服务模块topic名称"), - /** - * 系统消息 - */ - NOTE_MESSAGE("system-message","系统消息服务模块topic名称"), - /** - * 用户消息 - */ - USER_MESSAGE("user-message","用户消息服务模块topic名称"), - - /** - * 订单消息 - */ - ORDER_MESSAGE("order-message","订单消息服务模块topic名称"), - - /** - * 平台编号 - */ - USER_MESSAGE_TAG("user_message_tag","用户消息推送"), - NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), - ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), - - /** - * 订单处理编号 - */ - //订单超时处理 - ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); - - - private final String code; - private final String msg; - - MessageCodeEnum(String code, String msg){ - this.code = code; - this.msg = msg; - } - - public static String valuesOfType(String code) { - String value = ""; - for (MessageCodeEnum e : MessageCodeEnum.values()) { - if (code.equals(e.code)) { - value = e.msg; - } - - } - return value; - } - - -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java deleted file mode 100644 index e66738a..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.ruoyi.testrocketmq.model; - -import lombok.Data; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Component; - -@Data -@Configuration -@Component -public class ConsumerMode { - @Value("${suning.rocketmq.namesrvAddr}") - private String namesrvAddr; - @Value("${suning.rocketmq.conumer.groupName}") - private String groupName ; - @Value("${suning.rocketmq.conumer.consumeThreadMin}") - private int consumeThreadMin; - @Value("${suning.rocketmq.conumer.consumeThreadMax}") - private int consumeThreadMax; - @Value("${suning.rocketmq.conumer.consumeMessageBatchMaxSize}") - private int consumeMessageBatchMaxSize; -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java deleted file mode 100644 index 0cd4060..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.ruoyi.testrocketmq.model; - -import lombok.Data; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.cloud.context.config.annotation.RefreshScope; -import org.springframework.context.annotation.Configuration; - -/** - * 生产者初始化 - */ -@RefreshScope -@Data -@Configuration -public class ProducerMode { - @Value("${suning.rocketmq.producer.groupName}") - private String groupName; - @Value("${suning.rocketmq.namesrvAddr}") - private String namesrvAddr; - @Value("${suning.rocketmq.producer.maxMessageSize}") - private Integer maxMessageSize; - @Value("${suning.rocketmq.producer.sendMsgTimeout}") - private Integer sendMsgTimeout; - @Value("${suning.rocketmq.producer.retryTimesWhenSendFailed}") - private Integer retryTimesWhenSendFailed; -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java deleted file mode 100644 index 86ed05c..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.ruoyi.testrocketmq.producer; - -import com.ruoyi.testrocketmq.config.ProducerConfig; -import org.springframework.beans.factory.annotation.Autowired; - -public class AsyncProducer { - - @Autowired - private ProducerConfig producerConfig; - - /** - * 发送异步的消息 - * @param topic 主题 - * @param tag 标签 - * @param key 自定义的key,根据业务来定 - * @param value 消息的内容 - * @return org.apache.rocketmq.client.producer.SendResult - */ -// public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value) throws UnsupportedEncodingException { -// -// try { -// DefaultMQProducer defaultMQProducer = producerConfig.producer; -// //Create a message instance, specifying topic, tag and message body. -// Message msg = new Message(topic, tag, key,value.getBytes(RemotingHelper.DEFAULT_CHARSET)); -// defaultMQProducer.send(msg, new SendCallback() { -// // 异步回调的处理 -// @Override -// public void onSuccess(SendResult sendResult) { -// System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId()); -// } -// -// @Override -// public void onException(Throwable e) { -// System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e); -// e.printStackTrace(); -// } -// }); -// } catch (MQClientException e) { -// e.printStackTrace(); -// } catch (RemotingException e) { -// e.printStackTrace(); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// return null; -// } -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java deleted file mode 100644 index 82d4314..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.ruoyi.testrocketmq.producer; - -/** - * @author 影子 - */ -public class ConsumeException extends RuntimeException{ - private static final long serialVersionUID = 4093867789628938836L; - - public ConsumeException(String message) { - super(message); - } - - public ConsumeException(Throwable cause) { - super(cause); - } - - public ConsumeException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java deleted file mode 100644 index fa6afbe..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.ruoyi.testrocketmq.producer; - - - -import lombok.Data; -import lombok.ToString; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; - -/** - * 消费时,当前所消费的消息的上下文信息 - * - * @author jolly - */ -@ToString -@Data -public final class MessageContext { - - /** - * 所消费消息所在的消息队列 - * - * @see MessageQueue - */ - private MessageQueue messageQueue; - - /** - * 所消费的消息的扩展属性 - * - * @see MessageExt - */ - private MessageExt messageExt; - - -} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java deleted file mode 100644 index 7450530..0000000 --- a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java +++ /dev/null @@ -1,110 +0,0 @@ -package com.ruoyi.testrocketmq.producer; - -import com.alibaba.fastjson.JSON; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingException; - -import java.io.UnsupportedEncodingException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; - -import static com.ruoyi.rocketmq.config.ProducerConfig.producer; - -/** - * 消息发送 - */ -@Slf4j -public class MessageProducer { - - - /** - * 同步发送消息 - * @param topic 主题 - * @param tag 标签 - * @param key 自定义的key,根据业务来定 - * @param value 消息的内容 - * @return org.apache.rocketmq.client.producer.SendResult - */ - public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ - String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; - try { - Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); - System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); - SendResult result = producer.send(msg); - return result; - } catch (UnsupportedEncodingException e) { - log.error("消息初始化失败!body:{}",body); - - } catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) { - log.error("消息发送失败! body:{}",body); - } - return null; - } - - - - /** - * 发送有序的消息 - * @param messagesList Message集合 - * @param messageQueueNumber 消息队列编号 - * @return org.apache.rocketmq.client.producer.SendResult - */ - public SendResult sendOrderlyMessage(List messagesList, int messageQueueNumber) { - SendResult result = null; - for (Message message : messagesList) { - try { -// DefaultMQProducer defaultMQProducer = ProducerConfig.producer.send(message); -// System.out.println(defaultMQProducer); - result = producer.send(message, (list, msg, arg) -> { - Integer queueNumber = (Integer) arg; - return list.get(queueNumber); - }, messageQueueNumber); - } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { - log.error("发送有序消息失败"); - return result; - } - } - return result; - } - - /** - * 推送延迟消息 - * @param topic - * @param tag - * @param key - * @return boolean - */ - public SendResult sendDelayMessage(String topic, String tag, String key, String value) - { - SendResult result = null; - try - { - Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); - //设置消息延迟级别,我这里设置5,对应就是延时一分钟 - // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" - msg.setDelayTimeLevel(4); - // 发送消息到一个Broker - result = producer.send(msg); - // 通过sendResult返回消息是否成功送达 - log.info("发送延迟消息结果:======sendResult:{}", result); - DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - log.info("发送时间:{}", format.format(new Date())); - return result; - } - catch (Exception e) - { - e.printStackTrace(); - log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); - } - return result; - } - - -} diff --git a/pom.xml b/pom.xml index 699db26..137c96b 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ 2.2.2 UTF-8 UTF-8 - 21 + 17 3.2.11 2023.0.3 3.2.3 @@ -85,7 +85,7 @@ dev - wuyuan + dev 127.0.0.1:8848 DEFAULT_GROUP DEFAULT_GROUP @@ -472,6 +472,16 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${java.version} + ${java.version} + ${project.build.sourceEncoding} + + org.apache.maven.plugins