diff --git a/dk-api/api-system/src/main/java/org/dromara/system/api/RemotePostService.java b/dk-api/api-system/src/main/java/org/dromara/system/api/RemotePostService.java new file mode 100644 index 0000000..9d93117 --- /dev/null +++ b/dk-api/api-system/src/main/java/org/dromara/system/api/RemotePostService.java @@ -0,0 +1,10 @@ +package org.dromara.system.api; + +import org.dromara.system.api.domain.vo.RemotePostVo; + +import java.util.List; + +public interface RemotePostService { + List listPost(); + +} diff --git a/dk-api/api-system/src/main/java/org/dromara/system/api/domain/vo/RemotePostVo.java b/dk-api/api-system/src/main/java/org/dromara/system/api/domain/vo/RemotePostVo.java new file mode 100644 index 0000000..99f83ac --- /dev/null +++ b/dk-api/api-system/src/main/java/org/dromara/system/api/domain/vo/RemotePostVo.java @@ -0,0 +1,63 @@ +package org.dromara.system.api.domain.vo; + +import com.alibaba.excel.annotation.ExcelProperty; +import lombok.Data; +import org.dromara.common.excel.annotation.ExcelDictFormat; +import org.dromara.common.excel.convert.ExcelDictConvert; + +import java.io.Serial; +import java.util.Date; + +@Data +public class RemotePostVo { + + /** + * 岗位ID + */ + private Long postId; + + /** + * 部门id + */ + private Long deptId; + + /** + * 岗位编码 + */ + private String postCode; + + /** + * 岗位名称 + */ + private String postName; + + /** + * 岗位类别编码 + */ + private String postCategory; + + /** + * 显示顺序 + */ + private Integer postSort; + + /** + * 状态(0正常 1停用) + */ + private String status; + + /** + * 备注 + */ + private String remark; + + /** + * 创建时间 + */ + private Date createTime; + + /** + * 部门名 + */ + private String deptName; +} diff --git a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/TopicConst.java b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/TopicConst.java index d2ff422..c89a4d6 100644 --- a/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/TopicConst.java +++ b/dk-common/common-cloudsdk/src/main/java/org/dromara/common/sdk/mqtt/TopicConst.java @@ -39,4 +39,12 @@ public class TopicConst { public static final String UP = "/up"; public static final String DOWN = "/down"; + + public static final String IMAGE = "image/"; + + // 比对 + public static final String COMPARE = "compare"; + + //画框 + public static final String FRAME = "frame"; } diff --git a/dk-common/common-core/src/main/java/org/dromara/common/core/constant/DeviceQrtzConstants.java b/dk-common/common-core/src/main/java/org/dromara/common/core/constant/DeviceQrtzConstants.java new file mode 100644 index 0000000..567a291 --- /dev/null +++ b/dk-common/common-core/src/main/java/org/dromara/common/core/constant/DeviceQrtzConstants.java @@ -0,0 +1,26 @@ +package org.dromara.common.core.constant; + +/** + * @auther yq + * @data 2025/3/26 + */ +public interface DeviceQrtzConstants { + + /** + * 任务状态 状态(1-使用,2-暂停,0-异常暂停) + * */ + int QRTZ_STATUS_0=0; + int QRTZ_STATUS_1=1; + int QRTZ_STATUS_2=2; + + /** + * + * 任务航线状态 状态(1-暂未飞行,2-正在飞行,3-飞行结束,0-异常暂停) + * */ + + int QRTZ_FILE_STATUS_0=0; + int QRTZ_FILE_STATUS_1=1; + int QRTZ_FILE_STATUS_2=2; + int QRTZ_FILE_STATUS_3=3; + +} diff --git a/dk-example/test-mq/pom.xml b/dk-example/test-mq/pom.xml index 48bfc69..647a5eb 100644 --- a/dk-example/test-mq/pom.xml +++ b/dk-example/test-mq/pom.xml @@ -33,6 +33,19 @@ org.springframework.kafka spring-kafka + + org.apache.rocketmq + rocketmq-client + 4.9.0 + + + junit + junit + 4.13.2 + + + + org.dromara @@ -64,6 +77,10 @@ + + com.mysql + mysql-connector-j + diff --git a/dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java b/dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java new file mode 100644 index 0000000..e471892 --- /dev/null +++ b/dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java @@ -0,0 +1,52 @@ +package org.dromara.stream.listener; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; + +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author xbhog + * @date 2024/06/01 17:05 + **/ +@Slf4j +@Component +@RocketMQTransactionListener +public class RocketMQListener { + + public static void main(String[] args) throws Exception { + // 实例化消费者 + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); + + // 设置NameServer的地址 + consumer.setNamesrvAddr("192.168.110.96:9876"); + + // 订阅一个或多个Topic,以及Tag来过滤特定消息 + consumer.subscribe("SELF_TEST_TOPIC", "Tag"); + + // 注册回调实现类来处理从broker拉取回来的消息 + consumer.registerMessageListener(new MessageListenerOrderly() { + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + context.setAutoCommit(true); // 根据需要设置自动提交偏移量 + for (MessageExt msg : msgs) { + // 处理消息内容,例如打印出来或者进行其他业务逻辑处理 + System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody())); + } + return ConsumeOrderlyStatus.SUCCESS; // 返回消费状态 + } + }); + + // 启动消费者实例 + consumer.start(); +// System.out.printf("Consumer Started.%n"); + } + +} diff --git a/dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java b/dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java new file mode 100644 index 0000000..7e794f0 --- /dev/null +++ b/dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java @@ -0,0 +1,49 @@ +package org.dromara.stream.producer; + + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; + +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * @author xbhog + * @date 2024/05/25 17:15 + **/ +@Slf4j +@Component +public class RocketmqProducer { + + public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException { + // 初始化消息生产者 + DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); + // 设置超时时间 + producer.setSendMsgTimeout(10000); + // 指定nameserver地址 + producer.setNamesrvAddr("192.168.110.96:9876"); + + producer.start(); + for (int i = 0; i < 100; i++) { + // 创建消息,并指定Topic,Tag和消息体 + Message msg = new Message("SELF_TEST_TOPIC" /* Topic */, "Tag" /* Tag */, ("Hello RocketMQ " + i).getBytes()); + // 发送消息到一个Broker + producer.send(msg); + } + + // 如果不再发送消息,关闭Producer实例。 + producer.shutdown(); + } + +} + + + diff --git a/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertController.java b/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertController.java index b9860f2..beb4461 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertController.java +++ b/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertController.java @@ -1,5 +1,6 @@ package org.dromara.business.controller; +import cn.dev33.satoken.annotation.SaCheckPermission; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.RequiredArgsConstructor; diff --git a/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertStatisticsController.java b/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertStatisticsController.java index 458d8e8..c5b490e 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertStatisticsController.java +++ b/dk-modules/business/src/main/java/org/dromara/business/controller/BusinessAlertStatisticsController.java @@ -26,11 +26,6 @@ public class BusinessAlertStatisticsController extends BaseController { private final IBusinessAlertStatisticsService statisticsService; - @Operation(summary="获取预警处置率", description="获取预警处置率") - @GetMapping(value = "/rate") - public R alertRate(BusinessAlertBo businessAlertBo) { - return R.ok(statisticsService.alertRate(businessAlertBo)); - } @Operation(summary="panel看板数据", description="panel看板数据") @GetMapping(value = "/panel/count") @@ -38,6 +33,57 @@ public class BusinessAlertStatisticsController extends BaseController { return R.ok(statisticsService.countPanelAlert(businessAlertBo)); } + /** + * 总体情况看板 + * @param businessAlertBo + * @return + */ + @Operation(summary="总体情况看板", description="总体情况看板") + @GetMapping(value = "/total/panel/count") + public R> countPanelTotalAlert(BusinessAlertBo businessAlertBo) { + return R.ok(statisticsService.countPanelTotalAlert(businessAlertBo)); + } + + + /** + * 今日预警情况 + * @param businessAlertBo + * @return + */ + @Operation(summary="今日预警情况", description="今日预警情况") + @GetMapping(value = "/current/day/count") + public R> countCurrentDayAlert(BusinessAlertBo businessAlertBo) { + return R.ok(statisticsService.countCurrentDayAlert(businessAlertBo)); + } + + /** + * 各局今日预警数 + * @param businessAlertBo + * @return + */ + @Operation(summary="各局今日预警数", description="各局今日预警数") + @GetMapping(value = "/post/day/count") + public R> countPostDayAlert(BusinessAlertBo businessAlertBo) { + return R.ok(statisticsService.countPostDayAlert(businessAlertBo)); + } + + + /** + * 各局处理情况 + * @param businessAlertBo + * @return + */ + @Operation(summary="各局处理情况", description="各局处理情况") + @GetMapping(value = "/post/count") + public R> countPostAlert(BusinessAlertBo businessAlertBo) { + return R.ok(statisticsService.countPostAlert(businessAlertBo)); + } + + + + + + //饼图显示每个月根据部门 @Operation(summary="根据月份显示预警个数", description="根据月份显示预警个数") @GetMapping(value = "/month/count") diff --git a/dk-modules/business/src/main/java/org/dromara/business/domain/bo/BusinessAlertBo.java b/dk-modules/business/src/main/java/org/dromara/business/domain/bo/BusinessAlertBo.java index 6f48e94..143dcba 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/domain/bo/BusinessAlertBo.java +++ b/dk-modules/business/src/main/java/org/dromara/business/domain/bo/BusinessAlertBo.java @@ -249,4 +249,9 @@ public class BusinessAlertBo { */ private List aiLabelEnList; + /** + * 时间搜索类型 1年,2月,3日 + */ + private Integer dateType = 1; + } diff --git a/dk-modules/business/src/main/java/org/dromara/business/mapper/BusinessAlertMapper.java b/dk-modules/business/src/main/java/org/dromara/business/mapper/BusinessAlertMapper.java index 2b12d26..58392b0 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/mapper/BusinessAlertMapper.java +++ b/dk-modules/business/src/main/java/org/dromara/business/mapper/BusinessAlertMapper.java @@ -49,34 +49,31 @@ public interface BusinessAlertMapper extends BaseMapperPlus pageBusinessAlertCancel(Page build, QueryWrapper ew); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) + List> listMonthAlert(@Param("param") BusinessAlertBo businessAlertBo); List> listDepartAlert(@Param("param") BusinessAlertBo businessAlertBo); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) List> listMonthAlertStatus(@Param("param")BusinessAlertBo businessAlertBo); List> listDepartAlertStatus(@Param("param") BusinessAlertBo businessAlertBo); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) + List> countAlertCompare(@Param("param")BusinessAlertBo businessAlertBo); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) + List> countPanelAlert(@Param("param")BusinessAlertBo businessAlertBo); - @DataPermission( - @DataColumn(key = "deptName", value = "ba.dept_id") - ) + List> countAlertTypeCompare(@Param("param") BusinessAlertBo businessAlertBo); + + Integer countFromOldToCurrent(BusinessAlertBo businessAlertBo); + + Map countCurrentAlert(BusinessAlertBo businessAlertBo); + + Map countMonthAlert(BusinessAlertBo businessAlertBo); + + Map countCurrentDayAlert(BusinessAlertBo businessAlertBo); } diff --git a/dk-modules/business/src/main/java/org/dromara/business/service/IBusinessAlertStatisticsService.java b/dk-modules/business/src/main/java/org/dromara/business/service/IBusinessAlertStatisticsService.java index 5652a6b..95b6d9e 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/service/IBusinessAlertStatisticsService.java +++ b/dk-modules/business/src/main/java/org/dromara/business/service/IBusinessAlertStatisticsService.java @@ -22,6 +22,11 @@ public interface IBusinessAlertStatisticsService { List> countAlertTypeCompare(BusinessAlertBo businessAlertBo); - Double alertRate(BusinessAlertBo businessAlertBo); + Map countPanelTotalAlert(BusinessAlertBo businessAlertBo); + Map countCurrentDayAlert(BusinessAlertBo businessAlertBo); + + Map countPostDayAlert(BusinessAlertBo businessAlertBo); + + List countPostAlert(BusinessAlertBo businessAlertBo); } diff --git a/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertServiceImpl.java b/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertServiceImpl.java index 7bb810f..c82409b 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertServiceImpl.java +++ b/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertServiceImpl.java @@ -128,6 +128,8 @@ public class BusinessAlertServiceImpl implements IBusinessAlertService { wrapper.set(BusinessAlert::getAssignDate, new Date()); } + wrapper.set(BusinessAlert::getCompleteDate,new Date()); + wrapper.eq(BusinessAlert::getId, businessId); this.baseMapper.update(wrapper); diff --git a/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertStatisticsServiceImpl.java b/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertStatisticsServiceImpl.java index 0df2d28..bcb4690 100644 --- a/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertStatisticsServiceImpl.java +++ b/dk-modules/business/src/main/java/org/dromara/business/service/impl/BusinessAlertStatisticsServiceImpl.java @@ -1,5 +1,6 @@ package org.dromara.business.service.impl; +import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.ObjectUtil; import lombok.RequiredArgsConstructor; import org.apache.dubbo.config.annotation.DubboReference; @@ -10,8 +11,10 @@ import org.dromara.business.service.IBusinessAlertStatisticsService; import org.dromara.common.satoken.utils.LoginHelper; import org.dromara.system.api.RemoteDeptService; import org.dromara.system.api.RemoteLabelPostService; +import org.dromara.system.api.RemotePostService; import org.dromara.system.api.domain.vo.RemoteAiLabelPostVo; import org.dromara.system.api.domain.vo.RemoteDeptVo; +import org.dromara.system.api.domain.vo.RemotePostVo; import org.springframework.stereotype.Service; import java.math.BigDecimal; @@ -37,6 +40,8 @@ public class BusinessAlertStatisticsServiceImpl implements IBusinessAlertStatist @DubboReference RemoteLabelPostService remoteLabelPostService; + @DubboReference + RemotePostService remotePostService; /** * 按照月份分类预警数量(包含权限) @@ -142,23 +147,126 @@ public class BusinessAlertStatisticsServiceImpl implements IBusinessAlertStatist return baseMapper.countAlertTypeCompare(businessAlertBo); } + /** + * 总体情况 + * @param businessAlertBo + * @return + */ @Override - public Double alertRate(BusinessAlertBo businessAlertBo) { - List> mapList = baseMapper.countPanelAlert(businessAlertBo); + public Map countPanelTotalAlert(BusinessAlertBo businessAlertBo) { + Map result = new HashMap<>(); + List labelList = getAiLabel(businessAlertBo.getPostCode()); + businessAlertBo.setAiLabelEnList(labelList); + //2024一直到当前年份处理预警个数 + Integer totalFinishCount = baseMapper.countFromOldToCurrent(businessAlertBo); + result.put("totalFinishCount", totalFinishCount); + //今年处理率 今年的预警总数 今年处理个数 + Map currentYearMap = baseMapper.countCurrentAlert(businessAlertBo); + // 四舍五入保留两位小数 + double yearTotal = Double.parseDouble(String.valueOf(currentYearMap.get("total"))); + double yearFinishCount = Double.parseDouble(String.valueOf(currentYearMap.get("finishCount"))); + BigDecimal currentYearAverage = BigDecimal.valueOf((yearTotal == 0) ? 0.0 : yearFinishCount / yearTotal) + .setScale(2, RoundingMode.HALF_UP); + result.put("yearTotal", currentYearMap.get("total")); + result.put("yearFinishCount", currentYearMap.get("finishCount")); + result.put("yearRate", currentYearAverage); - double averageDisposalRate = mapList.stream() - .mapToDouble(map -> { - double total = Double.parseDouble(String.valueOf(map.get("total"))); - double finishCount = Double.parseDouble(String.valueOf(map.get("finishCount"))); - return (total == 0) ? 0.0 : finishCount / total; - }) - .average() // 计算平均值 - .orElse(0.0); + //本月处理率 本月预警总数 本月处理个数 + Map monthMap = baseMapper.countMonthAlert(businessAlertBo); // 四舍五入保留两位小数 - BigDecimal roundedAverage = BigDecimal.valueOf(averageDisposalRate) + double monthTotal = Double.parseDouble(String.valueOf(monthMap.get("total"))); + double monthFinishCount = Double.parseDouble(String.valueOf(monthMap.get("finishCount"))); + BigDecimal currentMonthAverage = BigDecimal.valueOf((monthTotal == 0) ? 0.0 : monthFinishCount / monthTotal) .setScale(2, RoundingMode.HALF_UP); - return roundedAverage.doubleValue(); + + result.put("monthTotal", monthMap.get("total")); + result.put("monthFinishCount", monthMap.get("finishCount")); + result.put("monthRate", currentMonthAverage); + + return result; + } + + /** + * 今日情况 + * @param businessAlertBo + * @return + */ + @Override + public Map countCurrentDayAlert(BusinessAlertBo businessAlertBo) { + Map result = new HashMap<>(); + List labelList = getAiLabel(businessAlertBo.getPostCode()); + businessAlertBo.setAiLabelEnList(labelList); + + Map dayMap = this.baseMapper.countCurrentDayAlert(businessAlertBo); + + result.put("dayTotal", dayMap.get("total")); + result.put("dayFinishCount", dayMap.get("finishCount")); + result.put("dayTodo", dayMap.get("todoCount")); + return result; + } + + /** + * 各局今日预警数量 + * @param businessAlertBo + * @return + */ + @Override + public Map countPostDayAlert(BusinessAlertBo businessAlertBo) { + Map result = new HashMap<>(); + //查询所有的职能岗位 + List postVoList = remotePostService.listPost(); + + if (ObjectUtil.isEmpty(postVoList)) { + return Map.of(); + } + + postVoList.forEach(postVo -> { + List labelList = getAiLabel(businessAlertBo.getPostCode()); + businessAlertBo.setAiLabelEnList(labelList); + Map dayMap = this.baseMapper.countCurrentDayAlert(businessAlertBo); + result.put(postVo.getPostName(), dayMap.get("total")); + }); + + return result; + } + + /** + * 各局处理情况 + * @param businessAlertBo + * @return + */ + @Override + public List countPostAlert(BusinessAlertBo businessAlertBo) { + //查询所有的职能岗位 + List postVoList = remotePostService.listPost(); + + List result = new ArrayList<>(); + + if (ObjectUtil.isEmpty(postVoList)) { + return ListUtil.empty(); + } + + postVoList.forEach(postVo -> { + List labelList = getAiLabel(businessAlertBo.getPostCode()); + businessAlertBo.setAiLabelEnList(labelList); + Map dateMap = switch (businessAlertBo.getDateType()) { + case 1 -> baseMapper.countCurrentAlert(businessAlertBo); + case 2 -> baseMapper.countMonthAlert(businessAlertBo); + case 3 -> baseMapper.countCurrentDayAlert(businessAlertBo); + default -> baseMapper.countCurrentAlert(businessAlertBo); + }; + + result.add(new StatObj( + postVo.getPostName(), + List.of( + new StatObj("todoCount", dateMap.get("todoCount")), + new StatObj("finishCount", dateMap.get("finishCount")) + ) + )); + }); + + return result; } private List buildDateList(BusinessAlertBo businessAlertBo) { diff --git a/dk-modules/business/src/main/resources/mapper/business/BusinessAlertMapper.xml b/dk-modules/business/src/main/resources/mapper/business/BusinessAlertMapper.xml index 7d1a8cb..353353d 100644 --- a/dk-modules/business/src/main/resources/mapper/business/BusinessAlertMapper.xml +++ b/dk-modules/business/src/main/resources/mapper/business/BusinessAlertMapper.xml @@ -387,4 +387,45 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" ORDER BY md.dept_id, md.dateMonth + + + + + + + + diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/DeviceQrtzController.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/DeviceQrtzController.java index 4c923c1..79cabc0 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/DeviceQrtzController.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/controller/DeviceQrtzController.java @@ -15,6 +15,7 @@ import org.dromara.common.log.enums.BusinessType; import org.dromara.common.mybatis.core.page.TableDataInfo; import org.dromara.common.web.core.BaseController; import org.dromara.sample.manage.model.dto.DeviceQrtzDTO; +import org.dromara.sample.manage.model.dto.DeviceQrtzFileDTO; import org.dromara.sample.manage.model.entity.DeviceQrtzEntity; import org.dromara.sample.manage.model.entity.DeviceQrtzFileEntity; import org.dromara.sample.manage.service.IDeviceQrtzService; @@ -43,7 +44,7 @@ public class DeviceQrtzController extends BaseController { /** - *自动飞行设备列表 + *自动飞行任务列表 * */ @SaCheckPermission("devict:qrtz:list") @GetMapping(value = "/page") @@ -65,7 +66,7 @@ public class DeviceQrtzController extends BaseController { } /** - * 设备定时飞行详情 + * 任务定时飞行详情 * * @param id 主键 */ @@ -78,7 +79,7 @@ public class DeviceQrtzController extends BaseController { } /** - * 自动飞行编辑 + * 自动飞行任务编辑 */ @SaCheckPermission("devict:qrtz:edit") @Log(title = "自动飞行编辑", businessType = BusinessType.UPDATE) @@ -87,11 +88,27 @@ public class DeviceQrtzController extends BaseController { public R edit(@Validated(EditGroup.class) @RequestBody DeviceQrtzEntity entity) { return toAjax(deviceQrtzService.updateByBo(entity)); } + /** - * 自动飞行新增 + * 自动飞行任务启停 + * @param status 1-使用,2-暂停 + * @param id 主键 + */ + @SaCheckPermission("devict:qrtz:edit") + @Log(title = "自动飞行任务启停", businessType = BusinessType.UPDATE) + @RepeatSubmit() + @PostMapping("status") + public R status(@Validated@NotNull(message = "任务id不能为空") Long id ,@NotNull(message = "状态不能为空") Integer status) { + return toAjax(deviceQrtzService.update(id,status)); + } + + + + /** + * 自动飞行任务新增 */ @SaCheckPermission("devict:qrtz:add") - @Log(title = "自动飞行新增", businessType = BusinessType.INSERT) + @Log(title = "自动飞行任务新增", businessType = BusinessType.INSERT) @RepeatSubmit() @PostMapping() public R add(@Validated(AddGroup.class) @RequestBody DeviceQrtzDTO deviceQrtzDTO) { @@ -99,15 +116,61 @@ public class DeviceQrtzController extends BaseController { } /** - * 自动飞行删除 + * 自动飞行任务删除 * * @param ids 主键串 */ - @SaCheckPermission("system:client:remove") - @Log(title = "自动飞行删除", businessType = BusinessType.DELETE) + @SaCheckPermission("devict:qrtz:remove") + @Log(title = "自动飞行任务删除", businessType = BusinessType.DELETE) @DeleteMapping("/{ids}") public R remove(@NotEmpty(message = "主键不能为空") @PathVariable Long[] ids) { return toAjax(deviceQrtzService.deleteIds(List.of(ids))); } + + /** 自动飞行-新增航线*/ + @SaCheckPermission("devict:qrtz:add") + @Log(title = "自动飞行新增航线", businessType = BusinessType.INSERT) + @RepeatSubmit() + @PostMapping("/addFileList") + public R addFileList(@Validated(AddGroup.class) @RequestBody DeviceQrtzFileDTO deviceQrtzFileDTO) { + return toAjax(deviceQrtzService.addFileList(deviceQrtzFileDTO)); + } + + + /** 自动飞行-复制航线*/ + @SaCheckPermission("devict:qrtz:copy") + @Log(title = "自动飞行新增航线", businessType = BusinessType.INSERT) + @RepeatSubmit() + @PostMapping("/copy") + public R copy(@Validated@NotNull(message = "任务id不能为空") Long qrtzId ,@NotNull(message = "请选择原来的时间") String starDate,@NotNull(message ="请选择新的时间") String endDate) { + return toAjax(deviceQrtzService.copy(qrtzId,starDate,endDate)); + } + + /** + * 自动飞行航线删除 + * + * @param id + */ + @SaCheckPermission("devict:qrtz:remove") + @Log(title = "自动飞行航线删除", businessType = BusinessType.DELETE) + @DeleteMapping("/fileRemove/{id}") + public R fileRemove(@NotEmpty(message = "主键不能为空") + @PathVariable Long id) { + return toAjax(deviceQrtzService.fileRemove(id)); + } + + + /** + * 自动飞行编辑航线排序 + * @param id 主键id + * @param sort 排序 + * */ + @SaCheckPermission("devict:qrtz:edit") + @Log(title = "自动飞行编辑航线排序", businessType = BusinessType.UPDATE) + @RepeatSubmit() + @PostMapping("/fileSort") + public R fileSort(@Validated@NotNull(message = "主键id不能为空") Long id ,@NotNull(message = "排序") Integer sort) { + return toAjax(deviceQrtzService.fileSort(id,sort)); + } } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/dto/DeviceQrtzFileDTO.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/dto/DeviceQrtzFileDTO.java index 0033b2c..22ddf03 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/dto/DeviceQrtzFileDTO.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/dto/DeviceQrtzFileDTO.java @@ -3,6 +3,7 @@ package org.dromara.sample.manage.model.dto; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; +import jakarta.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -24,27 +25,27 @@ public class DeviceQrtzFileDTO { */ private String id; - private String qrtzId; + @NotNull(message = "任务id不能为空" ) + private Long qrtzId; - private String fileId; + private String waylineId; private String waylineName; - private String sort; + private Integer sort; private Integer status; - @TableField(exist = false) - private Integer fileNumber; - - @TableField(exist = false) - private String fileStr; + private Integer fileNumber; + /**多选 逗号隔开*/ + @NotNull(message = "waylineId 多选逗号隔开" ) + private String waylineIdStr; /** * 飞行时间 */ @@ -54,7 +55,7 @@ public class DeviceQrtzFileDTO { /** * 执行时间 */ - + @NotNull(message = "计划日期不能为空" ) private Date execDate; /** diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzEntity.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzEntity.java index 18fe58a..61bb001 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzEntity.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzEntity.java @@ -4,13 +4,15 @@ import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonNaming; import lombok.*; import org.dromara.common.mybatis.core.domain.BaseEntity; -import java.io.Serializable; + import java.util.ArrayList; -import java.util.Date; + import java.util.List; @@ -26,6 +28,7 @@ import java.util.List; @Builder @NoArgsConstructor @AllArgsConstructor +@JsonNaming() // 设置为驼峰命名风格 public class DeviceQrtzEntity extends BaseEntity { @TableId(type = IdType.AUTO) diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzFileEntity.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzFileEntity.java index 0badac3..6f43149 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzFileEntity.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzFileEntity.java @@ -33,13 +33,13 @@ public class DeviceQrtzFileEntity implements Serializable { private Long qrtzId; - private String fileId; + private String waylineId; private String waylineName; - private String sort; + private Integer sort; private Integer status; diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/IDeviceQrtzService.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/IDeviceQrtzService.java index 2631adb..bcd2f01 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/IDeviceQrtzService.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/IDeviceQrtzService.java @@ -43,6 +43,12 @@ public interface IDeviceQrtzService { void jobWayline(DeviceDTO deviceDTO); Boolean updateByBo(DeviceQrtzEntity entity); + Boolean update(Long id ,Integer status); Boolean add( DeviceQrtzDTO deviceQrtzDTO); Boolean deleteIds(Collection ids); + Boolean fileRemove(Long id); + Boolean addFileList(DeviceQrtzFileDTO deviceQrtzFileDTO); + + Boolean copy(Long qrtzId ,String starDate,String endDate); + Boolean fileSort(Long id ,Integer sort); } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceQrtzServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceQrtzServiceImpl.java index a62fa5f..ba62b61 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceQrtzServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceQrtzServiceImpl.java @@ -1,9 +1,12 @@ package org.dromara.sample.manage.service.impl; +import cn.hutool.core.convert.Convert; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.dromara.common.core.constant.DeviceQrtzConstants; import org.dromara.common.core.utils.MapstructUtils; import org.dromara.common.mybatis.core.page.TableDataInfo; import org.dromara.common.satoken.utils.LoginHelper; @@ -21,6 +24,8 @@ import org.dromara.sample.manage.service.IDeviceQrtzDateService; import org.dromara.sample.manage.service.IDeviceQrtzFileService; import org.dromara.sample.manage.service.IDeviceQrtzService; import org.dromara.sample.manage.service.IDeviceService; +import org.dromara.sample.wayline.mapper.IWaylineFileMapper; +import org.dromara.sample.wayline.model.entity.WaylineFileEntity; import org.dromara.system.api.model.LoginUser; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -46,6 +51,8 @@ public class DeviceQrtzServiceImpl implements IDeviceQrtzService { private IDeviceQrtzFileMapper deviceQrtzFileMapper; @Autowired private IDeviceQrtzDateService deviceQrtzDateService; + @Autowired + private IWaylineFileMapper waylineFileMapper; @Override public TableDataInfo listManageDeviceQrtz(Page page, DeviceQrtzDTO deviceQrtzDTO) { @@ -124,6 +131,14 @@ public class DeviceQrtzServiceImpl implements IDeviceQrtzService { return deviceQrtzMapper.updateById(entity)>0; } + @Override + public Boolean update(Long id, Integer status) { + DeviceQrtzEntity entity = new DeviceQrtzEntity(); + entity.setId(id); + entity.setStatus(status); + return deviceQrtzMapper.updateById(entity)>0; + } + @Override public Boolean add( DeviceQrtzDTO deviceQrtzDTO) { @@ -152,4 +167,73 @@ public class DeviceQrtzServiceImpl implements IDeviceQrtzService { } return deviceQrtzMapper.deleteBatchIds(ids)>0; } + + @Override + public Boolean fileRemove(Long id) { + return deviceQrtzFileMapper.deleteById(id)>0; + } + + @Override + public Boolean addFileList(DeviceQrtzFileDTO deviceQrtzFileDTO) { + int sort=1; + List selectList = deviceQrtzFileMapper.selectList(new QueryWrapper() + .eq("qrtz_id", deviceQrtzFileDTO.getQrtzId()).eq("exec_date",deviceQrtzFileDTO.getExecDate()) + .orderByDesc("sort")); + if(!CollectionUtils.isEmpty(selectList)){ + Integer number =selectList.get(0).getSort(); + if(ObjectUtil.isNotEmpty(number)){ + sort=number+1; + } + } + for (String waylineId : deviceQrtzFileDTO.getWaylineIdStr().split(",")) { + DeviceQrtzFileEntity entity = new DeviceQrtzFileEntity(); + QueryWrapper queryWrapper = new QueryWrapper().eq("wayline_id", waylineId); + WaylineFileEntity fileEntity = waylineFileMapper.selectOne(queryWrapper); + entity.setQrtzId(deviceQrtzFileDTO.getQrtzId()); + entity.setWaylineId(waylineId); + entity.setWaylineName(fileEntity.getName()); + try { + entity.setSort(sort); + entity.setStatus(DeviceQrtzConstants.QRTZ_FILE_STATUS_1); + entity.setExecDate(deviceQrtzFileDTO.getExecDate()); + + sort=sort+1; + deviceQrtzFileMapper.insert(entity); + }catch (Exception e){ + throw new RuntimeException("航线{"+fileEntity.getName()+"}已报错,需要重新添加,报错信息:"+e.getMessage()); + } + + } + return true; + } + + @Override + public Boolean copy(Long qrtzId, String starDate, String endDate) { + List selectList = deviceQrtzFileMapper.selectList(new QueryWrapper() + .eq("qrtz_id", qrtzId).eq("exec_date",starDate)); + if(!selectList.isEmpty()){ + List entityList = selectList.stream().map(e -> { + DeviceQrtzFileEntity entity = new DeviceQrtzFileEntity(); + entity.setQrtzId(e.getQrtzId()); + entity.setWaylineId(e.getWaylineId()); + entity.setWaylineName(e.getWaylineName()); + entity.setSort(e.getSort()); + entity.setStatus(DeviceQrtzConstants.QRTZ_FILE_STATUS_1); + entity.setExecDate(Convert.toDate(endDate)); + return entity; + }).toList(); + return deviceQrtzFileMapper.insert(entityList).size()>0; + }else { + throw new RuntimeException("未获取到需要复制的航线"); + } + + } + + @Override + public Boolean fileSort(Long id, Integer sort) { + DeviceQrtzFileEntity entity = new DeviceQrtzFileEntity(); + entity.setSort(sort); + entity.setId(id); + return deviceQrtzFileMapper.updateById(entity)>0; + } } diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/AiCompareServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/AiCompareServiceImpl.java index 71e35ee..729d53a 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/AiCompareServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/AiCompareServiceImpl.java @@ -10,13 +10,13 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.executor.BatchResult; import org.dromara.common.core.constant.AiCompareStatusConstants; -import org.dromara.common.core.utils.MapstructUtils; -import org.dromara.common.core.utils.StreamUtils; + import org.dromara.common.core.utils.StringUtils; import org.dromara.common.mybatis.core.page.PageQuery; import org.dromara.common.mybatis.core.page.TableDataInfo; import org.dromara.common.satoken.utils.LoginHelper; -import org.dromara.common.translation.annotation.Translation; +import org.dromara.common.sdk.mqtt.MqttGatewayPublish; +import org.dromara.common.sdk.mqtt.TopicConst; import org.dromara.sample.wayline.mapper.IAiCompareMapper; import org.dromara.sample.wayline.mapper.IWaylineJobMapper; import org.dromara.sample.wayline.model.dto.AiCompareDTO; @@ -48,7 +48,7 @@ public class AiCompareServiceImpl implements IAiCompareService { private final IAiCompareQueueService aiCompareQueueService; private final IAiCompareMapper aiCompareMapper; private final IWaylineJobMapper waylineJobMapper; - + private final MqttGatewayPublish mqttGatewayPublish; @Override public TableDataInfo queryPageList(AiCompareDTO bo, PageQuery pageQuery) { return TableDataInfo.build(aiCompareMapper.selectPage(pageQuery.build(),this.buildAiCompareDTOQueryWrapper(bo))); @@ -112,13 +112,21 @@ public class AiCompareServiceImpl implements IAiCompareService { aiCompareEntity.setTemplateId(jobEntityList.get(0).getJobId()); } } + String topic = TopicConst.IMAGE +TopicConst.COMPARE; + + // mqttGatewayPublish.publish(topic,byId,1); insertList.add(aiCompareEntity); } List resultList = aiCompareMapper.insertOrUpdate(insertList, 10); System.out.println(resultList); List list =compareDTOList.stream().map(e->Convert.toLong(e.get("queueId"))).collect(Collectors.toList()); + return aiCompareQueueService.updateStatus(list,AiCompareStatusConstants.QUEUW_TYPE_1); } + public List> listMediaFile(AiCompareEntity aiCompareEntity){ + return null; + } + @Override public Boolean update(Map compareDTO) { diff --git a/dk-modules/sample/src/main/resources/mapper/AiCompareMapper.xml b/dk-modules/sample/src/main/resources/mapper/AiCompareMapper.xml index b30ec1f..99e5abf 100644 --- a/dk-modules/sample/src/main/resources/mapper/AiCompareMapper.xml +++ b/dk-modules/sample/src/main/resources/mapper/AiCompareMapper.xml @@ -12,13 +12,14 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" SELECT mf.*, mfi.file_name as mateFileName, mfi.file_path as mateFilePath, - mfi.create_time as mateFileDate + mfi.create_time as mateFileDate, + CAST( SUBSTRING_INDEX( SUBSTRING_INDEX( mf.file_name, '_', 3 ), '_', - 1 ) AS UNSIGNED ) AS suffix FROM media_file mf LEFT JOIN ai_compare ai ON ai.job_id = mf.job_id LEFT JOIN media_file mfi ON mfi.job_id = ai.template_id WHERE mfi.job_id=#{jobId} - ORDER BY mf.create_time asc + ORDER BY suffix asc diff --git a/dk-modules/system/src/main/java/org/dromara/system/controller/system/SysPostController.java b/dk-modules/system/src/main/java/org/dromara/system/controller/system/SysPostController.java index c8e78c6..caac238 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/controller/system/SysPostController.java +++ b/dk-modules/system/src/main/java/org/dromara/system/controller/system/SysPostController.java @@ -10,7 +10,7 @@ import org.dromara.common.log.enums.BusinessType; import org.dromara.common.mybatis.core.page.PageQuery; import org.dromara.common.mybatis.core.page.TableDataInfo; import org.dromara.common.web.core.BaseController; -import org.dromara.system.domain.bo.AiLablePostBindBo; +import org.dromara.system.domain.bo.AiLabelPostBindBo; import org.dromara.system.domain.bo.AiLabelPostBo; import org.dromara.system.domain.bo.SysPostBo; import org.dromara.system.domain.vo.AiLabelPostVo; @@ -76,17 +76,18 @@ public class SysPostController extends BaseController { @SaCheckPermission("system:post:add") @Log(title = "岗位管理-绑定标签", businessType = BusinessType.INSERT) @PostMapping("/bindLable") - public R add(@Validated @RequestBody AiLablePostBindBo aiLablePostBindBo) { + public R add(@Validated @RequestBody AiLabelPostBindBo aiLablePostBindBo) { return aiLablePostService.insertByBatchBo(aiLablePostBindBo)?R.ok():R.fail(); } /** - * 标签列表 + * 标签列表 -不分页 + * @param postId 岗位id */ @SaCheckPermission("system:post:list") @GetMapping("/lableList") - public TableDataInfo lableList(AiLabelPostBo post, PageQuery pageQuery) { - return postService.lableList(post, pageQuery); + public R> lableList(Long postId) { + return R.ok(aiLablePostService.queryListByLabel(postId)); } diff --git a/dk-modules/system/src/main/java/org/dromara/system/domain/AiLabel.java b/dk-modules/system/src/main/java/org/dromara/system/domain/AiLabel.java index fd21529..9294670 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/domain/AiLabel.java +++ b/dk-modules/system/src/main/java/org/dromara/system/domain/AiLabel.java @@ -24,7 +24,7 @@ public class AiLabel { /** * id */ - @TableId(value = "label_id") + @TableId(type = IdType.AUTO) private Long labelId; /** diff --git a/dk-modules/system/src/main/java/org/dromara/system/domain/AiLabelPost.java b/dk-modules/system/src/main/java/org/dromara/system/domain/AiLabelPost.java index 5e1e621..0439f39 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/domain/AiLabelPost.java +++ b/dk-modules/system/src/main/java/org/dromara/system/domain/AiLabelPost.java @@ -20,7 +20,7 @@ public class AiLabelPost { @Serial private static final long serialVersionUID = 1L; - @TableId(type = IdType.ASSIGN_ID) + @TableId(type = IdType.AUTO) private Long id; /** * 标签id diff --git a/dk-modules/system/src/main/java/org/dromara/system/domain/bo/AiLablePostBindBo.java b/dk-modules/system/src/main/java/org/dromara/system/domain/bo/AiLabelPostBindBo.java similarity index 91% rename from dk-modules/system/src/main/java/org/dromara/system/domain/bo/AiLablePostBindBo.java rename to dk-modules/system/src/main/java/org/dromara/system/domain/bo/AiLabelPostBindBo.java index c3ba11e..0f6a117 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/domain/bo/AiLablePostBindBo.java +++ b/dk-modules/system/src/main/java/org/dromara/system/domain/bo/AiLabelPostBindBo.java @@ -11,7 +11,7 @@ import java.util.List; */ @Data -public class AiLablePostBindBo { +public class AiLabelPostBindBo { @NotNull(message = "postId不能为空") private Long postId; diff --git a/dk-modules/system/src/main/java/org/dromara/system/domain/vo/SysPostVo.java b/dk-modules/system/src/main/java/org/dromara/system/domain/vo/SysPostVo.java index 51e6d31..09419d2 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/domain/vo/SysPostVo.java +++ b/dk-modules/system/src/main/java/org/dromara/system/domain/vo/SysPostVo.java @@ -93,7 +93,7 @@ public class SysPostVo implements Serializable { /** * 标签集合 */ - public List labelList; + public List labelList; @Data public static class AiLabelVo{ diff --git a/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteLabelPostServicelmpl.java b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteLabelPostServicelmpl.java index 61a64f4..592601a 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteLabelPostServicelmpl.java +++ b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemoteLabelPostServicelmpl.java @@ -37,11 +37,8 @@ public class RemoteLabelPostServicelmpl implements RemoteLabelPostService { //获取岗位只能Id SysPostVo sysPostVo = sysPostService.selectLableByList(postCode, deptId); if(sysPostVo != null){ - //根据岗位职能Id获取标签id - List aiLabelPostVos = aiLabelPostService.queryListByLabel(sysPostVo.getPostId()); - //根据标签id获取标签 - List aiLabelList = aiLabelService.queryListByLabel(StreamUtils.toList(aiLabelPostVos,AiLabelPost::getLabelId)); - return aiLabelList.stream() + List postVoList = aiLabelPostService.queryListByLabel(sysPostVo.getPostId()); + return postVoList.stream() .map(aiLabel -> { RemoteAiLabelPostVo remoteAiLabelPostVo = new RemoteAiLabelPostVo(); remoteAiLabelPostVo.setLabelId(aiLabel.getLabelId()); diff --git a/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java new file mode 100644 index 0000000..7c2795c --- /dev/null +++ b/dk-modules/system/src/main/java/org/dromara/system/dubbo/RemotePostServiceImpl.java @@ -0,0 +1,28 @@ +package org.dromara.system.dubbo; + +import lombok.RequiredArgsConstructor; +import org.apache.dubbo.config.annotation.DubboService; +import org.dromara.common.core.utils.MapstructUtils; +import org.dromara.system.api.RemotePostService; +import org.dromara.system.api.domain.vo.RemotePostVo; +import org.dromara.system.domain.bo.SysPostBo; +import org.dromara.system.domain.vo.SysPostVo; +import org.dromara.system.service.ISysPostService; +import org.springframework.stereotype.Service; + +import java.util.List; + +@RequiredArgsConstructor +@Service +@DubboService +public class RemotePostServiceImpl implements RemotePostService { + + + private final ISysPostService sysPostService; + + @Override + public List listPost() { + List sysPostVos = sysPostService.selectPostList(new SysPostBo()); + return MapstructUtils.convert(sysPostVos, RemotePostVo.class); + } +} diff --git a/dk-modules/system/src/main/java/org/dromara/system/mapper/AiLabelPostMapper.java b/dk-modules/system/src/main/java/org/dromara/system/mapper/AiLabelPostMapper.java index d01058e..4cd408d 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/mapper/AiLabelPostMapper.java +++ b/dk-modules/system/src/main/java/org/dromara/system/mapper/AiLabelPostMapper.java @@ -7,6 +7,8 @@ import org.dromara.system.domain.AiLabelPost; import org.dromara.system.domain.vo.AiLabelPostVo; import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; +import java.util.List; + /** * 职能-标签关系Mapper接口 * @@ -15,4 +17,6 @@ import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; */ public interface AiLabelPostMapper extends BaseMapperPlus { Page selectAiLabelPostPage(@Param("page") Page page, @Param("ew") QueryWrapper wrapper); + + List queryListByLabel(@Param("postId") Long postId); } diff --git a/dk-modules/system/src/main/java/org/dromara/system/service/IAiLabelPostService.java b/dk-modules/system/src/main/java/org/dromara/system/service/IAiLabelPostService.java index b95879e..e067c24 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/service/IAiLabelPostService.java +++ b/dk-modules/system/src/main/java/org/dromara/system/service/IAiLabelPostService.java @@ -2,7 +2,7 @@ package org.dromara.system.service; import org.dromara.system.domain.AiLabelPost; import org.dromara.system.domain.bo.AiLabelPostBo; -import org.dromara.system.domain.bo.AiLablePostBindBo; +import org.dromara.system.domain.bo.AiLabelPostBindBo; import org.dromara.system.domain.vo.AiLabelPostVo; import org.dromara.common.mybatis.core.page.TableDataInfo; import org.dromara.common.mybatis.core.page.PageQuery; @@ -68,7 +68,9 @@ public interface IAiLabelPostService { */ Boolean deleteWithValidByIds(Collection ids, Boolean isValid); - List queryListByLabel(Long postId); + List queryListLabel(Long postId); - Boolean insertByBatchBo(AiLablePostBindBo aiLablePostBindBo); + List queryListByLabel(Long postId); + + Boolean insertByBatchBo(AiLabelPostBindBo aiLablePostBindBo); } diff --git a/dk-modules/system/src/main/java/org/dromara/system/service/ISysPostService.java b/dk-modules/system/src/main/java/org/dromara/system/service/ISysPostService.java index 1a3fec5..31f7f64 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/service/ISysPostService.java +++ b/dk-modules/system/src/main/java/org/dromara/system/service/ISysPostService.java @@ -20,6 +20,7 @@ public interface ISysPostService { TableDataInfo selectPagePostList(SysPostBo post, PageQuery pageQuery); TableDataInfo lableList(AiLabelPostBo bo, PageQuery pageQuery); + List lableList(Long postId); /** * 查询岗位信息集合 * diff --git a/dk-modules/system/src/main/java/org/dromara/system/service/impl/AiLabelServiceImpl.java b/dk-modules/system/src/main/java/org/dromara/system/service/impl/AiLabelServiceImpl.java index 05bea7e..8ab199f 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/service/impl/AiLabelServiceImpl.java +++ b/dk-modules/system/src/main/java/org/dromara/system/service/impl/AiLabelServiceImpl.java @@ -1,5 +1,6 @@ package org.dromara.system.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import org.dromara.common.core.utils.MapstructUtils; import org.dromara.common.core.utils.StringUtils; import org.dromara.common.mybatis.core.page.TableDataInfo; @@ -138,8 +139,8 @@ public class AiLabelServiceImpl implements IAiLabelService { @Override public List queryListByLabel(List labelIds) { - LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); - wrapper.in(AiLabel::getLabelId, labelIds); - return this.baseMapper.selectList(wrapper); + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.in(!labelIds.isEmpty(),"label_id",labelIds); + return this.baseMapper.selectList(queryWrapper); } } diff --git a/dk-modules/system/src/main/java/org/dromara/system/service/impl/AiLablePostServiceImpl.java b/dk-modules/system/src/main/java/org/dromara/system/service/impl/AiLablePostServiceImpl.java index 583e61d..1ffda2d 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/service/impl/AiLablePostServiceImpl.java +++ b/dk-modules/system/src/main/java/org/dromara/system/service/impl/AiLablePostServiceImpl.java @@ -9,7 +9,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.RequiredArgsConstructor; import org.dromara.system.domain.AiLabelPost; -import org.dromara.system.domain.bo.AiLablePostBindBo; +import org.dromara.system.domain.bo.AiLabelPostBindBo; import org.dromara.system.domain.vo.AiLabelPostVo; import org.dromara.system.mapper.AiLabelMapper; import org.springframework.stereotype.Service; @@ -33,7 +33,7 @@ import java.util.Collection; public class AiLablePostServiceImpl implements IAiLabelPostService { private final AiLabelPostMapper baseMapper; - private final AiLabelMapper aiLabelMapper; + private final AiLabelPostMapper aiLabelPostMapper; /** * 查询职能-标签关系 @@ -128,20 +128,33 @@ public class AiLablePostServiceImpl implements IAiLabelPostService { } @Override - public List queryListByLabel(Long postId) { - LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + public List queryListByLabel(Long postId) { + return aiLabelPostMapper.queryListByLabel(postId); + } + /** + * 保存前的数据校验 + */ + private void delPostId(Long postId){ + //新增前先删除之前的标签 + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("post_id", postId); + baseMapper.delete(queryWrapper); + } + @Override + public List queryListLabel(Long postId) { + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(AiLabelPost::getPostId, postId); - return this.baseMapper.selectList(wrapper); } @Override - public Boolean insertByBatchBo(AiLablePostBindBo aiLablePostBindBo) { + public Boolean insertByBatchBo(AiLabelPostBindBo aiLabelPostBindBo) { + delPostId(aiLabelPostBindBo.getPostId()); List resultlist = new ArrayList<>(); - for (Long labelId : aiLablePostBindBo.getLableIdList()) { + for (Long labelId : aiLabelPostBindBo.getLableIdList()) { AiLabelPost ailabelPost = new AiLabelPost(); - ailabelPost.setPostId(aiLablePostBindBo.getPostId()); + ailabelPost.setPostId(aiLabelPostBindBo.getPostId()); ailabelPost.setLabelId(labelId); resultlist.add(ailabelPost); } diff --git a/dk-modules/system/src/main/java/org/dromara/system/service/impl/SysPostServiceImpl.java b/dk-modules/system/src/main/java/org/dromara/system/service/impl/SysPostServiceImpl.java index 174a04a..18cc014 100644 --- a/dk-modules/system/src/main/java/org/dromara/system/service/impl/SysPostServiceImpl.java +++ b/dk-modules/system/src/main/java/org/dromara/system/service/impl/SysPostServiceImpl.java @@ -55,19 +55,10 @@ public class SysPostServiceImpl implements ISysPostService { Page page = baseMapper.selectPagePostList(pageQuery.build(), buildQueryWrapper(post)); page.getRecords().forEach(sysPostVo -> { - List postVoList = remoteLabelPostService.selectLabelByList(sysPostVo.getPostCode(), sysPostVo.getDeptId()); - sysPostVo.setLabelList(postVoList.stream() - .map(aiLabel -> { - SysPostVo.AiLabelVo aiLabelVo = new SysPostVo.AiLabelVo(); - aiLabelVo.setLabelId(aiLabel.getLabelId()); - aiLabelVo.setLabelEn(aiLabel.getLabelEn()); - aiLabelVo.setLabelCn(aiLabel.getLabelCn()); - aiLabelVo.setPostId(aiLabel.getPostId()); - return aiLabelVo; - }) - .toList()); + Long postId = sysPostVo.getPostId(); + List lableList = lableList(postId); + sysPostVo.setLabelList(lableList); }); - return TableDataInfo.build(page); } @@ -76,6 +67,11 @@ public class SysPostServiceImpl implements ISysPostService { return aiLablePostService.queryPageList(bo,pageQuery); } + @Override + public List lableList(Long postId) { + return aiLablePostService.queryListByLabel(postId); + } + /** * 查询岗位信息集合 * diff --git a/dk-modules/system/src/main/resources/mapper/system/AiLabelPostMapper.xml b/dk-modules/system/src/main/resources/mapper/system/AiLabelPostMapper.xml index 979d018..73ee604 100644 --- a/dk-modules/system/src/main/resources/mapper/system/AiLabelPostMapper.xml +++ b/dk-modules/system/src/main/resources/mapper/system/AiLabelPostMapper.xml @@ -5,6 +5,10 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + diff --git a/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/FlowDepart.java b/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/FlowDepart.java index a2e3fb1..f52887a 100644 --- a/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/FlowDepart.java +++ b/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/FlowDepart.java @@ -25,8 +25,8 @@ public class FlowDepart { /** * 流程类型 */ - @NotNull(message = "流程类型不能为空", groups = AddGroup.class) - private String flowType; +// @NotNull(message = "流程类型不能为空", groups = AddGroup.class) +// private String flowType; /** * 流程编码 diff --git a/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/vo/FlowDefinitionVo.java b/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/vo/FlowDefinitionVo.java index aef7573..a235d22 100644 --- a/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/vo/FlowDefinitionVo.java +++ b/dk-modules/workflow/src/main/java/org/dromara/workflow/domain/vo/FlowDefinitionVo.java @@ -7,6 +7,7 @@ import org.dromara.workflow.common.constant.FlowConstant; import java.io.Serial; import java.io.Serializable; import java.util.Date; +import java.util.List; /** * 流程定义视图 @@ -101,4 +102,6 @@ public class FlowDefinitionVo implements Serializable { * 扩展字段,预留给业务系统使用 */ private String ext; + + private List deptIds; } diff --git a/dk-modules/workflow/src/main/java/org/dromara/workflow/service/impl/FlwDefinitionServiceImpl.java b/dk-modules/workflow/src/main/java/org/dromara/workflow/service/impl/FlwDefinitionServiceImpl.java index b4ebcdf..08901e9 100644 --- a/dk-modules/workflow/src/main/java/org/dromara/workflow/service/impl/FlwDefinitionServiceImpl.java +++ b/dk-modules/workflow/src/main/java/org/dromara/workflow/service/impl/FlwDefinitionServiceImpl.java @@ -4,6 +4,7 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.util.ObjectUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -31,8 +32,10 @@ import org.dromara.warm.flow.orm.mapper.FlowSkipMapper; import org.dromara.workflow.common.ConditionalOnEnable; import org.dromara.workflow.common.constant.FlowConstant; import org.dromara.workflow.domain.FlowCategory; +import org.dromara.workflow.domain.FlowDepart; import org.dromara.workflow.domain.vo.FlowDefinitionVo; import org.dromara.workflow.mapper.FlwCategoryMapper; +import org.dromara.workflow.service.FlwDepartService; import org.dromara.workflow.service.IFlwDefinitionService; import org.dromara.workflow.utils.WorkflowUtils; import org.springframework.stereotype.Service; @@ -64,6 +67,7 @@ public class FlwDefinitionServiceImpl implements IFlwDefinitionService { private final FlowNodeMapper flowNodeMapper; private final FlowSkipMapper flowSkipMapper; private final FlwCategoryMapper flwCategoryMapper; + private final FlwDepartService flwDepartService; /** * 查询流程定义列表 @@ -80,6 +84,12 @@ public class FlwDefinitionServiceImpl implements IFlwDefinitionService { TableDataInfo build = TableDataInfo.build(); build.setRows(BeanUtil.copyToList(page.getRecords(), FlowDefinitionVo.class)); build.setTotal(page.getTotal()); + build.getRows().forEach(p ->{ + FlowDepart info = flwDepartService.getInfo(p.getFlowCode()); + if(ObjectUtil.isNotEmpty(info)){ + p.setDeptIds(info.getDepartIds()); + } + }); return build; } diff --git a/dk-visual/pom.xml b/dk-visual/pom.xml index 1cb541d..c0b0751 100644 --- a/dk-visual/pom.xml +++ b/dk-visual/pom.xml @@ -14,6 +14,7 @@ seata-server nacos snailjob-server + rocketmq dk-visual diff --git a/dk-visual/rocketmq/pom.xml b/dk-visual/rocketmq/pom.xml new file mode 100644 index 0000000..9252d69 --- /dev/null +++ b/dk-visual/rocketmq/pom.xml @@ -0,0 +1,85 @@ + + + + org.dromara + dk-visual + ${revision} + + 4.0.0 + + rocketmq + + + + + org.apache.rocketmq + rocketmq-client + 4.9.0 + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + com.alibaba.nacos + nacos-client + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + com.mysql + mysql-connector-j + + + + org.projectlombok + lombok + + + + org.dromara + common-web + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + + diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/RocketMQApplication.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/RocketMQApplication.java new file mode 100644 index 0000000..b45609c --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/RocketMQApplication.java @@ -0,0 +1,21 @@ +package com.ruoyi.rocketmq; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; + +/** + * RocketMQ模块 + * + * @author + */ +@SpringBootApplication +@EnableAsync +public class RocketMQApplication +{ + public static void main(String[] args) + { + SpringApplication.run(RocketMQApplication.class, args); + System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); + } +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ConsumerConfig.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ConsumerConfig.java new file mode 100644 index 0000000..e086fa2 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ConsumerConfig.java @@ -0,0 +1,75 @@ +package com.ruoyi.rocketmq.config; + +import com.ruoyi.rocketmq.enums.MessageCodeEnum; +import com.ruoyi.rocketmq.enums.MessageTopic; +import com.ruoyi.rocketmq.model.ConsumerMode; +import com.ruoyi.rocketmq.consumer.RocketMsgListener; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.List; + +/** + * 消费者配置 + */ +@RefreshScope +@Configuration +@Slf4j +public class ConsumerConfig { + + @Autowired + private ConsumerMode consumerMode; + + @Bean + public DefaultMQPushConsumer getRocketMQConsumer() { + //构建客户端连接 + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); + // + consumer.setNamesrvAddr(consumerMode.getNameServer()); + consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); + consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); + consumer.registerMessageListener(new RocketMsgListener()); + /** + * 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 + * 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 + * 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 + * 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 + */ + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + /** + * CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 + * 订阅topic整体,从而达到负载均衡的目的 + * BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 + * 需要注意的是,在广播模式下,每个Consumer都会独立地处理相同的消息副本。这可能会导致一些潜在的问题,例如消息重复处理或者资源浪费。因此,在使用广播模式时,请确保消息的处理逻辑是幂等的,并仔细考虑系统资源的消耗。 + */ + // consumer.setMessageModel(MessageModel.BROADCASTING); + + consumer.setVipChannelEnabled(false); + consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); + try { + /** + * 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 + * 由于官方并没有给直接订阅全部消息示例 所以使用list列表循环订阅所有topic + */ + // 获取所有topic列表 + MessageTopic messageTopic = new MessageTopic(); + List allTopics = messageTopic.RocketMQTopicList(); + //订阅所有topic + for (String topic : allTopics) { + consumer.subscribe(topic,"*"); + } + consumer.start(); + log.info("消费者初始化成功:{}", consumer); + } catch (MQClientException e) { + e.printStackTrace(); + log.error("消费者初始化失败:{}",e.getMessage()); + } + return consumer; + } +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ProducerConfig.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ProducerConfig.java new file mode 100644 index 0000000..8d5ed48 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/config/ProducerConfig.java @@ -0,0 +1,56 @@ +package com.ruoyi.rocketmq.config; + +import com.ruoyi.rocketmq.model.ProducerMode; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +/** + * mq搭建地址连接 + * 生产者初者连接信息 具体看nacos配置 + */ +@Configuration +@Slf4j +public class ProducerConfig { + + /** + * 远程调用连接信息 + */ + public static DefaultMQProducer producer; + + /** + * 连接客户端信息配置 具体看nacos配置 + */ + @Autowired + private ProducerMode producerMode; + + @Bean + public DefaultMQProducer getRocketMQProducer() { + producer = new DefaultMQProducer(producerMode.getGroupName()); + producer.setNamesrvAddr(producerMode.getNameServer()); + //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName + if(producerMode.getMaxMessageSize()!=null){ + producer.setMaxMessageSize(producerMode.getMaxMessageSize()); + } + if(producerMode.getSendMsgTimeout()!=null){ + producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); + } + //如果发送消息失败,设置重试次数,默认为2次 + if(producerMode.getRetryTimesWhenSendFailed()!=null){ + producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); + } + producer.setVipChannelEnabled(false); + try { + producer.start(); + log.info("生产者初始化成功:{}",producer.toString()); + } catch (MQClientException e) { + log.error("生产者初始化失败:{}",e.getMessage()); + } + return producer; + } + +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java new file mode 100644 index 0000000..153a2f2 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java @@ -0,0 +1,87 @@ +package com.ruoyi.rocketmq.consumer; + +import com.ruoyi.rocketmq.enums.MessageCodeEnum; +import com.ruoyi.rocketmq.producer.ConsumeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.io.UnsupportedEncodingException; +import java.util.List; + +/** + * 消息监听 + */ +@Slf4j +@Component +public class RocketMsgListener implements MessageListenerConcurrently { + + /** + * 消费消息 + * @param list msgs.size() >= 1 + * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here + * 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 + * @param consumeConcurrentlyContext 上下文信息 + * @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) + */ + @Override + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + try{ + //消息不等于空情况 + if (!CollectionUtils.isEmpty(list)) { + //获取topic + for (MessageExt messageExt : list) { + // 解析消息内容 + String body = new String(messageExt.getBody()); + log.info("接受到的消息为:{}", body); + String tags = messageExt.getTags(); + String topic = messageExt.getTopic(); + String msgId = messageExt.getMsgId(); + String keys = messageExt.getKeys(); + int reConsume = messageExt.getReconsumeTimes(); + // 消息已经重试了3次,如果不需要再次消费,则返回成功 + if (reConsume == 3) { + // TODO 补偿信息 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常 + } + // 根据不同的topic处理不同的业务 这里以订单消息为例子 + if (MessageCodeEnum.ORDER_MESSAGE_TOPIC.getCode().equals(topic)) { + if (MessageCodeEnum.ORDER_MESSAGE_TAG.getCode().equals(tags)) { + //处理你的业务 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 + } else { + log.info("未匹配到Tag【{}】" + tags); + } + } + } + } + // 消息消费失败 + //broker会根据设置的messageDelayLevel发起重试,默认16次 + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } catch (Exception e) { + // 调用 handleException 方法处理异常并返回处理结果 + return handleException(e); + } + } + + /** + * 异常处理 + * + * @param e 捕获的异常 + * @return 消息消费结果 + */ + private static ConsumeConcurrentlyStatus handleException(final Exception e) { + Class exceptionClass = e.getClass(); + if (exceptionClass.equals(UnsupportedEncodingException.class)) { + log.error(e.getMessage()); + } else if (exceptionClass.equals(ConsumeException.class)) { + log.error(e.getMessage()); + } + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RocketMqController.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RocketMqController.java new file mode 100644 index 0000000..f2407f5 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RocketMqController.java @@ -0,0 +1,123 @@ +package com.ruoyi.rocketmq.controller; + + +import com.ruoyi.rocketmq.producer.MessageProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 消息测试类Controller + */ +@RestController +@RequestMapping("/api/rocketMessage") +public class RocketMqController { + + + + /** + * 发送同步消息 + */ + @PostMapping("/sendSynchronizeMessage") + private Map sendSynchronizeMessage(){ + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 + SendResult sendResult = messageProducer.sendSynchronizeMessage("order-message","order_message_tag","title","content"); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + + + + /** + * 发送单向消息 + */ + @PostMapping("/sendOnewayMessage") + private Map sendOnewayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + messageProducer.sendOnewayMessage("order-message","order_timeout_tag","title","content"); + Map result = new HashMap<>(); + result.put("msg","发送成功"); + result.put("code",200); + return result; + } + + + /** + * 批量发送消息 + */ + @PostMapping("/sendBatchMessage") + private Map sendBatchMessage(){ + // 根据实际需求创建消息列表并返回 + List messages = new ArrayList<>(); + // 添加消息到列表 + messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes())); + messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes())); + messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes())); + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + SendResult sendResult = messageProducer.sendBatchMessage(messages); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + + + /** + * 发送有序的消息 + */ + @PostMapping("/sendOrderlyMessage") + private Map sendOrderlyMessage(){ + // 根据实际需求创建消息列表并返回 + List messages = new ArrayList<>(); + // 添加消息到列表 + messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes())); + messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes())); + messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes())); + Integer messageQueueNumber = 3; + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + SendResult sendResult = messageProducer.sendOrderlyMessage(messages,messageQueueNumber); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + + /** + * 发送延迟消息 + */ + @PostMapping("/sendDelayMessage") + private Map sendDelayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + SendResult sendResult = messageProducer.sendDelayMessage("order-message","order_timeout_tag","title","content"); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + + + /** + * 发送异步的消息 + */ + @PostMapping("/sendAsyncProducerMessage") + private Map sendAsyncProducerMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ + MessageProducer messageProducer = new MessageProducer(); + //调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的 + SendResult sendResult = messageProducer.sendAsyncProducerMessage("order-message","order_timeout_tag","title","content"); + Map result = new HashMap<>(); + result.put("data",sendResult); + return result; + } + +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageCodeEnum.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageCodeEnum.java new file mode 100644 index 0000000..3e038ec --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageCodeEnum.java @@ -0,0 +1,55 @@ +package com.ruoyi.rocketmq.enums; + + +import lombok.Getter; + +/** + * 用于传递topic和 tag + * 也用于接收消息后判断不同的消息处理不同的业务 + */ +@Getter +public enum MessageCodeEnum { + + /** + * 系统消息 + */ + NOTE_MESSAGE_TOPIC("system-message","系统消息服务模块topic名称"), + /** + * 用户消息 + */ + USER_MESSAGE_TOPIC("user-message","用户消息服务模块topic名称"), + + /** + * 订单消息 + */ + ORDER_MESSAGE_TOPIC("order-message","订单消息服务模块topic名称"), + + /** + * 用户消息tag + */ + USER_MESSAGE_TAG("user_message_tag","用户消息推送"), + + /** + * 系统消息tag + */ + NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), + + /** + * 订单消息 + */ + ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), + + /** + * 订单处理编号 + */ + ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); + + private final String code; + private final String msg; + + MessageCodeEnum(String code, String msg){ + this.code = code; + this.msg = msg; + } + +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageTopic.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageTopic.java new file mode 100644 index 0000000..fe1ea39 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/enums/MessageTopic.java @@ -0,0 +1,24 @@ +package com.ruoyi.rocketmq.enums; + + +import java.util.ArrayList; +import java.util.List; + +/** + * 定义topic列表 + */ +public class MessageTopic { + + //在这里添加topic 用于批量订阅 + public List RocketMQTopicList(){ + List getTopicLists=new ArrayList<>(); + //系统消息 + getTopicLists.add("system-message"); + //用户消息 + getTopicLists.add("user-message"); + //订单消息 + getTopicLists.add("order-message"); + return getTopicLists; + } + +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ConsumerMode.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ConsumerMode.java new file mode 100644 index 0000000..b76cab1 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ConsumerMode.java @@ -0,0 +1,26 @@ +package com.ruoyi.rocketmq.model; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +/** + * 消费者初始化 + * 消费者连接信息 具体看nacos配置 + */ +@Data +@Configuration +@Component +public class ConsumerMode { + @Value("${rocketmq.nameServer}") + private String nameServer; + @Value("${rocketmq.consumer.groupName}") + private String groupName ; + @Value("${rocketmq.consumer.consumerThreadMin}") + private Integer consumeThreadMin; + @Value("${rocketmq.consumer.consumerThreadMax}") + private Integer consumeThreadMax; + @Value("${rocketmq.consumer.consumerMessageBatchMaxSize}") + private Integer consumeMessageBatchMaxSize; +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ProducerMode.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ProducerMode.java new file mode 100644 index 0000000..f605f57 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/model/ProducerMode.java @@ -0,0 +1,25 @@ +package com.ruoyi.rocketmq.model; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Configuration; + +/** + * 生产者初始化 + */ +@RefreshScope +@Data +@Configuration +public class ProducerMode { + @Value("${rocketmq.producer.groupName}") + private String groupName; + @Value("${rocketmq.nameServer}") + private String nameServer; + @Value("${rocketmq.producer.maxMessageSize}") + private Integer maxMessageSize; + @Value("${rocketmq.producer.sendMsgTimeout}") + private Integer sendMsgTimeout; + @Value("${rocketmq.producer.retryTimesWhenSendFailed}") + private Integer retryTimesWhenSendFailed; +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/ConsumeException.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/ConsumeException.java new file mode 100644 index 0000000..825f900 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/ConsumeException.java @@ -0,0 +1,23 @@ +package com.ruoyi.rocketmq.producer; + +/** + * @author 影子 + * 用于捕捉异常非受检异常(unchecked exception) + * RuntimeException 和其子类的异常在编译时不需要进行强制性的异常处理,可以选择在运行时进行捕获和处理 + * 可选择使用 + */ +public class ConsumeException extends RuntimeException{ + private static final long serialVersionUID = 4093867789628938836L; + + public ConsumeException(String message) { + super(message); + } + + public ConsumeException(Throwable cause) { + super(cause); + } + + public ConsumeException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/MessageProducer.java b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/MessageProducer.java new file mode 100644 index 0000000..2bb9057 --- /dev/null +++ b/dk-visual/rocketmq/src/main/java/com/ruoyi/rocketmq/producer/MessageProducer.java @@ -0,0 +1,190 @@ +package com.ruoyi.rocketmq.producer; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.client.producer.TransactionSendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +import static com.ruoyi.rocketmq.config.ProducerConfig.producer; + +/** + * 消息发送 + */ +@Slf4j +public class MessageProducer { + + + /** + * 同步发送消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 通过调用 send() 方法发送消息,阻塞等待服务器响应。 + */ + public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ + String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; + try { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + SendResult result = producer.send(msg); + return result; + } catch (Exception e) { + e.printStackTrace(); + log.error("消息初始化失败!body:{}",body); + + } + return null; + } + + /** + * 单向发送消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 单向发送:通过调用 sendOneway() 方法发送消息,不关心发送结果,适用于对可靠性要求不高的场景。 + */ + public void sendOnewayMessage(String topic, String tag, String key, String value){ + String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; + try { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + producer.sendOneway(msg); + } catch (UnsupportedEncodingException e) { + log.error("消息初始化失败!body:{}",body); + + } catch (MQClientException | InterruptedException | RemotingException e) { + log.error("消息发送失败! body:{}",body); + } + } + + + /** + * 批量发送消息 + * @param messages 消息列表 + * 批量发送:通过调用 send() 方法并传入多条消息,实现批量发送消息。 + */ + public SendResult sendBatchMessage(List messages){ + String body = messages.toString(); + try { + System.out.println("生产者发送消息:"+ messages); + // 发送批量消息 + SendResult sendResult = producer.send(messages); + return sendResult; + } catch (MQClientException | InterruptedException | RemotingException e) { + log.error("消息发送失败! body:{}",body); + } catch (MQBrokerException e) { + throw new RuntimeException(e); + } + return null; + } + + + /** + * 发送有序的消息 + * @param messagesList Message集合 + * @param messageQueueNumber 消息队列数量,根据实际情况设定 + * 顺序发送: messageQueueNumber 表示消息的业务标识,可以根据具体需求进行设置来保证消息按顺序发送。 + */ + public SendResult sendOrderlyMessage(List messagesList, Integer messageQueueNumber) { + SendResult result = null; + for (Message message : messagesList) { + try { + result = producer.send(message, (list, msg, arg) -> { + Integer queueNumber = (Integer) arg; + //int queueIndex = queueNumber % list.size(); + return list.get(queueNumber); + }, messageQueueNumber);//根据编号取模,选择消息队列 + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + log.error("发送有序消息失败"); + return result; + } + } + return result; + } + + /** + * 发送延迟消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 延迟发送:通过设置延迟级别来实现延迟发送消息。 + */ + public SendResult sendDelayMessage(String topic, String tag, String key, String value) + { + SendResult result = null; + try + { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + //设置消息延迟级别,我这里设置5,对应就是延时一分钟 + // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" + msg.setDelayTimeLevel(4); + // 发送消息到一个Broker + result = producer.send(msg); + // 通过sendResult返回消息是否成功送达 + log.info("发送延迟消息结果:======sendResult:{}", result); + DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + log.info("发送时间:{}", format.format(new Date())); + return result; + } + catch (Exception e) + { + e.printStackTrace(); + log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); + } + return result; + } + /** + * 发送异步的消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 通过调用 send() 方法,并传入一个 SendCallback 对象,在发送消息的同时可以继续处理其他逻辑,消息发送结果通过回调函数通知。 + */ + public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value){ + + try { + //创建一个消息实例,指定主题、标签和消息体。 + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + producer.send(msg,new SendCallback() { + // 异步回调的处理 + @Override + public void onSuccess(SendResult sendResult) { + System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId()); + } + @Override + public void onException(Throwable e) { + System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e); + e.printStackTrace(); + } + }); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + return null; + } + +} diff --git a/dk-visual/rocketmq/src/main/resources/application.yml b/dk-visual/rocketmq/src/main/resources/application.yml new file mode 100644 index 0000000..c1758d8 --- /dev/null +++ b/dk-visual/rocketmq/src/main/resources/application.yml @@ -0,0 +1,59 @@ +server: + port: 9402 + +# Spring +spring: + application: + # 应用名称 + name: rocketmq + profiles: + # 环境配置 + active: @profiles.active@ + +--- # rocketmq 配置 +rocketmq: + # 生产者的组名 + producer: + #是否开启自动配置 +# isEnable: true + # 发送同一类消息的设置为同一个group,保证唯一 + groupName: message-producer + # 消息最大长度 默认1024*4(4M) + maxMessageSize: 4096 + # 发送消息超时时间,默认3000 + sendMsgTimeout: 3000 + # 发送消息失败重试次数,默认2 + retryTimesWhenSendFailed: 3 +# group: produce-group + # 消费者的组名 + consumer: + #是否开启自动配置 +# isEnable: true + # 官方建议:确保同一组中的每个消费者订阅相同的主题。 + groupName: message-consumer + consumerThreadMin: 20 + consumerThreadMax: 64 + # 设置一次消费消息的条数,默认为1条 + consumerMessageBatchMaxSize: 1 + # NameServer地址 + nameServer: 114.235.183.147:9876 + +--- # nacos 配置 +spring: + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + username: @nacos.username@ + password: @nacos.password@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml diff --git a/dk-visual/rocketmq/src/main/resources/banner.txt b/dk-visual/rocketmq/src/main/resources/banner.txt new file mode 100644 index 0000000..d826e58 --- /dev/null +++ b/dk-visual/rocketmq/src/main/resources/banner.txt @@ -0,0 +1,6 @@ + ___ _ _ __ __ ___ + | _ \ ___ __ | |__ ___ | |_ | \/ | / _ \ + | / / _ \ / _| | / / / -_) | _| | |\/| || (_) | + |_|_\ \___/ \__|_ |_\_\ \___| _\__| |_|__|_| \__\_\ +_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| +"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-' diff --git a/dk-visual/rocketmq/src/main/resources/logback-plus.xml b/dk-visual/rocketmq/src/main/resources/logback-plus.xml new file mode 100644 index 0000000..caaa345 --- /dev/null +++ b/dk-visual/rocketmq/src/main/resources/logback-plus.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + ${console.log.pattern} + utf-8 + + + + + + + + + + + + + + + diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java new file mode 100644 index 0000000..4ec745a --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/RocketMQApplication.java @@ -0,0 +1,25 @@ +package com.ruoyi.testrocketmq; + +import com.ruoyi.common.security.annotation.EnableCustomConfig; +import com.ruoyi.common.security.annotation.EnableRyFeignClients; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; + +/** + * 平台管理模块 + * + * @author ruoyi + */ +@EnableCustomConfig +@EnableRyFeignClients +@SpringBootApplication +@EnableAsync +public class RocketMQApplication +{ + public static void main(String[] args) + { + SpringApplication.run(RocketMQApplication.class, args); + System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); + } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java new file mode 100644 index 0000000..38ebbae --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ConsumerConfig.java @@ -0,0 +1,64 @@ +package com.ruoyi.testrocketmq.config; + +import com.ruoyi.testrocketmq.consumer.RocketMsgListener; +import com.ruoyi.testrocketmq.enums.MessageCodeEnum; +import com.ruoyi.testrocketmq.model.ConsumerMode; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 消费者配置 + */ +@RefreshScope +@Configuration +@Slf4j +public class ConsumerConfig { + @Autowired + private ConsumerMode consumerMode; + + @Bean + public DefaultMQPushConsumer getRocketMQConsumer() throws MQClientException { +// ConsumerMode consumerMode = new ConsumerMode(); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); + consumer.setNamesrvAddr(consumerMode.getNamesrvAddr()); + consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); + consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); + consumer.registerMessageListener(new RocketMsgListener()); + /** + * 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 + * 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 + * 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 + * 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 + */ + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + /** + * CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 + * 订阅topic整体,从而达到负载均衡的目的 + * BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 + * + */ + // consumer.setMessageModel(MessageModel.BROADCASTING); + + consumer.setVipChannelEnabled(false); + consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); + try { + /** + * 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 + */ + consumer.subscribe(MessageCodeEnum.ORDER_MESSAGE.getCode(),"*"); + consumer.subscribe(MessageCodeEnum.USER_MESSAGE.getCode(),"*"); + consumer.start(); + log.info("消费者初始化成功:{}", consumer.toString()); + } catch (MQClientException e) { + e.printStackTrace(); + log.error("消费者初始化失败:{}",e.getMessage()); + } + return consumer; + } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java new file mode 100644 index 0000000..1d4b56e --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/MessageConfig.java @@ -0,0 +1,27 @@ +package com.ruoyi.testrocketmq.config; + +/** + * @author yz + */ +public class MessageConfig { + private Class messageClass; + private boolean orderlyMessage; + + public Class getMessageClass() { + return messageClass; + } + + public void setMessageClass(Class messageClass) { + this.messageClass = messageClass; + } + + public boolean isOrderlyMessage() { + return orderlyMessage; + } + + public void setOrderlyMessage(boolean orderlyMessage) { + this.orderlyMessage = orderlyMessage; + } + + +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java new file mode 100644 index 0000000..b55917e --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/config/ProducerConfig.java @@ -0,0 +1,48 @@ +package com.ruoyi.testrocketmq.config; + +import com.ruoyi.testrocketmq.model.ProducerMode; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +@Slf4j +public class ProducerConfig { + + public static DefaultMQProducer producer; + + @Autowired + private ProducerMode producerMode; + + + + @Bean + public DefaultMQProducer getRocketMQProducer() { + producer = new DefaultMQProducer(producerMode.getGroupName()); + producer.setNamesrvAddr(producerMode.getNamesrvAddr()); + //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName + if(producerMode.getMaxMessageSize()!=null){ + producer.setMaxMessageSize(producerMode.getMaxMessageSize()); + } + if(producerMode.getSendMsgTimeout()!=null){ + producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); + } + //如果发送消息失败,设置重试次数,默认为2次 + if(producerMode.getRetryTimesWhenSendFailed()!=null){ + producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); + } + producer.setVipChannelEnabled(false); + try { + producer.start(); + log.info("生产者初始化成功:{}",producer.toString()); + } catch (MQClientException e) { + log.error("生产者初始化失败:{}",e.getMessage()); + } + return producer; + } + +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java new file mode 100644 index 0000000..88d2b05 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/consumer/RocketMsgListener.java @@ -0,0 +1,100 @@ +package com.ruoyi.testrocketmq.consumer; + +import com.ruoyi.testrocketmq.enums.MessageCodeEnum; +import com.ruoyi.testrocketmq.producer.ConsumeException; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +/** + * 消息监听 + */ +@Slf4j +@Component +public class RocketMsgListener implements MessageListenerConcurrently { + + /** + * 消费消息 + * + * @param list msgs.size() >= 1 + * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here + * 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 + * @param consumeConcurrentlyContext 上下文信息 + * @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) + */ + @Override + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + + if (!CollectionUtils.isEmpty(list)) { + for (MessageExt messageExt : list) { + // 消息内容 + String body = new String(messageExt.getBody()); + log.info("接受到的消息为:{}", body); + String tags = messageExt.getTags(); + String topic = messageExt.getTopic(); + String msgId = messageExt.getMsgId(); + String keys = messageExt.getKeys(); + int reConsume = messageExt.getReconsumeTimes(); + // 消息已经重试了3次,如果不需要再次消费,则返回成功 + if (reConsume == 3) { + // TODO 补偿信息 + //smsLogService.insertLog(topic, tags, msgId, keys, body, "【" + EnumUtil.getStrMsgByCode(tags, TagsCodeEnum.class) + "】消费失败"); + log.error("消息重试超过3次,消费失败!"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + // 订单超时处理 + if (MessageCodeEnum.ORDER_MESSAGE.getCode().equals(topic)) { + if (MessageCodeEnum.ORDER_TIMEOUT_TAG.getCode().equals(tags)) { +// //获取订单 +// DealUserOrder dealUserOrder = pcRemoteDealUserOrderService.selectDealUserOrderByOrderNumber(keys); +// if (dealUserOrder != null) { +// //订单状态超时未支付关闭订单 处理 +// if (dealUserOrder.getStatus().equals("1")) { +// DealUserOrder dealUserOrders = new DealUserOrder(); +// dealUserOrders.setOrderId(dealUserOrder.getOrderId()); +// dealUserOrders.setStatus("4"); +// pcRemoteDealUserOrderService.updateDealUserOrder(dealUserOrders); +// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; +// } +// log.info("Order does not exist."); +// } + log.info("Consumption success:" + body); + DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + log.info("Consumption time:{}", format.format(new Date())); + } else { + log.info("未匹配到Tag【{}】" + tags); + } + } + } + } + // 消息消费成功 + //ConsumeConcurrentlyStatus.RECONSUME_LATER broker会根据设置的messageDelayLevel发起重试,默认16次 + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + /** + * 异常处理 + * + * @param e 捕获的异常 + * @return 消息消费结果 + */ + private static ConsumeConcurrentlyStatus handleException(final Exception e) { + Class exceptionClass = e.getClass(); + if (exceptionClass.equals(UnsupportedEncodingException.class)) { + log.error(e.getMessage()); + } else if (exceptionClass.equals(ConsumeException.class)) { + log.error(e.getMessage()); + } + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java new file mode 100644 index 0000000..e6ff8c4 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/enums/MessageCodeEnum.java @@ -0,0 +1,60 @@ +package com.ruoyi.testrocketmq.enums; + + +import lombok.Getter; + +@Getter +public enum MessageCodeEnum { + /** + * 消息模块主题 + */ + MESSAGE_TOPIC("elink-message","消息服务模块topic名称"), + /** + * 系统消息 + */ + NOTE_MESSAGE("system-message","系统消息服务模块topic名称"), + /** + * 用户消息 + */ + USER_MESSAGE("user-message","用户消息服务模块topic名称"), + + /** + * 订单消息 + */ + ORDER_MESSAGE("order-message","订单消息服务模块topic名称"), + + /** + * 平台编号 + */ + USER_MESSAGE_TAG("user_message_tag","用户消息推送"), + NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), + ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), + + /** + * 订单处理编号 + */ + //订单超时处理 + ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); + + + private final String code; + private final String msg; + + MessageCodeEnum(String code, String msg){ + this.code = code; + this.msg = msg; + } + + public static String valuesOfType(String code) { + String value = ""; + for (MessageCodeEnum e : MessageCodeEnum.values()) { + if (code.equals(e.code)) { + value = e.msg; + } + + } + return value; + } + + +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java new file mode 100644 index 0000000..e66738a --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ConsumerMode.java @@ -0,0 +1,22 @@ +package com.ruoyi.testrocketmq.model; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +@Data +@Configuration +@Component +public class ConsumerMode { + @Value("${suning.rocketmq.namesrvAddr}") + private String namesrvAddr; + @Value("${suning.rocketmq.conumer.groupName}") + private String groupName ; + @Value("${suning.rocketmq.conumer.consumeThreadMin}") + private int consumeThreadMin; + @Value("${suning.rocketmq.conumer.consumeThreadMax}") + private int consumeThreadMax; + @Value("${suning.rocketmq.conumer.consumeMessageBatchMaxSize}") + private int consumeMessageBatchMaxSize; +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java new file mode 100644 index 0000000..0cd4060 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/model/ProducerMode.java @@ -0,0 +1,25 @@ +package com.ruoyi.testrocketmq.model; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Configuration; + +/** + * 生产者初始化 + */ +@RefreshScope +@Data +@Configuration +public class ProducerMode { + @Value("${suning.rocketmq.producer.groupName}") + private String groupName; + @Value("${suning.rocketmq.namesrvAddr}") + private String namesrvAddr; + @Value("${suning.rocketmq.producer.maxMessageSize}") + private Integer maxMessageSize; + @Value("${suning.rocketmq.producer.sendMsgTimeout}") + private Integer sendMsgTimeout; + @Value("${suning.rocketmq.producer.retryTimesWhenSendFailed}") + private Integer retryTimesWhenSendFailed; +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java new file mode 100644 index 0000000..86ed05c --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/AsyncProducer.java @@ -0,0 +1,47 @@ +package com.ruoyi.testrocketmq.producer; + +import com.ruoyi.testrocketmq.config.ProducerConfig; +import org.springframework.beans.factory.annotation.Autowired; + +public class AsyncProducer { + + @Autowired + private ProducerConfig producerConfig; + + /** + * 发送异步的消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * @return org.apache.rocketmq.client.producer.SendResult + */ +// public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value) throws UnsupportedEncodingException { +// +// try { +// DefaultMQProducer defaultMQProducer = producerConfig.producer; +// //Create a message instance, specifying topic, tag and message body. +// Message msg = new Message(topic, tag, key,value.getBytes(RemotingHelper.DEFAULT_CHARSET)); +// defaultMQProducer.send(msg, new SendCallback() { +// // 异步回调的处理 +// @Override +// public void onSuccess(SendResult sendResult) { +// System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId()); +// } +// +// @Override +// public void onException(Throwable e) { +// System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e); +// e.printStackTrace(); +// } +// }); +// } catch (MQClientException e) { +// e.printStackTrace(); +// } catch (RemotingException e) { +// e.printStackTrace(); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// return null; +// } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java new file mode 100644 index 0000000..82d4314 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/ConsumeException.java @@ -0,0 +1,20 @@ +package com.ruoyi.testrocketmq.producer; + +/** + * @author 影子 + */ +public class ConsumeException extends RuntimeException{ + private static final long serialVersionUID = 4093867789628938836L; + + public ConsumeException(String message) { + super(message); + } + + public ConsumeException(Throwable cause) { + super(cause); + } + + public ConsumeException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java new file mode 100644 index 0000000..fa6afbe --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageContext.java @@ -0,0 +1,34 @@ +package com.ruoyi.testrocketmq.producer; + + + +import lombok.Data; +import lombok.ToString; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +/** + * 消费时,当前所消费的消息的上下文信息 + * + * @author jolly + */ +@ToString +@Data +public final class MessageContext { + + /** + * 所消费消息所在的消息队列 + * + * @see MessageQueue + */ + private MessageQueue messageQueue; + + /** + * 所消费的消息的扩展属性 + * + * @see MessageExt + */ + private MessageExt messageExt; + + +} diff --git a/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java new file mode 100644 index 0000000..7450530 --- /dev/null +++ b/dk-visual/rocketmq/src/test/java/com/ruoyi/testrocketmq/producer/MessageProducer.java @@ -0,0 +1,110 @@ +package com.ruoyi.testrocketmq.producer; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +import static com.ruoyi.rocketmq.config.ProducerConfig.producer; + +/** + * 消息发送 + */ +@Slf4j +public class MessageProducer { + + + /** + * 同步发送消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * @return org.apache.rocketmq.client.producer.SendResult + */ + public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ + String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; + try { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + SendResult result = producer.send(msg); + return result; + } catch (UnsupportedEncodingException e) { + log.error("消息初始化失败!body:{}",body); + + } catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) { + log.error("消息发送失败! body:{}",body); + } + return null; + } + + + + /** + * 发送有序的消息 + * @param messagesList Message集合 + * @param messageQueueNumber 消息队列编号 + * @return org.apache.rocketmq.client.producer.SendResult + */ + public SendResult sendOrderlyMessage(List messagesList, int messageQueueNumber) { + SendResult result = null; + for (Message message : messagesList) { + try { +// DefaultMQProducer defaultMQProducer = ProducerConfig.producer.send(message); +// System.out.println(defaultMQProducer); + result = producer.send(message, (list, msg, arg) -> { + Integer queueNumber = (Integer) arg; + return list.get(queueNumber); + }, messageQueueNumber); + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + log.error("发送有序消息失败"); + return result; + } + } + return result; + } + + /** + * 推送延迟消息 + * @param topic + * @param tag + * @param key + * @return boolean + */ + public SendResult sendDelayMessage(String topic, String tag, String key, String value) + { + SendResult result = null; + try + { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + //设置消息延迟级别,我这里设置5,对应就是延时一分钟 + // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" + msg.setDelayTimeLevel(4); + // 发送消息到一个Broker + result = producer.send(msg); + // 通过sendResult返回消息是否成功送达 + log.info("发送延迟消息结果:======sendResult:{}", result); + DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + log.info("发送时间:{}", format.format(new Date())); + return result; + } + catch (Exception e) + { + e.printStackTrace(); + log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); + } + return result; + } + + +}