From eb537f798ad73b39397078ce247b88dce3fddea5 Mon Sep 17 00:00:00 2001 From: yangwei <867012372@qq.com> Date: Wed, 26 Mar 2025 18:39:42 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=201=E3=80=81=E9=9B=86=E6=88=90rocketmq=20?= =?UTF-8?q?2=E3=80=81=E5=BC=80=E5=8F=91=E9=A6=96=E9=A1=B5=E9=A2=84?= =?UTF-8?q?=E8=AD=A6=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dromara/system/api/RemotePostService.java | 10 + .../system/api/domain/vo/RemotePostVo.java | 63 ++++++ .../controller/BusinessAlertController.java | 1 + .../BusinessAlertStatisticsController.java | 56 +++++- .../business/domain/bo/BusinessAlertBo.java | 5 + .../business/mapper/BusinessAlertMapper.java | 27 ++- .../IBusinessAlertStatisticsService.java | 7 +- .../impl/BusinessAlertServiceImpl.java | 2 + .../BusinessAlertStatisticsServiceImpl.java | 132 ++++++++++-- .../mapper/business/BusinessAlertMapper.xml | 41 ++++ .../system/dubbo/RemotePostServiceImpl.java | 28 +++ .../dromara/workflow/domain/FlowDepart.java | 4 +- .../workflow/domain/vo/FlowDefinitionVo.java | 3 + .../impl/FlwDefinitionServiceImpl.java | 10 + dk-visual/pom.xml | 1 + dk-visual/rocketmq/pom.xml | 85 ++++++++ .../ruoyi/rocketmq/RocketMQApplication.java | 21 ++ .../ruoyi/rocketmq/config/ConsumerConfig.java | 75 +++++++ .../ruoyi/rocketmq/config/ProducerConfig.java | 56 ++++++ .../rocketmq/consumer/RocketMsgListener.java | 87 ++++++++ .../controller/RocketMqController.java | 123 ++++++++++++ .../ruoyi/rocketmq/enums/MessageCodeEnum.java | 55 +++++ .../ruoyi/rocketmq/enums/MessageTopic.java | 24 +++ .../ruoyi/rocketmq/model/ConsumerMode.java | 26 +++ .../ruoyi/rocketmq/model/ProducerMode.java | 25 +++ .../rocketmq/producer/ConsumeException.java | 23 +++ .../rocketmq/producer/MessageProducer.java | 190 ++++++++++++++++++ .../src/main/resources/application.yml | 59 ++++++ .../rocketmq/src/main/resources/banner.txt | 6 + .../src/main/resources/logback-plus.xml | 28 +++ .../testrocketmq/RocketMQApplication.java | 25 +++ .../testrocketmq/config/ConsumerConfig.java | 64 ++++++ .../testrocketmq/config/MessageConfig.java | 27 +++ .../testrocketmq/config/ProducerConfig.java | 48 +++++ .../consumer/RocketMsgListener.java | 100 +++++++++ .../testrocketmq/enums/MessageCodeEnum.java | 60 ++++++ .../testrocketmq/model/ConsumerMode.java | 22 ++ .../testrocketmq/model/ProducerMode.java | 25 +++ .../testrocketmq/producer/AsyncProducer.java | 47 +++++ .../producer/ConsumeException.java | 20 ++ .../testrocketmq/producer/MessageContext.java | 34 ++++ .../producer/MessageProducer.java | 110 ++++++++++ 42 files changed, 1820 insertions(+), 35 deletions(-) create mode 100644 dk-api/api-system/src/main/java/org/dromara/system/api/RemotePostService.java create mode 100644 dk-api/api-system/src/main/java/org/dromara/system/api/domain/vo/RemotePostVo.java create mode 100644 dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java create mode 100644 dk-visual/rocketmq/pom.xml create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/RocketMQApplication.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ConsumerConfig.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ProducerConfig.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RocketMqController.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageCodeEnum.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageTopic.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ConsumerMode.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ProducerMode.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/ConsumeException.java create mode 100644 dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/MessageProducer.java create mode 100644 dk-visual/rocketmq/src/main/resources/application.yml create mode 100644 dk-visual/rocketmq/src/main/resources/banner.txt create mode 100644 dk-visual/rocketmq/src/main/resources/logback-plus.xml create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java create mode 100644 dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java diff --git a/dk-api/api-system/src/main/java/org/dromara/system/api/RemotePostService.java b/dk-api/api-system/src/main/java/org/dromara/system/api/RemotePostService.java new file mode 100644 index 0000000..9d93117 --- /dev/null +++ b/dk-api/api-system/src/main/java/org/dromara/system/api/RemotePostService.java @@ -0,0 +1,10 @@ +package org.dromara.system.api; + +import org.dromara.system.api.domain.vo.RemotePostVo; + +import java.util.List; + +public interface RemotePostService { + List listPost(); + +} diff --git a/dk-api/api-system/src/main/java/org/dromara/system/api/domain/vo/RemotePostVo.java b/dk-api/api-system/src/main/java/org/dromara/system/api/domain/vo/RemotePostVo.java new file mode 100644 index 0000000..99f83ac --- /dev/null +++ b/dk-api/api-system/src/main/java/org/dromara/system/api/domain/vo/RemotePostVo.java @@ -0,0 +1,63 @@ +package org.dromara.system.api.domain.vo; + +import com.alibaba.excel.annotation.ExcelProperty; +import lombok.Data; +import org.dromara.common.excel.annotation.ExcelDictFormat; +import org.dromara.common.excel.convert.ExcelDictConvert; + +import java.io.Serial; +import java.util.Date; + +@Data +public class RemotePostVo { + + /** + * 岗位ID + */ + private Long postId; + + /** + * 部门id + */ + private Long deptId; + + /** + * 岗位编码 + */ + private String postCode; + + /** + * 岗位名称 + */ + private String postName; + + /** + * 岗位类别编码 + */ + private String postCategory; + + /** + * 显示顺序 + */ + private Integer postSort; + + /** + * 状态(0正常 1停用) + */ + private String status; + + /** + * 备注 + */ + private String remark; + + /** + * 创建时间 + */ + private Date createTime; + + /** + * 部门名 + */ + private String deptName; +} diff --git a/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertController.java b/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertController.java index b9860f2..beb4461 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertController.java +++ b/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertController.java @@ -1,5 +1,6 @@ package org.dromara.business.controller; +import cn.dev33.satoken.annotation.SaCheckPermission; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.RequiredArgsConstructor; diff --git a/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertStatisticsController.java b/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertStatisticsController.java index 458d8e8..c5b490e 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertStatisticsController.java +++ b/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertStatisticsController.java @@ -26,11 +26,6 @@ public class BusinessAlertStatisticsController extends BaseController { private final IBusinessAlertStatisticsService statisticsService; - @Operation(summary="获取预警处置率", description="获取预警处置率") - @GetMapping(value = "/rate") - public R alertRate(BusinessAlertBo businessAlertBo) { - return R.ok(statisticsService.alertRate(businessAlertBo)); - } @Operation(summary="panel看板数据", description="panel看板数据") @GetMapping(value = "/panel/count") @@ -38,6 +33,57 @@ public class BusinessAlertStatisticsController extends BaseController { return R.ok(statisticsService.countPanelAlert(businessAlertBo)); } + /** + * 总体情况看板 + * @param businessAlertBo + * @return + */ + @Operation(summary="总体情况看板", description="总体情况看板") + @GetMapping(value = "/total/panel/count") + public R> countPanelTotalAlert(BusinessAlertBo businessAlertBo) { + return R.ok(statisticsService.countPanelTotalAlert(businessAlertBo)); + } + + + /** + * 今日预警情况 + * @param businessAlertBo + * @return + */ + @Operation(summary="今日预警情况", description="今日预警情况") + @GetMapping(value = "/current/day/count") + public R> countCurrentDayAlert(BusinessAlertBo businessAlertBo) { + return R.ok(statisticsService.countCurrentDayAlert(businessAlertBo)); + } + + /** + * 各局今日预警数 + * @param businessAlertBo + * @return + */ + @Operation(summary="各局今日预警数", description="各局今日预警数") + @GetMapping(value = "/post/day/count") + public R> countPostDayAlert(BusinessAlertBo businessAlertBo) { + return R.ok(statisticsService.countPostDayAlert(businessAlertBo)); + } + + + /** + * 各局处理情况 + * @param businessAlertBo + * @return + */ + @Operation(summary="各局处理情况", description="各局处理情况") + @GetMapping(value = "/post/count") + public R> countPostAlert(BusinessAlertBo businessAlertBo) { + return R.ok(statisticsService.countPostAlert(businessAlertBo)); + } + + + + + + //饼图显示每个月根据部门 @Operation(summary="根据月份显示预警个数", description="根据月份显示预警个数") @GetMapping(value = "/month/count") diff --git a/dk-modules/business/src/main/java/org/dromara/business/domain/bo/BusinessAlertBo.java b/dk-modules/business/src/main/java/org/dromara/business/domain/bo/BusinessAlertBo.java index 6f48e94..143dcba 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/domain/bo/BusinessAlertBo.java +++ b/dk-modules/business/src/main/java/org/dromara/business/domain/bo/BusinessAlertBo.java @@ -249,4 +249,9 @@ public class BusinessAlertBo { */ private List aiLabelEnList; + /** + * 时间搜索类型 1年,2月,3日 + */ + private Integer dateType = 1; + } diff --git a/dk-modules/business/src/main/java/org/dromara/business/mapper/BusinessAlertMapper.java b/dk-modules/business/src/main/java/org/dromara/business/mapper/BusinessAlertMapper.java index 2b12d26..58392b0 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/mapper/BusinessAlertMapper.java +++ b/dk-modules/business/src/main/java/org/dromara/business/mapper/BusinessAlertMapper.java @@ -49,34 +49,31 @@ public interface BusinessAlertMapper extends BaseMapperPlus pageBusinessAlertCancel(Page build, QueryWrapper ew); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) + List> listMonthAlert(@Param("param") BusinessAlertBo businessAlertBo); List> listDepartAlert(@Param("param") BusinessAlertBo businessAlertBo); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) List> listMonthAlertStatus(@Param("param")BusinessAlertBo businessAlertBo); List> listDepartAlertStatus(@Param("param") BusinessAlertBo businessAlertBo); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) + List> countAlertCompare(@Param("param")BusinessAlertBo businessAlertBo); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) + List> countPanelAlert(@Param("param")BusinessAlertBo businessAlertBo); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) + List> countAlertTypeCompare(@Param("param") BusinessAlertBo businessAlertBo); + + Integer countFromOldToCurrent(BusinessAlertBo businessAlertBo); + + Map countCurrentAlert(BusinessAlertBo businessAlertBo); + + Map countMonthAlert(BusinessAlertBo businessAlertBo); + + Map countCurrentDayAlert(BusinessAlertBo businessAlertBo); } diff --git a/dk-modules/business/src/main/java/org/dromara/business/service/IBusinessAlertStatisticsService.java b/dk-modules/business/src/main/java/org/dromara/business/service/IBusinessAlertStatisticsService.java index 5652a6b..95b6d9e 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/service/IBusinessAlertStatisticsService.java +++ b/dk-modules/business/src/main/java/org/dromara/business/service/IBusinessAlertStatisticsService.java @@ -22,6 +22,11 @@ public interface IBusinessAlertStatisticsService { List> countAlertTypeCompare(BusinessAlertBo businessAlertBo); - Double alertRate(BusinessAlertBo businessAlertBo); + Map countPanelTotalAlert(BusinessAlertBo businessAlertBo); + Map countCurrentDayAlert(BusinessAlertBo businessAlertBo); + + Map countPostDayAlert(BusinessAlertBo businessAlertBo); + + List countPostAlert(BusinessAlertBo businessAlertBo); } diff --git a/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertServiceImpl.java b/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertServiceImpl.java index 7bb810f..c82409b 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertServiceImpl.java +++ b/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertServiceImpl.java @@ -128,6 +128,8 @@ public class BusinessAlertServiceImpl implements IBusinessAlertService { wrapper.set(BusinessAlert::getAssignDate, new Date()); } + wrapper.set(BusinessAlert::getCompleteDate,new Date()); + wrapper.eq(BusinessAlert::getId, businessId); this.baseMapper.update(wrapper); diff --git a/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertStatisticsServiceImpl.java b/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertStatisticsServiceImpl.java index 0df2d28..bcb4690 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertStatisticsServiceImpl.java +++ b/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertStatisticsServiceImpl.java @@ -1,5 +1,6 @@ package org.dromara.business.service.impl; +import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.ObjectUtil; import lombok.RequiredArgsConstructor; import org.apache.dubbo.config.annotation.DubboReference; @@ -10,8 +11,10 @@ import org.dromara.business.service.IBusinessAlertStatisticsService; import org.dromara.common.satoken.utils.LoginHelper; import org.dromara.system.api.RemoteDeptService; import org.dromara.system.api.RemoteLabelPostService; +import org.dromara.system.api.RemotePostService; import org.dromara.system.api.domain.vo.RemoteAiLabelPostVo; import org.dromara.system.api.domain.vo.RemoteDeptVo; +import org.dromara.system.api.domain.vo.RemotePostVo; import org.springframework.stereotype.Service; import java.math.BigDecimal; @@ -37,6 +40,8 @@ public class BusinessAlertStatisticsServiceImpl implements IBusinessAlertStatist @DubboReference RemoteLabelPostService remoteLabelPostService; + @DubboReference + RemotePostService remotePostService; /** * 按照月份分类预警数量(包含权限) @@ -142,23 +147,126 @@ public class BusinessAlertStatisticsServiceImpl implements IBusinessAlertStatist return baseMapper.countAlertTypeCompare(businessAlertBo); } + /** + * 总体情况 + * @param businessAlertBo + * @return + */ @Override - public Double alertRate(BusinessAlertBo businessAlertBo) { - List> mapList = baseMapper.countPanelAlert(businessAlertBo); + public Map countPanelTotalAlert(BusinessAlertBo businessAlertBo) { + Map result = new HashMap<>(); + List labelList = getAiLabel(businessAlertBo.getPostCode()); + businessAlertBo.setAiLabelEnList(labelList); + //2024一直到当前年份处理预警个数 + Integer totalFinishCount = baseMapper.countFromOldToCurrent(businessAlertBo); + result.put("totalFinishCount", totalFinishCount); + //今年处理率 今年的预警总数 今年处理个数 + Map currentYearMap = baseMapper.countCurrentAlert(businessAlertBo); + // 四舍五入保留两位小数 + double yearTotal = Double.parseDouble(String.valueOf(currentYearMap.get("total"))); + double yearFinishCount = Double.parseDouble(String.valueOf(currentYearMap.get("finishCount"))); + BigDecimal currentYearAverage = BigDecimal.valueOf((yearTotal == 0) ? 0.0 : yearFinishCount / yearTotal) + .setScale(2, RoundingMode.HALF_UP); + result.put("yearTotal", currentYearMap.get("total")); + result.put("yearFinishCount", currentYearMap.get("finishCount")); + result.put("yearRate", currentYearAverage); - double averageDisposalRate = mapList.stream() - .mapToDouble(map -> { - double total = Double.parseDouble(String.valueOf(map.get("total"))); - double finishCount = Double.parseDouble(String.valueOf(map.get("finishCount"))); - return (total == 0) ? 0.0 : finishCount / total; - }) - .average() // 计算平均值 - .orElse(0.0); + //本月处理率 本月预警总数 本月处理个数 + Map monthMap = baseMapper.countMonthAlert(businessAlertBo); // 四舍五入保留两位小数 - BigDecimal roundedAverage = BigDecimal.valueOf(averageDisposalRate) + double monthTotal = Double.parseDouble(String.valueOf(monthMap.get("total"))); + double monthFinishCount = Double.parseDouble(String.valueOf(monthMap.get("finishCount"))); + BigDecimal currentMonthAverage = BigDecimal.valueOf((monthTotal == 0) ? 0.0 : monthFinishCount / monthTotal) .setScale(2, RoundingMode.HALF_UP); - return roundedAverage.doubleValue(); + + result.put("monthTotal", monthMap.get("total")); + result.put("monthFinishCount", monthMap.get("finishCount")); + result.put("monthRate", currentMonthAverage); + + return result; + } + + /** + * 今日情况 + * @param businessAlertBo + * @return + */ + @Override + public Map countCurrentDayAlert(BusinessAlertBo businessAlertBo) { + Map result = new HashMap<>(); + List labelList = getAiLabel(businessAlertBo.getPostCode()); + businessAlertBo.setAiLabelEnList(labelList); + + Map dayMap = this.baseMapper.countCurrentDayAlert(businessAlertBo); + + result.put("dayTotal", dayMap.get("total")); + result.put("dayFinishCount", dayMap.get("finishCount")); + result.put("dayTodo", dayMap.get("todoCount")); + return result; + } + + /** + * 各局今日预警数量 + * @param businessAlertBo + * @return + */ + @Override + public Map countPostDayAlert(BusinessAlertBo businessAlertBo) { + Map result = new HashMap<>(); + //查询所有的职能岗位 + List postVoList = remotePostService.listPost(); + + if (ObjectUtil.isEmpty(postVoList)) { + return Map.of(); + } + + postVoList.forEach(postVo -> { + List labelList = getAiLabel(businessAlertBo.getPostCode()); + businessAlertBo.setAiLabelEnList(labelList); + Map dayMap = this.baseMapper.countCurrentDayAlert(businessAlertBo); + result.put(postVo.getPostName(), dayMap.get("total")); + }); + + return result; + } + + /** + * 各局处理情况 + * @param businessAlertBo + * @return + */ + @Override + public List countPostAlert(BusinessAlertBo businessAlertBo) { + //查询所有的职能岗位 + List postVoList = remotePostService.listPost(); + + List result = new ArrayList<>(); + + if (ObjectUtil.isEmpty(postVoList)) { + return ListUtil.empty(); + } + + postVoList.forEach(postVo -> { + List labelList = getAiLabel(businessAlertBo.getPostCode()); + businessAlertBo.setAiLabelEnList(labelList); + Map dateMap = switch (businessAlertBo.getDateType()) { + case 1 -> baseMapper.countCurrentAlert(businessAlertBo); + case 2 -> baseMapper.countMonthAlert(businessAlertBo); + case 3 -> baseMapper.countCurrentDayAlert(businessAlertBo); + default -> baseMapper.countCurrentAlert(businessAlertBo); + }; + + result.add(new StatObj( + postVo.getPostName(), + List.of( + new StatObj("todoCount", dateMap.get("todoCount")), + new StatObj("finishCount", dateMap.get("finishCount")) + ) + )); + }); + + return result; } private List buildDateList(BusinessAlertBo businessAlertBo) { diff --git a/dk-modules/business/src/main/resources/mapper/business/BusinessAlertMapper.xml b/dk-modules/business/src/main/resources/mapper/business/BusinessAlertMapper.xml index 7d1a8cb..353353d 100644 --- a/dk-modules/business/src/main/resources/mapper/business/BusinessAlertMapper.xml +++ b/dk-modules/business/src/main/resources/mapper/business/BusinessAlertMapper.xml @@ -387,4 +387,45 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" ORDER BY md.dept_id, md.dateMonth + + + + + + + + diff --git a/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java new file mode 100644 index 0000000..7c2795c --- /dev/null +++ b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java @@ -0,0 +1,28 @@ +package org.dromara.system.dubbo; + +import lombok.RequiredArgsConstructor; +import org.apache.dubbo.config.annotation.DubboService; +import org.dromara.common.core.utils.MapstructUtils; +import org.dromara.system.api.RemotePostService; +import org.dromara.system.api.domain.vo.RemotePostVo; +import org.dromara.system.domain.bo.SysPostBo; +import org.dromara.system.domain.vo.SysPostVo; +import org.dromara.system.service.ISysPostService; +import org.springframework.stereotype.Service; + +import java.util.List; + +@RequiredArgsConstructor +@Service +@DubboService +public class RemotePostServiceImpl implements RemotePostService { + + + private final ISysPostService sysPostService; + + @Override + public List listPost() { + List sysPostVos = sysPostService.selectPostList(new SysPostBo()); + return MapstructUtils.convert(sysPostVos, RemotePostVo.class); + } +} diff --git a/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/FlowDepart.java b/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/FlowDepart.java index a2e3fb1..f52887a 100644 --- a/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/FlowDepart.java +++ b/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/FlowDepart.java @@ -25,8 +25,8 @@ public class FlowDepart { /** * 流程类型 */ - @NotNull(message = "流程类型不能为空", groups = AddGroup.class) - private String flowType; +// @NotNull(message = "流程类型不能为空", groups = AddGroup.class) +// private String flowType; /** * 流程编码 diff --git a/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/vo/FlowDefinitionVo.java b/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/vo/FlowDefinitionVo.java index aef7573..a235d22 100644 --- a/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/vo/FlowDefinitionVo.java +++ b/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/vo/FlowDefinitionVo.java @@ -7,6 +7,7 @@ import org.dromara.workflow.common.constant.FlowConstant; import java.io.Serial; import java.io.Serializable; import java.util.Date; +import java.util.List; /** * 流程定义视图 @@ -101,4 +102,6 @@ public class FlowDefinitionVo implements Serializable { * 扩展字段,预留给业务系统使用 */ private String ext; + + private List deptIds; } diff --git a/dk-modules/workflow/src/main/java/org/dromara/workflow/service/impl/FlwDefinitionServiceImpl.java b/dk-modules/workflow/src/main/java/org/dromara/workflow/service/impl/FlwDefinitionServiceImpl.java index b4ebcdf..08901e9 100644 --- a/dk-modules/workflow/src/main/java/org/dromara/workflow/service/impl/FlwDefinitionServiceImpl.java +++ b/dk-modules/workflow/src/main/java/org/dromara/workflow/service/impl/FlwDefinitionServiceImpl.java @@ -4,6 +4,7 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.util.ObjectUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -31,8 +32,10 @@ import org.dromara.warm.flow.orm.mapper.FlowSkipMapper; import org.dromara.workflow.common.ConditionalOnEnable; import org.dromara.workflow.common.constant.FlowConstant; import org.dromara.workflow.domain.FlowCategory; +import org.dromara.workflow.domain.FlowDepart; import org.dromara.workflow.domain.vo.FlowDefinitionVo; import org.dromara.workflow.mapper.FlwCategoryMapper; +import org.dromara.workflow.service.FlwDepartService; import org.dromara.workflow.service.IFlwDefinitionService; import org.dromara.workflow.utils.WorkflowUtils; import org.springframework.stereotype.Service; @@ -64,6 +67,7 @@ public class FlwDefinitionServiceImpl implements IFlwDefinitionService { private final FlowNodeMapper flowNodeMapper; private final FlowSkipMapper flowSkipMapper; private final FlwCategoryMapper flwCategoryMapper; + private final FlwDepartService flwDepartService; /** * 查询流程定义列表 @@ -80,6 +84,12 @@ public class FlwDefinitionServiceImpl implements IFlwDefinitionService { TableDataInfo build = TableDataInfo.build(); build.setRows(BeanUtil.copyToList(page.getRecords(), FlowDefinitionVo.class)); build.setTotal(page.getTotal()); + build.getRows().forEach(p ->{ + FlowDepart info = flwDepartService.getInfo(p.getFlowCode()); + if(ObjectUtil.isNotEmpty(info)){ + p.setDeptIds(info.getDepartIds()); + } + }); return build; } diff --git a/dk-visual/pom.xml b/dk-visual/pom.xml index 1cb541d..c0b0751 100644 --- a/dk-visual/pom.xml +++ b/dk-visual/pom.xml @@ -14,6 +14,7 @@ seata-server nacos snailjob-server + rocketmq dk-visual diff --git a/dk-visual/rocketmq/pom.xml b/dk-visual/rocketmq/pom.xml new file mode 100644 index 0000000..9252d69 --- /dev/null +++ b/dk-visual/rocketmq/pom.xml @@ -0,0 +1,85 @@ + + + + org.dromara + dk-visual + ${revision} + + 4.0.0 + + rocketmq + + + + + org.apache.rocketmq + rocketmq-client + 4.9.0 + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + com.alibaba.nacos + nacos-client + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + com.mysql + mysql-connector-j + + + + org.projectlombok + lombok + + + + org.dromara + common-web + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + + diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/RocketMQApplication.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/RocketMQApplication.java new file mode 100644 index 0000000..b45609c --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/RocketMQApplication.java @@ -0,0 +1,21 @@ +package com.ruoyi.rocketmq; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; + +/** + * RocketMQ模块 + * + * @author + */ +@SpringBootApplication +@EnableAsync +public class RocketMQApplication +{ + public static void main(String[] args) + { + SpringApplication.run(RocketMQApplication.class, args); + System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); + } +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ConsumerConfig.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ConsumerConfig.java new file mode 100644 index 0000000..e086fa2 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ConsumerConfig.java @@ -0,0 +1,75 @@ +package com.ruoyi.rocketmq.config; + +import com.ruoyi.rocketmq.enums.MessageCodeEnum; +import com.ruoyi.rocketmq.enums.MessageTopic; +import com.ruoyi.rocketmq.model.ConsumerMode; +import com.ruoyi.rocketmq.consumer.RocketMsgListener; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.List; + +/** + * 消费者配置 + */ +@RefreshScope +@Configuration +@Slf4j +public class ConsumerConfig { + + @Autowired + private ConsumerMode consumerMode; + + @Bean + public DefaultMQPushConsumer getRocketMQConsumer() { + //构建客户端连接 + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); + // + consumer.setNamesrvAddr(consumerMode.getNameServer()); + consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); + consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); + consumer.registerMessageListener(new RocketMsgListener()); + /** + * 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 + * 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 + * 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 + * 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 + */ + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + /** + * CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 + * 订阅topic整体,从而达到负载均衡的目的 + * BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 + * 需要注意的是,在广播模式下,每个Consumer都会独立地处理相同的消息副本。这可能会导致一些潜在的问题,例如消息重复处理或者资源浪费。因此,在使用广播模式时,请确保消息的处理逻辑是幂等的,并仔细考虑系统资源的消耗。 + */ + // consumer.setMessageModel(MessageModel.BROADCASTING); + + consumer.setVipChannelEnabled(false); + consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); + try { + /** + * 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 + * 由于官方并没有给直接订阅全部消息示例 所以使用list列表循环订阅所有topic + */ + // 获取所有topic列表 + MessageTopic messageTopic = new MessageTopic(); + List allTopics = messageTopic.RocketMQTopicList(); + //订阅所有topic + for (String topic : allTopics) { + consumer.subscribe(topic,"*"); + } + consumer.start(); + log.info("消费者初始化成功:{}", consumer); + } catch (MQClientException e) { + e.printStackTrace(); + log.error("消费者初始化失败:{}",e.getMessage()); + } + return consumer; + } +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ProducerConfig.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ProducerConfig.java new file mode 100644 index 0000000..8d5ed48 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ProducerConfig.java @@ -0,0 +1,56 @@ +package com.ruoyi.rocketmq.config; + +import com.ruoyi.rocketmq.model.ProducerMode; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +/** + * mq搭建地址连接 + * 生产者初者连接信息 具体看nacos配置 + */ +@Configuration +@Slf4j +public class ProducerConfig { + + /** + * 远程调用连接信息 + */ + public static DefaultMQProducer producer; + + /** + * 连接客户端信息配置 具体看nacos配置 + */ + @Autowired + private ProducerMode producerMode; + + @Bean + public DefaultMQProducer getRocketMQProducer() { + producer = new DefaultMQProducer(producerMode.getGroupName()); + producer.setNamesrvAddr(producerMode.getNameServer()); + //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName + if(producerMode.getMaxMessageSize()!=null){ + producer.setMaxMessageSize(producerMode.getMaxMessageSize()); + } + if(producerMode.getSendMsgTimeout()!=null){ + producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); + } + //如果发送消息失败,设置重试次数,默认为2次 + if(producerMode.getRetryTimesWhenSendFailed()!=null){ + producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); + } + producer.setVipChannelEnabled(false); + try { + producer.start(); + log.info("生产者初始化成功:{}",producer.toString()); + } catch (MQClientException e) { + log.error("生产者初始化失败:{}",e.getMessage()); + } + return producer; + } + +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java new file mode 100644 index 0000000..153a2f2 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java @@ -0,0 +1,87 @@ +package com.ruoyi.rocketmq.consumer; + +import com.ruoyi.rocketmq.enums.MessageCodeEnum; +import com.ruoyi.rocketmq.producer.ConsumeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.io.UnsupportedEncodingException; +import java.util.List; + +/** + * 消息监听 + */ +@Slf4j +@Component +public class RocketMsgListener implements MessageListenerConcurrently { + + /** + * 消费消息 + * @param list msgs.size() >= 1 + * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here + * 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 + * @param consumeConcurrentlyContext 上下文信息 + * @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) + */ + @Override + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + try{ + //消息不等于空情况 + if (!CollectionUtils.isEmpty(list)) { + //获取topic + for (MessageExt messageExt : list) { + // 解析消息内容 + String body = new String(messageExt.getBody()); + log.info("接受到的消息为:{}", body); + String tags = messageExt.getTags(); + String topic = messageExt.getTopic(); + String msgId = messageExt.getMsgId(); + String keys = messageExt.getKeys(); + int reConsume = messageExt.getReconsumeTimes(); + // 消息已经重试了3次,如果不需要再次消费,则返回成功 + if (reConsume == 3) { + // TODO 补偿信息 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常 + } + // 根据不同的topic处理不同的业务 这里以订单消息为例子 + if (MessageCodeEnum.ORDER_MESSAGE_TOPIC.getCode().equals(topic)) { + if (MessageCodeEnum.ORDER_MESSAGE_TAG.getCode().equals(tags)) { + //处理你的业务 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 + } else { + log.info("未匹配到Tag【{}】" + tags); + } + } + } + } + // 消息消费失败 + //broker会根据设置的messageDelayLevel发起重试,默认16次 + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } catch (Exception e) { + // 调用 handleException 方法处理异常并返回处理结果 + return handleException(e); + } + } + + /** + * 异常处理 + * + * @param e 捕获的异常 + * @return 消息消费结果 + */ + private static ConsumeConcurrentlyStatus handleException(final Exception e) { + Class exceptionClass = e.getClass(); + if (exceptionClass.equals(UnsupportedEncodingException.class)) { + log.error(e.getMessage()); + } else if (exceptionClass.equals(ConsumeException.class)) { + log.error(e.getMessage()); + } + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RocketMqController.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RocketMqController.java new file mode 100644 index 0000000..f2407f5 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RocketMqController.java @@ -0,0 +1,123 @@ +package com.ruoyi.rocketmq.controller; + + +import com.ruoyi.rocketmq.producer.MessageProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 消息测试类Controller + */ +@RestController +@RequestMapping("/api/rocketMessage") +public class RocketMqController { + + + + /** + * 发送同步消息 + */ + @PostMapping("/sendSynchronizeMessage") + private Map sendSynchronizeMessage(){ + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 + SendResult sendResult = messageProducer.sendSynchronizeMessage("order-message","order_message_tag","title","content"); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + + + + /** + * 发送单向消息 + */ + @PostMapping("/sendOnewayMessage") + private Map sendOnewayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + messageProducer.sendOnewayMessage("order-message","order_timeout_tag","title","content"); + Map result = new HashMap<>(); + result.put("msg","发送成功"); + result.put("code",200); + return result; + } + + + /** + * 批量发送消息 + */ + @PostMapping("/sendBatchMessage") + private Map sendBatchMessage(){ + // 根据实际需求创建消息列表并返回 + List messages = new ArrayList<>(); + // 添加消息到列表 + messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes())); + messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes())); + messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes())); + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + SendResult sendResult = messageProducer.sendBatchMessage(messages); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + + + /** + * 发送有序的消息 + */ + @PostMapping("/sendOrderlyMessage") + private Map sendOrderlyMessage(){ + // 根据实际需求创建消息列表并返回 + List messages = new ArrayList<>(); + // 添加消息到列表 + messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes())); + messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes())); + messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes())); + Integer messageQueueNumber = 3; + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + SendResult sendResult = messageProducer.sendOrderlyMessage(messages,messageQueueNumber); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + + /** + * 发送延迟消息 + */ + @PostMapping("/sendDelayMessage") + private Map sendDelayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + SendResult sendResult = messageProducer.sendDelayMessage("order-message","order_timeout_tag","title","content"); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + + + /** + * 发送异步的消息 + */ + @PostMapping("/sendAsyncProducerMessage") + private Map sendAsyncProducerMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + SendResult sendResult = messageProducer.sendAsyncProducerMessage("order-message","order_timeout_tag","title","content"); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageCodeEnum.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageCodeEnum.java new file mode 100644 index 0000000..3e038ec --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageCodeEnum.java @@ -0,0 +1,55 @@ +package com.ruoyi.rocketmq.enums; + + +import lombok.Getter; + +/** + * 用于传递topic和 tag + * 也用于接收消息后判断不同的消息处理不同的业务 + */ +@Getter +public enum MessageCodeEnum { + + /** + * 系统消息 + */ + NOTE_MESSAGE_TOPIC("system-message","系统消息服务模块topic名称"), + /** + * 用户消息 + */ + USER_MESSAGE_TOPIC("user-message","用户消息服务模块topic名称"), + + /** + * 订单消息 + */ + ORDER_MESSAGE_TOPIC("order-message","订单消息服务模块topic名称"), + + /** + * 用户消息tag + */ + USER_MESSAGE_TAG("user_message_tag","用户消息推送"), + + /** + * 系统消息tag + */ + NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), + + /** + * 订单消息 + */ + ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), + + /** + * 订单处理编号 + */ + ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); + + private final String code; + private final String msg; + + MessageCodeEnum(String code, String msg){ + this.code = code; + this.msg = msg; + } + +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageTopic.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageTopic.java new file mode 100644 index 0000000..fe1ea39 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageTopic.java @@ -0,0 +1,24 @@ +package com.ruoyi.rocketmq.enums; + + +import java.util.ArrayList; +import java.util.List; + +/** + * 定义topic列表 + */ +public class MessageTopic { + + //在这里添加topic 用于批量订阅 + public List RocketMQTopicList(){ + List getTopicLists=new ArrayList<>(); + //系统消息 + getTopicLists.add("system-message"); + //用户消息 + getTopicLists.add("user-message"); + //订单消息 + getTopicLists.add("order-message"); + return getTopicLists; + } + +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ConsumerMode.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ConsumerMode.java new file mode 100644 index 0000000..b76cab1 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ConsumerMode.java @@ -0,0 +1,26 @@ +package com.ruoyi.rocketmq.model; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +/** + * 消费者初始化 + * 消费者连接信息 具体看nacos配置 + */ +@Data +@Configuration +@Component +public class ConsumerMode { + @Value("${rocketmq.nameServer}") + private String nameServer; + @Value("${rocketmq.consumer.groupName}") + private String groupName ; + @Value("${rocketmq.consumer.consumerThreadMin}") + private Integer consumeThreadMin; + @Value("${rocketmq.consumer.consumerThreadMax}") + private Integer consumeThreadMax; + @Value("${rocketmq.consumer.consumerMessageBatchMaxSize}") + private Integer consumeMessageBatchMaxSize; +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ProducerMode.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ProducerMode.java new file mode 100644 index 0000000..f605f57 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ProducerMode.java @@ -0,0 +1,25 @@ +package com.ruoyi.rocketmq.model; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Configuration; + +/** + * 生产者初始化 + */ +@RefreshScope +@Data +@Configuration +public class ProducerMode { + @Value("${rocketmq.producer.groupName}") + private String groupName; + @Value("${rocketmq.nameServer}") + private String nameServer; + @Value("${rocketmq.producer.maxMessageSize}") + private Integer maxMessageSize; + @Value("${rocketmq.producer.sendMsgTimeout}") + private Integer sendMsgTimeout; + @Value("${rocketmq.producer.retryTimesWhenSendFailed}") + private Integer retryTimesWhenSendFailed; +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/ConsumeException.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/ConsumeException.java new file mode 100644 index 0000000..825f900 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/ConsumeException.java @@ -0,0 +1,23 @@ +package com.ruoyi.rocketmq.producer; + +/** + * @author 影子 + * 用于捕捉异常非受检异常(unchecked exception) + * RuntimeException 和其子类的异常在编译时不需要进行强制性的异常处理,可以选择在运行时进行捕获和处理 + * 可选择使用 + */ +public class ConsumeException extends RuntimeException{ + private static final long serialVersionUID = 4093867789628938836L; + + public ConsumeException(String message) { + super(message); + } + + public ConsumeException(Throwable cause) { + super(cause); + } + + public ConsumeException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/MessageProducer.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/MessageProducer.java new file mode 100644 index 0000000..2bb9057 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/MessageProducer.java @@ -0,0 +1,190 @@ +package com.ruoyi.rocketmq.producer; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.client.producer.TransactionSendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +import static com.ruoyi.rocketmq.config.ProducerConfig.producer; + +/** + * 消息发送 + */ +@Slf4j +public class MessageProducer { + + + /** + * 同步发送消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 通过调用 send() 方法发送消息,阻塞等待服务器响应。 + */ + public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ + String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; + try { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + SendResult result = producer.send(msg); + return result; + } catch (Exception e) { + e.printStackTrace(); + log.error("消息初始化失败!body:{}",body); + + } + return null; + } + + /** + * 单向发送消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 单向发送:通过调用 sendOneway() 方法发送消息,不关心发送结果,适用于对可靠性要求不高的场景。 + */ + public void sendOnewayMessage(String topic, String tag, String key, String value){ + String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; + try { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + producer.sendOneway(msg); + } catch (UnsupportedEncodingException e) { + log.error("消息初始化失败!body:{}",body); + + } catch (MQClientException | InterruptedException | RemotingException e) { + log.error("消息发送失败! body:{}",body); + } + } + + + /** + * 批量发送消息 + * @param messages 消息列表 + * 批量发送:通过调用 send() 方法并传入多条消息,实现批量发送消息。 + */ + public SendResult sendBatchMessage(List messages){ + String body = messages.toString(); + try { + System.out.println("生产者发送消息:"+ messages); + // 发送批量消息 + SendResult sendResult = producer.send(messages); + return sendResult; + } catch (MQClientException | InterruptedException | RemotingException e) { + log.error("消息发送失败! body:{}",body); + } catch (MQBrokerException e) { + throw new RuntimeException(e); + } + return null; + } + + + /** + * 发送有序的消息 + * @param messagesList Message集合 + * @param messageQueueNumber 消息队列数量,根据实际情况设定 + * 顺序发送: messageQueueNumber 表示消息的业务标识,可以根据具体需求进行设置来保证消息按顺序发送。 + */ + public SendResult sendOrderlyMessage(List messagesList, Integer messageQueueNumber) { + SendResult result = null; + for (Message message : messagesList) { + try { + result = producer.send(message, (list, msg, arg) -> { + Integer queueNumber = (Integer) arg; + //int queueIndex = queueNumber % list.size(); + return list.get(queueNumber); + }, messageQueueNumber);//根据编号取模,选择消息队列 + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + log.error("发送有序消息失败"); + return result; + } + } + return result; + } + + /** + * 发送延迟消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 延迟发送:通过设置延迟级别来实现延迟发送消息。 + */ + public SendResult sendDelayMessage(String topic, String tag, String key, String value) + { + SendResult result = null; + try + { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + //设置消息延迟级别,我这里设置5,对应就是延时一分钟 + // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" + msg.setDelayTimeLevel(4); + // 发送消息到一个Broker + result = producer.send(msg); + // 通过sendResult返回消息是否成功送达 + log.info("发送延迟消息结果:======sendResult:{}", result); + DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + log.info("发送时间:{}", format.format(new Date())); + return result; + } + catch (Exception e) + { + e.printStackTrace(); + log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); + } + return result; + } + /** + * 发送异步的消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 通过调用 send() 方法,并传入一个 SendCallback 对象,在发送消息的同时可以继续处理其他逻辑,消息发送结果通过回调函数通知。 + */ + public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value){ + + try { + //创建一个消息实例,指定主题、标签和消息体。 + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + producer.send(msg,new SendCallback() { + // 异步回调的处理 + @Override + public void onSuccess(SendResult sendResult) { + System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId()); + } + @Override + public void onException(Throwable e) { + System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e); + e.printStackTrace(); + } + }); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + return null; + } + +} diff --git a/dk-visual/rocketmq/src/main/resources/application.yml b/dk-visual/rocketmq/src/main/resources/application.yml new file mode 100644 index 0000000..c1758d8 --- /dev/null +++ b/dk-visual/rocketmq/src/main/resources/application.yml @@ -0,0 +1,59 @@ +server: + port: 9402 + +# Spring +spring: + application: + # 应用名称 + name: rocketmq + profiles: + # 环境配置 + active: @profiles.active@ + +--- # rocketmq 配置 +rocketmq: + # 生产者的组名 + producer: + #是否开启自动配置 +# isEnable: true + # 发送同一类消息的设置为同一个group,保证唯一 + groupName: message-producer + # 消息最大长度 默认1024*4(4M) + maxMessageSize: 4096 + # 发送消息超时时间,默认3000 + sendMsgTimeout: 3000 + # 发送消息失败重试次数,默认2 + retryTimesWhenSendFailed: 3 +# group: produce-group + # 消费者的组名 + consumer: + #是否开启自动配置 +# isEnable: true + # 官方建议:确保同一组中的每个消费者订阅相同的主题。 + groupName: message-consumer + consumerThreadMin: 20 + consumerThreadMax: 64 + # 设置一次消费消息的条数,默认为1条 + consumerMessageBatchMaxSize: 1 + # NameServer地址 + nameServer: 114.235.183.147:9876 + +--- # nacos 配置 +spring: + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + username: @nacos.username@ + password: @nacos.password@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml diff --git a/dk-visual/rocketmq/src/main/resources/banner.txt b/dk-visual/rocketmq/src/main/resources/banner.txt new file mode 100644 index 0000000..d826e58 --- /dev/null +++ b/dk-visual/rocketmq/src/main/resources/banner.txt @@ -0,0 +1,6 @@ + ___ _ _ __ __ ___ + | _ \ ___ __ | |__ ___ | |_ | \/ | / _ \ + | / / _ \ / _| | / / / -_) | _| | |\/| || (_) | + |_|_\ \___/ \__|_ |_\_\ \___| _\__| |_|__|_| \__\_\ +_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| +"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-' diff --git a/dk-visual/rocketmq/src/main/resources/logback-plus.xml b/dk-visual/rocketmq/src/main/resources/logback-plus.xml new file mode 100644 index 0000000..caaa345 --- /dev/null +++ b/dk-visual/rocketmq/src/main/resources/logback-plus.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + ${console.log.pattern} + utf-8 + + + + + + + + + + + + + + + diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java new file mode 100644 index 0000000..4ec745a --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java @@ -0,0 +1,25 @@ +package com.ruoyi.testrocketmq; + +import com.ruoyi.common.security.annotation.EnableCustomConfig; +import com.ruoyi.common.security.annotation.EnableRyFeignClients; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; + +/** + * 平台管理模块 + * + * @author ruoyi + */ +@EnableCustomConfig +@EnableRyFeignClients +@SpringBootApplication +@EnableAsync +public class RocketMQApplication +{ + public static void main(String[] args) + { + SpringApplication.run(RocketMQApplication.class, args); + System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); + } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java new file mode 100644 index 0000000..38ebbae --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java @@ -0,0 +1,64 @@ +package com.ruoyi.testrocketmq.config; + +import com.ruoyi.testrocketmq.consumer.RocketMsgListener; +import com.ruoyi.testrocketmq.enums.MessageCodeEnum; +import com.ruoyi.testrocketmq.model.ConsumerMode; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 消费者配置 + */ +@RefreshScope +@Configuration +@Slf4j +public class ConsumerConfig { + @Autowired + private ConsumerMode consumerMode; + + @Bean + public DefaultMQPushConsumer getRocketMQConsumer() throws MQClientException { +// ConsumerMode consumerMode = new ConsumerMode(); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); + consumer.setNamesrvAddr(consumerMode.getNamesrvAddr()); + consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); + consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); + consumer.registerMessageListener(new RocketMsgListener()); + /** + * 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 + * 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 + * 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 + * 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 + */ + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + /** + * CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 + * 订阅topic整体,从而达到负载均衡的目的 + * BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 + * + */ + // consumer.setMessageModel(MessageModel.BROADCASTING); + + consumer.setVipChannelEnabled(false); + consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); + try { + /** + * 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 + */ + consumer.subscribe(MessageCodeEnum.ORDER_MESSAGE.getCode(),"*"); + consumer.subscribe(MessageCodeEnum.USER_MESSAGE.getCode(),"*"); + consumer.start(); + log.info("消费者初始化成功:{}", consumer.toString()); + } catch (MQClientException e) { + e.printStackTrace(); + log.error("消费者初始化失败:{}",e.getMessage()); + } + return consumer; + } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java new file mode 100644 index 0000000..1d4b56e --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java @@ -0,0 +1,27 @@ +package com.ruoyi.testrocketmq.config; + +/** + * @author yz + */ +public class MessageConfig { + private Class messageClass; + private boolean orderlyMessage; + + public Class getMessageClass() { + return messageClass; + } + + public void setMessageClass(Class messageClass) { + this.messageClass = messageClass; + } + + public boolean isOrderlyMessage() { + return orderlyMessage; + } + + public void setOrderlyMessage(boolean orderlyMessage) { + this.orderlyMessage = orderlyMessage; + } + + +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java new file mode 100644 index 0000000..b55917e --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java @@ -0,0 +1,48 @@ +package com.ruoyi.testrocketmq.config; + +import com.ruoyi.testrocketmq.model.ProducerMode; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +@Slf4j +public class ProducerConfig { + + public static DefaultMQProducer producer; + + @Autowired + private ProducerMode producerMode; + + + + @Bean + public DefaultMQProducer getRocketMQProducer() { + producer = new DefaultMQProducer(producerMode.getGroupName()); + producer.setNamesrvAddr(producerMode.getNamesrvAddr()); + //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName + if(producerMode.getMaxMessageSize()!=null){ + producer.setMaxMessageSize(producerMode.getMaxMessageSize()); + } + if(producerMode.getSendMsgTimeout()!=null){ + producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); + } + //如果发送消息失败,设置重试次数,默认为2次 + if(producerMode.getRetryTimesWhenSendFailed()!=null){ + producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); + } + producer.setVipChannelEnabled(false); + try { + producer.start(); + log.info("生产者初始化成功:{}",producer.toString()); + } catch (MQClientException e) { + log.error("生产者初始化失败:{}",e.getMessage()); + } + return producer; + } + +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java new file mode 100644 index 0000000..88d2b05 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java @@ -0,0 +1,100 @@ +package com.ruoyi.testrocketmq.consumer; + +import com.ruoyi.testrocketmq.enums.MessageCodeEnum; +import com.ruoyi.testrocketmq.producer.ConsumeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +/** + * 消息监听 + */ +@Slf4j +@Component +public class RocketMsgListener implements MessageListenerConcurrently { + + /** + * 消费消息 + * + * @param list msgs.size() >= 1 + * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here + * 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 + * @param consumeConcurrentlyContext 上下文信息 + * @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) + */ + @Override + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + + if (!CollectionUtils.isEmpty(list)) { + for (MessageExt messageExt : list) { + // 消息内容 + String body = new String(messageExt.getBody()); + log.info("接受到的消息为:{}", body); + String tags = messageExt.getTags(); + String topic = messageExt.getTopic(); + String msgId = messageExt.getMsgId(); + String keys = messageExt.getKeys(); + int reConsume = messageExt.getReconsumeTimes(); + // 消息已经重试了3次,如果不需要再次消费,则返回成功 + if (reConsume == 3) { + // TODO 补偿信息 + //smsLogService.insertLog(topic, tags, msgId, keys, body, "【" + EnumUtil.getStrMsgByCode(tags, TagsCodeEnum.class) + "】消费失败"); + log.error("消息重试超过3次,消费失败!"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + // 订单超时处理 + if (MessageCodeEnum.ORDER_MESSAGE.getCode().equals(topic)) { + if (MessageCodeEnum.ORDER_TIMEOUT_TAG.getCode().equals(tags)) { +// //获取订单 +// DealUserOrder dealUserOrder = pcRemoteDealUserOrderService.selectDealUserOrderByOrderNumber(keys); +// if (dealUserOrder != null) { +// //订单状态超时未支付关闭订单 处理 +// if (dealUserOrder.getStatus().equals("1")) { +// DealUserOrder dealUserOrders = new DealUserOrder(); +// dealUserOrders.setOrderId(dealUserOrder.getOrderId()); +// dealUserOrders.setStatus("4"); +// pcRemoteDealUserOrderService.updateDealUserOrder(dealUserOrders); +// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; +// } +// log.info("Order does not exist."); +// } + log.info("Consumption success:" + body); + DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + log.info("Consumption time:{}", format.format(new Date())); + } else { + log.info("未匹配到Tag【{}】" + tags); + } + } + } + } + // 消息消费成功 + //ConsumeConcurrentlyStatus.RECONSUME_LATER broker会根据设置的messageDelayLevel发起重试,默认16次 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + /** + * 异常处理 + * + * @param e 捕获的异常 + * @return 消息消费结果 + */ + private static ConsumeConcurrentlyStatus handleException(final Exception e) { + Class exceptionClass = e.getClass(); + if (exceptionClass.equals(UnsupportedEncodingException.class)) { + log.error(e.getMessage()); + } else if (exceptionClass.equals(ConsumeException.class)) { + log.error(e.getMessage()); + } + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java new file mode 100644 index 0000000..e6ff8c4 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java @@ -0,0 +1,60 @@ +package com.ruoyi.testrocketmq.enums; + + +import lombok.Getter; + +@Getter +public enum MessageCodeEnum { + /** + * 消息模块主题 + */ + MESSAGE_TOPIC("elink-message","消息服务模块topic名称"), + /** + * 系统消息 + */ + NOTE_MESSAGE("system-message","系统消息服务模块topic名称"), + /** + * 用户消息 + */ + USER_MESSAGE("user-message","用户消息服务模块topic名称"), + + /** + * 订单消息 + */ + ORDER_MESSAGE("order-message","订单消息服务模块topic名称"), + + /** + * 平台编号 + */ + USER_MESSAGE_TAG("user_message_tag","用户消息推送"), + NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), + ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), + + /** + * 订单处理编号 + */ + //订单超时处理 + ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); + + + private final String code; + private final String msg; + + MessageCodeEnum(String code, String msg){ + this.code = code; + this.msg = msg; + } + + public static String valuesOfType(String code) { + String value = ""; + for (MessageCodeEnum e : MessageCodeEnum.values()) { + if (code.equals(e.code)) { + value = e.msg; + } + + } + return value; + } + + +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java new file mode 100644 index 0000000..e66738a --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java @@ -0,0 +1,22 @@ +package com.ruoyi.testrocketmq.model; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +@Data +@Configuration +@Component +public class ConsumerMode { + @Value("${suning.rocketmq.namesrvAddr}") + private String namesrvAddr; + @Value("${suning.rocketmq.conumer.groupName}") + private String groupName ; + @Value("${suning.rocketmq.conumer.consumeThreadMin}") + private int consumeThreadMin; + @Value("${suning.rocketmq.conumer.consumeThreadMax}") + private int consumeThreadMax; + @Value("${suning.rocketmq.conumer.consumeMessageBatchMaxSize}") + private int consumeMessageBatchMaxSize; +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java new file mode 100644 index 0000000..0cd4060 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java @@ -0,0 +1,25 @@ +package com.ruoyi.testrocketmq.model; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Configuration; + +/** + * 生产者初始化 + */ +@RefreshScope +@Data +@Configuration +public class ProducerMode { + @Value("${suning.rocketmq.producer.groupName}") + private String groupName; + @Value("${suning.rocketmq.namesrvAddr}") + private String namesrvAddr; + @Value("${suning.rocketmq.producer.maxMessageSize}") + private Integer maxMessageSize; + @Value("${suning.rocketmq.producer.sendMsgTimeout}") + private Integer sendMsgTimeout; + @Value("${suning.rocketmq.producer.retryTimesWhenSendFailed}") + private Integer retryTimesWhenSendFailed; +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java new file mode 100644 index 0000000..86ed05c --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java @@ -0,0 +1,47 @@ +package com.ruoyi.testrocketmq.producer; + +import com.ruoyi.testrocketmq.config.ProducerConfig; +import org.springframework.beans.factory.annotation.Autowired; + +public class AsyncProducer { + + @Autowired + private ProducerConfig producerConfig; + + /** + * 发送异步的消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * @return org.apache.rocketmq.client.producer.SendResult + */ +// public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value) throws UnsupportedEncodingException { +// +// try { +// DefaultMQProducer defaultMQProducer = producerConfig.producer; +// //Create a message instance, specifying topic, tag and message body. +// Message msg = new Message(topic, tag, key,value.getBytes(RemotingHelper.DEFAULT_CHARSET)); +// defaultMQProducer.send(msg, new SendCallback() { +// // 异步回调的处理 +// @Override +// public void onSuccess(SendResult sendResult) { +// System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId()); +// } +// +// @Override +// public void onException(Throwable e) { +// System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e); +// e.printStackTrace(); +// } +// }); +// } catch (MQClientException e) { +// e.printStackTrace(); +// } catch (RemotingException e) { +// e.printStackTrace(); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// return null; +// } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java new file mode 100644 index 0000000..82d4314 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java @@ -0,0 +1,20 @@ +package com.ruoyi.testrocketmq.producer; + +/** + * @author 影子 + */ +public class ConsumeException extends RuntimeException{ + private static final long serialVersionUID = 4093867789628938836L; + + public ConsumeException(String message) { + super(message); + } + + public ConsumeException(Throwable cause) { + super(cause); + } + + public ConsumeException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java new file mode 100644 index 0000000..fa6afbe --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java @@ -0,0 +1,34 @@ +package com.ruoyi.testrocketmq.producer; + + + +import lombok.Data; +import lombok.ToString; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +/** + * 消费时,当前所消费的消息的上下文信息 + * + * @author jolly + */ +@ToString +@Data +public final class MessageContext { + + /** + * 所消费消息所在的消息队列 + * + * @see MessageQueue + */ + private MessageQueue messageQueue; + + /** + * 所消费的消息的扩展属性 + * + * @see MessageExt + */ + private MessageExt messageExt; + + +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java new file mode 100644 index 0000000..7450530 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java @@ -0,0 +1,110 @@ +package com.ruoyi.testrocketmq.producer; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +import static com.ruoyi.rocketmq.config.ProducerConfig.producer; + +/** + * 消息发送 + */ +@Slf4j +public class MessageProducer { + + + /** + * 同步发送消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * @return org.apache.rocketmq.client.producer.SendResult + */ + public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ + String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; + try { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + SendResult result = producer.send(msg); + return result; + } catch (UnsupportedEncodingException e) { + log.error("消息初始化失败!body:{}",body); + + } catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) { + log.error("消息发送失败! body:{}",body); + } + return null; + } + + + + /** + * 发送有序的消息 + * @param messagesList Message集合 + * @param messageQueueNumber 消息队列编号 + * @return org.apache.rocketmq.client.producer.SendResult + */ + public SendResult sendOrderlyMessage(List messagesList, int messageQueueNumber) { + SendResult result = null; + for (Message message : messagesList) { + try { +// DefaultMQProducer defaultMQProducer = ProducerConfig.producer.send(message); +// System.out.println(defaultMQProducer); + result = producer.send(message, (list, msg, arg) -> { + Integer queueNumber = (Integer) arg; + return list.get(queueNumber); + }, messageQueueNumber); + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + log.error("发送有序消息失败"); + return result; + } + } + return result; + } + + /** + * 推送延迟消息 + * @param topic + * @param tag + * @param key + * @return boolean + */ + public SendResult sendDelayMessage(String topic, String tag, String key, String value) + { + SendResult result = null; + try + { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + //设置消息延迟级别,我这里设置5,对应就是延时一分钟 + // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" + msg.setDelayTimeLevel(4); + // 发送消息到一个Broker + result = producer.send(msg); + // 通过sendResult返回消息是否成功送达 + log.info("发送延迟消息结果:======sendResult:{}", result); + DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + log.info("发送时间:{}", format.format(new Date())); + return result; + } + catch (Exception e) + { + e.printStackTrace(); + log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); + } + return result; + } + + +}