68 changed files with 2225 additions and 103 deletions
@ -0,0 +1,10 @@ |
|||
package org.dromara.system.api; |
|||
|
|||
import org.dromara.system.api.domain.vo.RemotePostVo; |
|||
|
|||
import java.util.List; |
|||
|
|||
public interface RemotePostService { |
|||
List<RemotePostVo> listPost(); |
|||
|
|||
} |
@ -0,0 +1,63 @@ |
|||
package org.dromara.system.api.domain.vo; |
|||
|
|||
import com.alibaba.excel.annotation.ExcelProperty; |
|||
import lombok.Data; |
|||
import org.dromara.common.excel.annotation.ExcelDictFormat; |
|||
import org.dromara.common.excel.convert.ExcelDictConvert; |
|||
|
|||
import java.io.Serial; |
|||
import java.util.Date; |
|||
|
|||
@Data |
|||
public class RemotePostVo { |
|||
|
|||
/** |
|||
* 岗位ID |
|||
*/ |
|||
private Long postId; |
|||
|
|||
/** |
|||
* 部门id |
|||
*/ |
|||
private Long deptId; |
|||
|
|||
/** |
|||
* 岗位编码 |
|||
*/ |
|||
private String postCode; |
|||
|
|||
/** |
|||
* 岗位名称 |
|||
*/ |
|||
private String postName; |
|||
|
|||
/** |
|||
* 岗位类别编码 |
|||
*/ |
|||
private String postCategory; |
|||
|
|||
/** |
|||
* 显示顺序 |
|||
*/ |
|||
private Integer postSort; |
|||
|
|||
/** |
|||
* 状态(0正常 1停用) |
|||
*/ |
|||
private String status; |
|||
|
|||
/** |
|||
* 备注 |
|||
*/ |
|||
private String remark; |
|||
|
|||
/** |
|||
* 创建时间 |
|||
*/ |
|||
private Date createTime; |
|||
|
|||
/** |
|||
* 部门名 |
|||
*/ |
|||
private String deptName; |
|||
} |
@ -0,0 +1,26 @@ |
|||
package org.dromara.common.core.constant; |
|||
|
|||
/** |
|||
* @auther yq |
|||
* @data 2025/3/26 |
|||
*/ |
|||
public interface DeviceQrtzConstants { |
|||
|
|||
/** |
|||
* 任务状态 状态(1-使用,2-暂停,0-异常暂停) |
|||
* */ |
|||
int QRTZ_STATUS_0=0; |
|||
int QRTZ_STATUS_1=1; |
|||
int QRTZ_STATUS_2=2; |
|||
|
|||
/** |
|||
* |
|||
* 任务航线状态 状态(1-暂未飞行,2-正在飞行,3-飞行结束,0-异常暂停) |
|||
* */ |
|||
|
|||
int QRTZ_FILE_STATUS_0=0; |
|||
int QRTZ_FILE_STATUS_1=1; |
|||
int QRTZ_FILE_STATUS_2=2; |
|||
int QRTZ_FILE_STATUS_3=3; |
|||
|
|||
} |
@ -0,0 +1,52 @@ |
|||
package org.dromara.stream.listener; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
|||
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; |
|||
|
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; |
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; |
|||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; |
|||
import org.apache.rocketmq.common.message.MessageExt; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.List; |
|||
|
|||
/** |
|||
* @author xbhog |
|||
* @date 2024/06/01 17:05 |
|||
**/ |
|||
@Slf4j |
|||
@Component |
|||
@RocketMQTransactionListener |
|||
public class RocketMQListener { |
|||
|
|||
public static void main(String[] args) throws Exception { |
|||
// 实例化消费者
|
|||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); |
|||
|
|||
// 设置NameServer的地址
|
|||
consumer.setNamesrvAddr("192.168.110.96:9876"); |
|||
|
|||
// 订阅一个或多个Topic,以及Tag来过滤特定消息
|
|||
consumer.subscribe("SELF_TEST_TOPIC", "Tag"); |
|||
|
|||
// 注册回调实现类来处理从broker拉取回来的消息
|
|||
consumer.registerMessageListener(new MessageListenerOrderly() { |
|||
@Override |
|||
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { |
|||
context.setAutoCommit(true); // 根据需要设置自动提交偏移量
|
|||
for (MessageExt msg : msgs) { |
|||
// 处理消息内容,例如打印出来或者进行其他业务逻辑处理
|
|||
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody())); |
|||
} |
|||
return ConsumeOrderlyStatus.SUCCESS; // 返回消费状态
|
|||
} |
|||
}); |
|||
|
|||
// 启动消费者实例
|
|||
consumer.start(); |
|||
// System.out.printf("Consumer Started.%n");
|
|||
} |
|||
|
|||
} |
@ -0,0 +1,49 @@ |
|||
package org.dromara.stream.producer; |
|||
|
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.exception.MQBrokerException; |
|||
import org.apache.rocketmq.client.exception.MQClientException; |
|||
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
|||
|
|||
import org.apache.rocketmq.client.producer.SendResult; |
|||
import org.apache.rocketmq.common.message.Message; |
|||
import org.apache.rocketmq.remoting.exception.RemotingException; |
|||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
import java.nio.charset.StandardCharsets; |
|||
import java.util.Map; |
|||
|
|||
/** |
|||
* @author xbhog |
|||
* @date 2024/05/25 17:15 |
|||
**/ |
|||
@Slf4j |
|||
@Component |
|||
public class RocketmqProducer { |
|||
|
|||
public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException { |
|||
// 初始化消息生产者
|
|||
DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); |
|||
// 设置超时时间
|
|||
producer.setSendMsgTimeout(10000); |
|||
// 指定nameserver地址
|
|||
producer.setNamesrvAddr("192.168.110.96:9876"); |
|||
|
|||
producer.start(); |
|||
for (int i = 0; i < 100; i++) { |
|||
// 创建消息,并指定Topic,Tag和消息体
|
|||
Message msg = new Message("SELF_TEST_TOPIC" /* Topic */, "Tag" /* Tag */, ("Hello RocketMQ " + i).getBytes()); |
|||
// 发送消息到一个Broker
|
|||
producer.send(msg); |
|||
} |
|||
|
|||
// 如果不再发送消息,关闭Producer实例。
|
|||
producer.shutdown(); |
|||
} |
|||
|
|||
} |
|||
|
|||
|
|||
|
@ -0,0 +1,28 @@ |
|||
package org.dromara.system.dubbo; |
|||
|
|||
import lombok.RequiredArgsConstructor; |
|||
import org.apache.dubbo.config.annotation.DubboService; |
|||
import org.dromara.common.core.utils.MapstructUtils; |
|||
import org.dromara.system.api.RemotePostService; |
|||
import org.dromara.system.api.domain.vo.RemotePostVo; |
|||
import org.dromara.system.domain.bo.SysPostBo; |
|||
import org.dromara.system.domain.vo.SysPostVo; |
|||
import org.dromara.system.service.ISysPostService; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
import java.util.List; |
|||
|
|||
@RequiredArgsConstructor |
|||
@Service |
|||
@DubboService |
|||
public class RemotePostServiceImpl implements RemotePostService { |
|||
|
|||
|
|||
private final ISysPostService sysPostService; |
|||
|
|||
@Override |
|||
public List<RemotePostVo> listPost() { |
|||
List<SysPostVo> sysPostVos = sysPostService.selectPostList(new SysPostBo()); |
|||
return MapstructUtils.convert(sysPostVos, RemotePostVo.class); |
|||
} |
|||
} |
@ -0,0 +1,85 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" |
|||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<parent> |
|||
<groupId>org.dromara</groupId> |
|||
<artifactId>dk-visual</artifactId> |
|||
<version>${revision}</version> |
|||
</parent> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
|
|||
<artifactId>rocketmq</artifactId> |
|||
|
|||
<dependencies> |
|||
<!--rocketmq消息队列--> |
|||
<dependency> |
|||
<groupId>org.apache.rocketmq</groupId> |
|||
<artifactId>rocketmq-client</artifactId> |
|||
<version> 4.9.0</version> |
|||
</dependency> |
|||
|
|||
<!-- SpringCloud Alibaba Nacos --> |
|||
<dependency> |
|||
<groupId>com.alibaba.cloud</groupId> |
|||
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> |
|||
</dependency> |
|||
|
|||
<!-- SpringCloud Alibaba Nacos Config --> |
|||
<dependency> |
|||
<groupId>com.alibaba.cloud</groupId> |
|||
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>com.alibaba.nacos</groupId> |
|||
<artifactId>nacos-client</artifactId> |
|||
</dependency> |
|||
|
|||
<!-- SpringCloud Alibaba Sentinel --> |
|||
<dependency> |
|||
<groupId>com.alibaba.cloud</groupId> |
|||
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> |
|||
</dependency> |
|||
|
|||
<!-- SpringBoot Actuator --> |
|||
<dependency> |
|||
<groupId>org.springframework.boot</groupId> |
|||
<artifactId>spring-boot-starter-actuator</artifactId> |
|||
</dependency> |
|||
|
|||
<!-- Mysql驱动包 --> |
|||
<dependency> |
|||
<groupId>com.mysql</groupId> |
|||
<artifactId>mysql-connector-j</artifactId> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.projectlombok</groupId> |
|||
<artifactId>lombok</artifactId> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.dromara</groupId> |
|||
<artifactId>common-web</artifactId> |
|||
</dependency> |
|||
</dependencies> |
|||
|
|||
<build> |
|||
<finalName>${project.artifactId}</finalName> |
|||
<plugins> |
|||
<plugin> |
|||
<groupId>org.springframework.boot</groupId> |
|||
<artifactId>spring-boot-maven-plugin</artifactId> |
|||
<version>${spring-boot.version}</version> |
|||
<executions> |
|||
<execution> |
|||
<goals> |
|||
<goal>repackage</goal> |
|||
</goals> |
|||
</execution> |
|||
</executions> |
|||
</plugin> |
|||
</plugins> |
|||
</build> |
|||
|
|||
</project> |
@ -0,0 +1,21 @@ |
|||
package com.ruoyi.rocketmq; |
|||
|
|||
import org.springframework.boot.SpringApplication; |
|||
import org.springframework.boot.autoconfigure.SpringBootApplication; |
|||
import org.springframework.scheduling.annotation.EnableAsync; |
|||
|
|||
/** |
|||
* RocketMQ模块 |
|||
* |
|||
* @author |
|||
*/ |
|||
@SpringBootApplication |
|||
@EnableAsync |
|||
public class RocketMQApplication |
|||
{ |
|||
public static void main(String[] args) |
|||
{ |
|||
SpringApplication.run(RocketMQApplication.class, args); |
|||
System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); |
|||
} |
|||
} |
@ -0,0 +1,75 @@ |
|||
package com.ruoyi.rocketmq.config; |
|||
|
|||
import com.ruoyi.rocketmq.enums.MessageCodeEnum; |
|||
import com.ruoyi.rocketmq.enums.MessageTopic; |
|||
import com.ruoyi.rocketmq.model.ConsumerMode; |
|||
import com.ruoyi.rocketmq.consumer.RocketMsgListener; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
|||
import org.apache.rocketmq.client.exception.MQClientException; |
|||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 消费者配置 |
|||
*/ |
|||
@RefreshScope |
|||
@Configuration |
|||
@Slf4j |
|||
public class ConsumerConfig { |
|||
|
|||
@Autowired |
|||
private ConsumerMode consumerMode; |
|||
|
|||
@Bean |
|||
public DefaultMQPushConsumer getRocketMQConsumer() { |
|||
//构建客户端连接
|
|||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); |
|||
//
|
|||
consumer.setNamesrvAddr(consumerMode.getNameServer()); |
|||
consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); |
|||
consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); |
|||
consumer.registerMessageListener(new RocketMsgListener()); |
|||
/** |
|||
* 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 |
|||
* 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 |
|||
* 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 |
|||
* 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 |
|||
*/ |
|||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); |
|||
/** |
|||
* CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 |
|||
* 订阅topic整体,从而达到负载均衡的目的 |
|||
* BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 |
|||
* 需要注意的是,在广播模式下,每个Consumer都会独立地处理相同的消息副本。这可能会导致一些潜在的问题,例如消息重复处理或者资源浪费。因此,在使用广播模式时,请确保消息的处理逻辑是幂等的,并仔细考虑系统资源的消耗。 |
|||
*/ |
|||
// consumer.setMessageModel(MessageModel.BROADCASTING);
|
|||
|
|||
consumer.setVipChannelEnabled(false); |
|||
consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); |
|||
try { |
|||
/** |
|||
* 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 |
|||
* 由于官方并没有给直接订阅全部消息示例 所以使用list列表循环订阅所有topic |
|||
*/ |
|||
// 获取所有topic列表
|
|||
MessageTopic messageTopic = new MessageTopic(); |
|||
List<String> allTopics = messageTopic.RocketMQTopicList(); |
|||
//订阅所有topic
|
|||
for (String topic : allTopics) { |
|||
consumer.subscribe(topic,"*"); |
|||
} |
|||
consumer.start(); |
|||
log.info("消费者初始化成功:{}", consumer); |
|||
} catch (MQClientException e) { |
|||
e.printStackTrace(); |
|||
log.error("消费者初始化失败:{}",e.getMessage()); |
|||
} |
|||
return consumer; |
|||
} |
|||
} |
@ -0,0 +1,56 @@ |
|||
package com.ruoyi.rocketmq.config; |
|||
|
|||
import com.ruoyi.rocketmq.model.ProducerMode; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.exception.MQClientException; |
|||
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
|
|||
/** |
|||
* mq搭建地址连接 |
|||
* 生产者初者连接信息 具体看nacos配置 |
|||
*/ |
|||
@Configuration |
|||
@Slf4j |
|||
public class ProducerConfig { |
|||
|
|||
/** |
|||
* 远程调用连接信息 |
|||
*/ |
|||
public static DefaultMQProducer producer; |
|||
|
|||
/** |
|||
* 连接客户端信息配置 具体看nacos配置 |
|||
*/ |
|||
@Autowired |
|||
private ProducerMode producerMode; |
|||
|
|||
@Bean |
|||
public DefaultMQProducer getRocketMQProducer() { |
|||
producer = new DefaultMQProducer(producerMode.getGroupName()); |
|||
producer.setNamesrvAddr(producerMode.getNameServer()); |
|||
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
|
|||
if(producerMode.getMaxMessageSize()!=null){ |
|||
producer.setMaxMessageSize(producerMode.getMaxMessageSize()); |
|||
} |
|||
if(producerMode.getSendMsgTimeout()!=null){ |
|||
producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); |
|||
} |
|||
//如果发送消息失败,设置重试次数,默认为2次
|
|||
if(producerMode.getRetryTimesWhenSendFailed()!=null){ |
|||
producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); |
|||
} |
|||
producer.setVipChannelEnabled(false); |
|||
try { |
|||
producer.start(); |
|||
log.info("生产者初始化成功:{}",producer.toString()); |
|||
} catch (MQClientException e) { |
|||
log.error("生产者初始化失败:{}",e.getMessage()); |
|||
} |
|||
return producer; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,87 @@ |
|||
package com.ruoyi.rocketmq.consumer; |
|||
|
|||
import com.ruoyi.rocketmq.enums.MessageCodeEnum; |
|||
import com.ruoyi.rocketmq.producer.ConsumeException; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
|||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
|||
import org.apache.rocketmq.common.message.MessageExt; |
|||
import org.apache.rocketmq.common.message.MessageQueue; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.util.CollectionUtils; |
|||
|
|||
import java.io.UnsupportedEncodingException; |
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 消息监听 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class RocketMsgListener implements MessageListenerConcurrently { |
|||
|
|||
/** |
|||
* 消费消息 |
|||
* @param list msgs.size() >= 1 |
|||
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here |
|||
* 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 |
|||
* @param consumeConcurrentlyContext 上下文信息 |
|||
* @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) |
|||
*/ |
|||
@Override |
|||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { |
|||
try{ |
|||
//消息不等于空情况
|
|||
if (!CollectionUtils.isEmpty(list)) { |
|||
//获取topic
|
|||
for (MessageExt messageExt : list) { |
|||
// 解析消息内容
|
|||
String body = new String(messageExt.getBody()); |
|||
log.info("接受到的消息为:{}", body); |
|||
String tags = messageExt.getTags(); |
|||
String topic = messageExt.getTopic(); |
|||
String msgId = messageExt.getMsgId(); |
|||
String keys = messageExt.getKeys(); |
|||
int reConsume = messageExt.getReconsumeTimes(); |
|||
// 消息已经重试了3次,如果不需要再次消费,则返回成功
|
|||
if (reConsume == 3) { |
|||
// TODO 补偿信息
|
|||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常
|
|||
} |
|||
// 根据不同的topic处理不同的业务 这里以订单消息为例子
|
|||
if (MessageCodeEnum.ORDER_MESSAGE_TOPIC.getCode().equals(topic)) { |
|||
if (MessageCodeEnum.ORDER_MESSAGE_TAG.getCode().equals(tags)) { |
|||
//处理你的业务
|
|||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
|
|||
} else { |
|||
log.info("未匹配到Tag【{}】" + tags); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
// 消息消费失败
|
|||
//broker会根据设置的messageDelayLevel发起重试,默认16次
|
|||
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
|||
} catch (Exception e) { |
|||
// 调用 handleException 方法处理异常并返回处理结果
|
|||
return handleException(e); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 异常处理 |
|||
* |
|||
* @param e 捕获的异常 |
|||
* @return 消息消费结果 |
|||
*/ |
|||
private static ConsumeConcurrentlyStatus handleException(final Exception e) { |
|||
Class exceptionClass = e.getClass(); |
|||
if (exceptionClass.equals(UnsupportedEncodingException.class)) { |
|||
log.error(e.getMessage()); |
|||
} else if (exceptionClass.equals(ConsumeException.class)) { |
|||
log.error(e.getMessage()); |
|||
} |
|||
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
|||
} |
|||
} |
@ -0,0 +1,123 @@ |
|||
package com.ruoyi.rocketmq.controller; |
|||
|
|||
|
|||
import com.ruoyi.rocketmq.producer.MessageProducer; |
|||
import org.apache.rocketmq.client.producer.SendResult; |
|||
import org.apache.rocketmq.common.message.Message; |
|||
import org.springframework.web.bind.annotation.PostMapping; |
|||
import org.springframework.web.bind.annotation.RequestMapping; |
|||
import org.springframework.web.bind.annotation.RequestParam; |
|||
import org.springframework.web.bind.annotation.RestController; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
|
|||
/** |
|||
* 消息测试类Controller |
|||
*/ |
|||
@RestController |
|||
@RequestMapping("/api/rocketMessage") |
|||
public class RocketMqController { |
|||
|
|||
|
|||
|
|||
/** |
|||
* 发送同步消息 |
|||
*/ |
|||
@PostMapping("/sendSynchronizeMessage") |
|||
private Map sendSynchronizeMessage(){ |
|||
MessageProducer messageProducer = new MessageProducer(); |
|||
//调用MessageProducer配置好的消息方法
|
|||
SendResult sendResult = messageProducer.sendSynchronizeMessage("order-message","order_message_tag","title","content"); |
|||
Map<String,Object> result = new HashMap<>(); |
|||
result.put("data",sendResult); |
|||
return result; |
|||
} |
|||
|
|||
|
|||
|
|||
/** |
|||
* 发送单向消息 |
|||
*/ |
|||
@PostMapping("/sendOnewayMessage") |
|||
private Map sendOnewayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ |
|||
MessageProducer messageProducer = new MessageProducer(); |
|||
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
|||
messageProducer.sendOnewayMessage("order-message","order_timeout_tag","title","content"); |
|||
Map<String,Object> result = new HashMap<>(); |
|||
result.put("msg","发送成功"); |
|||
result.put("code",200); |
|||
return result; |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 批量发送消息 |
|||
*/ |
|||
@PostMapping("/sendBatchMessage") |
|||
private Map sendBatchMessage(){ |
|||
// 根据实际需求创建消息列表并返回
|
|||
List<Message> messages = new ArrayList<>(); |
|||
// 添加消息到列表
|
|||
messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes())); |
|||
messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes())); |
|||
messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes())); |
|||
MessageProducer messageProducer = new MessageProducer(); |
|||
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
|||
SendResult sendResult = messageProducer.sendBatchMessage(messages); |
|||
Map<String,Object> result = new HashMap<>(); |
|||
result.put("data",sendResult); |
|||
return result; |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 发送有序的消息 |
|||
*/ |
|||
@PostMapping("/sendOrderlyMessage") |
|||
private Map sendOrderlyMessage(){ |
|||
// 根据实际需求创建消息列表并返回
|
|||
List<Message> messages = new ArrayList<>(); |
|||
// 添加消息到列表
|
|||
messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes())); |
|||
messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes())); |
|||
messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes())); |
|||
Integer messageQueueNumber = 3; |
|||
MessageProducer messageProducer = new MessageProducer(); |
|||
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
|||
SendResult sendResult = messageProducer.sendOrderlyMessage(messages,messageQueueNumber); |
|||
Map<String,Object> result = new HashMap<>(); |
|||
result.put("data",sendResult); |
|||
return result; |
|||
} |
|||
|
|||
/** |
|||
* 发送延迟消息 |
|||
*/ |
|||
@PostMapping("/sendDelayMessage") |
|||
private Map sendDelayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ |
|||
MessageProducer messageProducer = new MessageProducer(); |
|||
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
|||
SendResult sendResult = messageProducer.sendDelayMessage("order-message","order_timeout_tag","title","content"); |
|||
Map<String,Object> result = new HashMap<>(); |
|||
result.put("data",sendResult); |
|||
return result; |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 发送异步的消息 |
|||
*/ |
|||
@PostMapping("/sendAsyncProducerMessage") |
|||
private Map sendAsyncProducerMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ |
|||
MessageProducer messageProducer = new MessageProducer(); |
|||
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
|||
SendResult sendResult = messageProducer.sendAsyncProducerMessage("order-message","order_timeout_tag","title","content"); |
|||
Map<String,Object> result = new HashMap<>(); |
|||
result.put("data",sendResult); |
|||
return result; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,55 @@ |
|||
package com.ruoyi.rocketmq.enums; |
|||
|
|||
|
|||
import lombok.Getter; |
|||
|
|||
/** |
|||
* 用于传递topic和 tag |
|||
* 也用于接收消息后判断不同的消息处理不同的业务 |
|||
*/ |
|||
@Getter |
|||
public enum MessageCodeEnum { |
|||
|
|||
/** |
|||
* 系统消息 |
|||
*/ |
|||
NOTE_MESSAGE_TOPIC("system-message","系统消息服务模块topic名称"), |
|||
/** |
|||
* 用户消息 |
|||
*/ |
|||
USER_MESSAGE_TOPIC("user-message","用户消息服务模块topic名称"), |
|||
|
|||
/** |
|||
* 订单消息 |
|||
*/ |
|||
ORDER_MESSAGE_TOPIC("order-message","订单消息服务模块topic名称"), |
|||
|
|||
/** |
|||
* 用户消息tag |
|||
*/ |
|||
USER_MESSAGE_TAG("user_message_tag","用户消息推送"), |
|||
|
|||
/** |
|||
* 系统消息tag |
|||
*/ |
|||
NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), |
|||
|
|||
/** |
|||
* 订单消息 |
|||
*/ |
|||
ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), |
|||
|
|||
/** |
|||
* 订单处理编号 |
|||
*/ |
|||
ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); |
|||
|
|||
private final String code; |
|||
private final String msg; |
|||
|
|||
MessageCodeEnum(String code, String msg){ |
|||
this.code = code; |
|||
this.msg = msg; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,24 @@ |
|||
package com.ruoyi.rocketmq.enums; |
|||
|
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 定义topic列表 |
|||
*/ |
|||
public class MessageTopic { |
|||
|
|||
//在这里添加topic 用于批量订阅
|
|||
public List<String> RocketMQTopicList(){ |
|||
List<String> getTopicLists=new ArrayList<>(); |
|||
//系统消息
|
|||
getTopicLists.add("system-message"); |
|||
//用户消息
|
|||
getTopicLists.add("user-message"); |
|||
//订单消息
|
|||
getTopicLists.add("order-message"); |
|||
return getTopicLists; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,26 @@ |
|||
package com.ruoyi.rocketmq.model; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* 消费者初始化 |
|||
* 消费者连接信息 具体看nacos配置 |
|||
*/ |
|||
@Data |
|||
@Configuration |
|||
@Component |
|||
public class ConsumerMode { |
|||
@Value("${rocketmq.nameServer}") |
|||
private String nameServer; |
|||
@Value("${rocketmq.consumer.groupName}") |
|||
private String groupName ; |
|||
@Value("${rocketmq.consumer.consumerThreadMin}") |
|||
private Integer consumeThreadMin; |
|||
@Value("${rocketmq.consumer.consumerThreadMax}") |
|||
private Integer consumeThreadMax; |
|||
@Value("${rocketmq.consumer.consumerMessageBatchMaxSize}") |
|||
private Integer consumeMessageBatchMaxSize; |
|||
} |
@ -0,0 +1,25 @@ |
|||
package com.ruoyi.rocketmq.model; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
/** |
|||
* 生产者初始化 |
|||
*/ |
|||
@RefreshScope |
|||
@Data |
|||
@Configuration |
|||
public class ProducerMode { |
|||
@Value("${rocketmq.producer.groupName}") |
|||
private String groupName; |
|||
@Value("${rocketmq.nameServer}") |
|||
private String nameServer; |
|||
@Value("${rocketmq.producer.maxMessageSize}") |
|||
private Integer maxMessageSize; |
|||
@Value("${rocketmq.producer.sendMsgTimeout}") |
|||
private Integer sendMsgTimeout; |
|||
@Value("${rocketmq.producer.retryTimesWhenSendFailed}") |
|||
private Integer retryTimesWhenSendFailed; |
|||
} |
@ -0,0 +1,23 @@ |
|||
package com.ruoyi.rocketmq.producer; |
|||
|
|||
/** |
|||
* @author 影子 |
|||
* 用于捕捉异常非受检异常(unchecked exception) |
|||
* RuntimeException 和其子类的异常在编译时不需要进行强制性的异常处理,可以选择在运行时进行捕获和处理 |
|||
* 可选择使用 |
|||
*/ |
|||
public class ConsumeException extends RuntimeException{ |
|||
private static final long serialVersionUID = 4093867789628938836L; |
|||
|
|||
public ConsumeException(String message) { |
|||
super(message); |
|||
} |
|||
|
|||
public ConsumeException(Throwable cause) { |
|||
super(cause); |
|||
} |
|||
|
|||
public ConsumeException(String message, Throwable cause) { |
|||
super(message, cause); |
|||
} |
|||
} |
@ -0,0 +1,190 @@ |
|||
package com.ruoyi.rocketmq.producer; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.exception.MQBrokerException; |
|||
import org.apache.rocketmq.client.exception.MQClientException; |
|||
import org.apache.rocketmq.client.producer.SendCallback; |
|||
import org.apache.rocketmq.client.producer.SendResult; |
|||
import org.apache.rocketmq.client.producer.TransactionMQProducer; |
|||
import org.apache.rocketmq.client.producer.TransactionSendResult; |
|||
import org.apache.rocketmq.common.message.Message; |
|||
import org.apache.rocketmq.remoting.common.RemotingHelper; |
|||
import org.apache.rocketmq.remoting.exception.RemotingException; |
|||
|
|||
import java.io.UnsupportedEncodingException; |
|||
import java.text.DateFormat; |
|||
import java.text.SimpleDateFormat; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
|
|||
import static com.ruoyi.rocketmq.config.ProducerConfig.producer; |
|||
|
|||
/** |
|||
* 消息发送 |
|||
*/ |
|||
@Slf4j |
|||
public class MessageProducer { |
|||
|
|||
|
|||
/** |
|||
* 同步发送消息 |
|||
* @param topic 主题 |
|||
* @param tag 标签 |
|||
* @param key 自定义的key,根据业务来定 |
|||
* @param value 消息的内容 |
|||
* 通过调用 send() 方法发送消息,阻塞等待服务器响应。 |
|||
*/ |
|||
public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ |
|||
String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; |
|||
try { |
|||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|||
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
|||
SendResult result = producer.send(msg); |
|||
return result; |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
log.error("消息初始化失败!body:{}",body); |
|||
|
|||
} |
|||
return null; |
|||
} |
|||
|
|||
/** |
|||
* 单向发送消息 |
|||
* @param topic 主题 |
|||
* @param tag 标签 |
|||
* @param key 自定义的key,根据业务来定 |
|||
* @param value 消息的内容 |
|||
* 单向发送:通过调用 sendOneway() 方法发送消息,不关心发送结果,适用于对可靠性要求不高的场景。 |
|||
*/ |
|||
public void sendOnewayMessage(String topic, String tag, String key, String value){ |
|||
String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; |
|||
try { |
|||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|||
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
|||
producer.sendOneway(msg); |
|||
} catch (UnsupportedEncodingException e) { |
|||
log.error("消息初始化失败!body:{}",body); |
|||
|
|||
} catch (MQClientException | InterruptedException | RemotingException e) { |
|||
log.error("消息发送失败! body:{}",body); |
|||
} |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 批量发送消息 |
|||
* @param messages 消息列表 |
|||
* 批量发送:通过调用 send() 方法并传入多条消息,实现批量发送消息。 |
|||
*/ |
|||
public SendResult sendBatchMessage(List<Message> messages){ |
|||
String body = messages.toString(); |
|||
try { |
|||
System.out.println("生产者发送消息:"+ messages); |
|||
// 发送批量消息
|
|||
SendResult sendResult = producer.send(messages); |
|||
return sendResult; |
|||
} catch (MQClientException | InterruptedException | RemotingException e) { |
|||
log.error("消息发送失败! body:{}",body); |
|||
} catch (MQBrokerException e) { |
|||
throw new RuntimeException(e); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 发送有序的消息 |
|||
* @param messagesList Message集合 |
|||
* @param messageQueueNumber 消息队列数量,根据实际情况设定 |
|||
* 顺序发送: messageQueueNumber 表示消息的业务标识,可以根据具体需求进行设置来保证消息按顺序发送。 |
|||
*/ |
|||
public SendResult sendOrderlyMessage(List<Message> messagesList, Integer messageQueueNumber) { |
|||
SendResult result = null; |
|||
for (Message message : messagesList) { |
|||
try { |
|||
result = producer.send(message, (list, msg, arg) -> { |
|||
Integer queueNumber = (Integer) arg; |
|||
//int queueIndex = queueNumber % list.size();
|
|||
return list.get(queueNumber); |
|||
}, messageQueueNumber);//根据编号取模,选择消息队列
|
|||
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { |
|||
log.error("发送有序消息失败"); |
|||
return result; |
|||
} |
|||
} |
|||
return result; |
|||
} |
|||
|
|||
/** |
|||
* 发送延迟消息 |
|||
* @param topic 主题 |
|||
* @param tag 标签 |
|||
* @param key 自定义的key,根据业务来定 |
|||
* @param value 消息的内容 |
|||
* 延迟发送:通过设置延迟级别来实现延迟发送消息。 |
|||
*/ |
|||
public SendResult sendDelayMessage(String topic, String tag, String key, String value) |
|||
{ |
|||
SendResult result = null; |
|||
try |
|||
{ |
|||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|||
//设置消息延迟级别,我这里设置5,对应就是延时一分钟
|
|||
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
|
|||
msg.setDelayTimeLevel(4); |
|||
// 发送消息到一个Broker
|
|||
result = producer.send(msg); |
|||
// 通过sendResult返回消息是否成功送达
|
|||
log.info("发送延迟消息结果:======sendResult:{}", result); |
|||
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|||
log.info("发送时间:{}", format.format(new Date())); |
|||
return result; |
|||
} |
|||
catch (Exception e) |
|||
{ |
|||
e.printStackTrace(); |
|||
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); |
|||
} |
|||
return result; |
|||
} |
|||
/** |
|||
* 发送异步的消息 |
|||
* @param topic 主题 |
|||
* @param tag 标签 |
|||
* @param key 自定义的key,根据业务来定 |
|||
* @param value 消息的内容 |
|||
* 通过调用 send() 方法,并传入一个 SendCallback 对象,在发送消息的同时可以继续处理其他逻辑,消息发送结果通过回调函数通知。 |
|||
*/ |
|||
public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value){ |
|||
|
|||
try { |
|||
//创建一个消息实例,指定主题、标签和消息体。
|
|||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|||
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
|||
producer.send(msg,new SendCallback() { |
|||
// 异步回调的处理
|
|||
@Override |
|||
public void onSuccess(SendResult sendResult) { |
|||
System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId()); |
|||
} |
|||
@Override |
|||
public void onException(Throwable e) { |
|||
System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e); |
|||
e.printStackTrace(); |
|||
} |
|||
}); |
|||
} catch (MQClientException e) { |
|||
e.printStackTrace(); |
|||
} catch (RemotingException e) { |
|||
e.printStackTrace(); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} catch (UnsupportedEncodingException e) { |
|||
throw new RuntimeException(e); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,59 @@ |
|||
server: |
|||
port: 9402 |
|||
|
|||
# Spring |
|||
spring: |
|||
application: |
|||
# 应用名称 |
|||
name: rocketmq |
|||
profiles: |
|||
# 环境配置 |
|||
active: @profiles.active@ |
|||
|
|||
--- # rocketmq 配置 |
|||
rocketmq: |
|||
# 生产者的组名 |
|||
producer: |
|||
#是否开启自动配置 |
|||
# isEnable: true |
|||
# 发送同一类消息的设置为同一个group,保证唯一 |
|||
groupName: message-producer |
|||
# 消息最大长度 默认1024*4(4M) |
|||
maxMessageSize: 4096 |
|||
# 发送消息超时时间,默认3000 |
|||
sendMsgTimeout: 3000 |
|||
# 发送消息失败重试次数,默认2 |
|||
retryTimesWhenSendFailed: 3 |
|||
# group: produce-group |
|||
# 消费者的组名 |
|||
consumer: |
|||
#是否开启自动配置 |
|||
# isEnable: true |
|||
# 官方建议:确保同一组中的每个消费者订阅相同的主题。 |
|||
groupName: message-consumer |
|||
consumerThreadMin: 20 |
|||
consumerThreadMax: 64 |
|||
# 设置一次消费消息的条数,默认为1条 |
|||
consumerMessageBatchMaxSize: 1 |
|||
# NameServer地址 |
|||
nameServer: 114.235.183.147:9876 |
|||
|
|||
--- # nacos 配置 |
|||
spring: |
|||
cloud: |
|||
nacos: |
|||
# nacos 服务地址 |
|||
server-addr: @nacos.server@ |
|||
username: @nacos.username@ |
|||
password: @nacos.password@ |
|||
discovery: |
|||
# 注册组 |
|||
group: @nacos.discovery.group@ |
|||
namespace: ${spring.profiles.active} |
|||
config: |
|||
# 配置组 |
|||
group: @nacos.config.group@ |
|||
namespace: ${spring.profiles.active} |
|||
config: |
|||
import: |
|||
- optional:nacos:application-common.yml |
@ -0,0 +1,6 @@ |
|||
___ _ _ __ __ ___ |
|||
| _ \ ___ __ | |__ ___ | |_ | \/ | / _ \ |
|||
| / / _ \ / _| | / / / -_) | _| | |\/| || (_) | |
|||
|_|_\ \___/ \__|_ |_\_\ \___| _\__| |_|__|_| \__\_\ |
|||
_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| |
|||
"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-' |
@ -0,0 +1,28 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<configuration scan="true" scanPeriod="60 seconds" debug="false"> |
|||
<!-- 日志存放路径 --> |
|||
<property name="log.path" value="logs/${project.artifactId}" /> |
|||
<!-- 日志输出格式 --> |
|||
<property name="console.log.pattern" |
|||
value="%red(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{36}%n) - %msg%n"/> |
|||
|
|||
<!-- 控制台输出 --> |
|||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender"> |
|||
<encoder> |
|||
<pattern>${console.log.pattern}</pattern> |
|||
<charset>utf-8</charset> |
|||
</encoder> |
|||
</appender> |
|||
|
|||
<include resource="logback-common.xml" /> |
|||
|
|||
<include resource="logback-logstash.xml" /> |
|||
|
|||
<!-- 开启 skywalking 日志收集 --> |
|||
<include resource="logback-skylog.xml" /> |
|||
|
|||
<!--系统操作日志--> |
|||
<root level="info"> |
|||
<appender-ref ref="console" /> |
|||
</root> |
|||
</configuration> |
@ -0,0 +1,25 @@ |
|||
package com.ruoyi.testrocketmq; |
|||
|
|||
import com.ruoyi.common.security.annotation.EnableCustomConfig; |
|||
import com.ruoyi.common.security.annotation.EnableRyFeignClients; |
|||
import org.springframework.boot.SpringApplication; |
|||
import org.springframework.boot.autoconfigure.SpringBootApplication; |
|||
import org.springframework.scheduling.annotation.EnableAsync; |
|||
|
|||
/** |
|||
* 平台管理模块 |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
@EnableCustomConfig |
|||
@EnableRyFeignClients |
|||
@SpringBootApplication |
|||
@EnableAsync |
|||
public class RocketMQApplication |
|||
{ |
|||
public static void main(String[] args) |
|||
{ |
|||
SpringApplication.run(RocketMQApplication.class, args); |
|||
System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); |
|||
} |
|||
} |
@ -0,0 +1,64 @@ |
|||
package com.ruoyi.testrocketmq.config; |
|||
|
|||
import com.ruoyi.testrocketmq.consumer.RocketMsgListener; |
|||
import com.ruoyi.testrocketmq.enums.MessageCodeEnum; |
|||
import com.ruoyi.testrocketmq.model.ConsumerMode; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
|||
import org.apache.rocketmq.client.exception.MQClientException; |
|||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
/** |
|||
* 消费者配置 |
|||
*/ |
|||
@RefreshScope |
|||
@Configuration |
|||
@Slf4j |
|||
public class ConsumerConfig { |
|||
@Autowired |
|||
private ConsumerMode consumerMode; |
|||
|
|||
@Bean |
|||
public DefaultMQPushConsumer getRocketMQConsumer() throws MQClientException { |
|||
// ConsumerMode consumerMode = new ConsumerMode();
|
|||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); |
|||
consumer.setNamesrvAddr(consumerMode.getNamesrvAddr()); |
|||
consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); |
|||
consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); |
|||
consumer.registerMessageListener(new RocketMsgListener()); |
|||
/** |
|||
* 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 |
|||
* 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 |
|||
* 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 |
|||
* 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 |
|||
*/ |
|||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); |
|||
/** |
|||
* CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 |
|||
* 订阅topic整体,从而达到负载均衡的目的 |
|||
* BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 |
|||
* |
|||
*/ |
|||
// consumer.setMessageModel(MessageModel.BROADCASTING);
|
|||
|
|||
consumer.setVipChannelEnabled(false); |
|||
consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); |
|||
try { |
|||
/** |
|||
* 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 |
|||
*/ |
|||
consumer.subscribe(MessageCodeEnum.ORDER_MESSAGE.getCode(),"*"); |
|||
consumer.subscribe(MessageCodeEnum.USER_MESSAGE.getCode(),"*"); |
|||
consumer.start(); |
|||
log.info("消费者初始化成功:{}", consumer.toString()); |
|||
} catch (MQClientException e) { |
|||
e.printStackTrace(); |
|||
log.error("消费者初始化失败:{}",e.getMessage()); |
|||
} |
|||
return consumer; |
|||
} |
|||
} |
@ -0,0 +1,27 @@ |
|||
package com.ruoyi.testrocketmq.config; |
|||
|
|||
/** |
|||
* @author yz |
|||
*/ |
|||
public class MessageConfig { |
|||
private Class<?> messageClass; |
|||
private boolean orderlyMessage; |
|||
|
|||
public Class<?> getMessageClass() { |
|||
return messageClass; |
|||
} |
|||
|
|||
public void setMessageClass(Class<?> messageClass) { |
|||
this.messageClass = messageClass; |
|||
} |
|||
|
|||
public boolean isOrderlyMessage() { |
|||
return orderlyMessage; |
|||
} |
|||
|
|||
public void setOrderlyMessage(boolean orderlyMessage) { |
|||
this.orderlyMessage = orderlyMessage; |
|||
} |
|||
|
|||
|
|||
} |
@ -0,0 +1,48 @@ |
|||
package com.ruoyi.testrocketmq.config; |
|||
|
|||
import com.ruoyi.testrocketmq.model.ProducerMode; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.exception.MQClientException; |
|||
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
|
|||
@Configuration |
|||
@Slf4j |
|||
public class ProducerConfig { |
|||
|
|||
public static DefaultMQProducer producer; |
|||
|
|||
@Autowired |
|||
private ProducerMode producerMode; |
|||
|
|||
|
|||
|
|||
@Bean |
|||
public DefaultMQProducer getRocketMQProducer() { |
|||
producer = new DefaultMQProducer(producerMode.getGroupName()); |
|||
producer.setNamesrvAddr(producerMode.getNamesrvAddr()); |
|||
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
|
|||
if(producerMode.getMaxMessageSize()!=null){ |
|||
producer.setMaxMessageSize(producerMode.getMaxMessageSize()); |
|||
} |
|||
if(producerMode.getSendMsgTimeout()!=null){ |
|||
producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); |
|||
} |
|||
//如果发送消息失败,设置重试次数,默认为2次
|
|||
if(producerMode.getRetryTimesWhenSendFailed()!=null){ |
|||
producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); |
|||
} |
|||
producer.setVipChannelEnabled(false); |
|||
try { |
|||
producer.start(); |
|||
log.info("生产者初始化成功:{}",producer.toString()); |
|||
} catch (MQClientException e) { |
|||
log.error("生产者初始化失败:{}",e.getMessage()); |
|||
} |
|||
return producer; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,100 @@ |
|||
package com.ruoyi.testrocketmq.consumer; |
|||
|
|||
import com.ruoyi.testrocketmq.enums.MessageCodeEnum; |
|||
import com.ruoyi.testrocketmq.producer.ConsumeException; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
|||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
|||
import org.apache.rocketmq.common.message.MessageExt; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.util.CollectionUtils; |
|||
|
|||
import java.io.UnsupportedEncodingException; |
|||
import java.text.DateFormat; |
|||
import java.text.SimpleDateFormat; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 消息监听 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class RocketMsgListener implements MessageListenerConcurrently { |
|||
|
|||
/** |
|||
* 消费消息 |
|||
* |
|||
* @param list msgs.size() >= 1 |
|||
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here |
|||
* 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 |
|||
* @param consumeConcurrentlyContext 上下文信息 |
|||
* @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) |
|||
*/ |
|||
@Override |
|||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { |
|||
|
|||
if (!CollectionUtils.isEmpty(list)) { |
|||
for (MessageExt messageExt : list) { |
|||
// 消息内容
|
|||
String body = new String(messageExt.getBody()); |
|||
log.info("接受到的消息为:{}", body); |
|||
String tags = messageExt.getTags(); |
|||
String topic = messageExt.getTopic(); |
|||
String msgId = messageExt.getMsgId(); |
|||
String keys = messageExt.getKeys(); |
|||
int reConsume = messageExt.getReconsumeTimes(); |
|||
// 消息已经重试了3次,如果不需要再次消费,则返回成功
|
|||
if (reConsume == 3) { |
|||
// TODO 补偿信息
|
|||
//smsLogService.insertLog(topic, tags, msgId, keys, body, "【" + EnumUtil.getStrMsgByCode(tags, TagsCodeEnum.class) + "】消费失败");
|
|||
log.error("消息重试超过3次,消费失败!"); |
|||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|||
} |
|||
// 订单超时处理
|
|||
if (MessageCodeEnum.ORDER_MESSAGE.getCode().equals(topic)) { |
|||
if (MessageCodeEnum.ORDER_TIMEOUT_TAG.getCode().equals(tags)) { |
|||
// //获取订单
|
|||
// DealUserOrder dealUserOrder = pcRemoteDealUserOrderService.selectDealUserOrderByOrderNumber(keys);
|
|||
// if (dealUserOrder != null) {
|
|||
// //订单状态超时未支付关闭订单 处理
|
|||
// if (dealUserOrder.getStatus().equals("1")) {
|
|||
// DealUserOrder dealUserOrders = new DealUserOrder();
|
|||
// dealUserOrders.setOrderId(dealUserOrder.getOrderId());
|
|||
// dealUserOrders.setStatus("4");
|
|||
// pcRemoteDealUserOrderService.updateDealUserOrder(dealUserOrders);
|
|||
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|||
// }
|
|||
// log.info("Order does not exist.");
|
|||
// }
|
|||
log.info("Consumption success:" + body); |
|||
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|||
log.info("Consumption time:{}", format.format(new Date())); |
|||
} else { |
|||
log.info("未匹配到Tag【{}】" + tags); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
// 消息消费成功
|
|||
//ConsumeConcurrentlyStatus.RECONSUME_LATER broker会根据设置的messageDelayLevel发起重试,默认16次
|
|||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|||
} |
|||
|
|||
/** |
|||
* 异常处理 |
|||
* |
|||
* @param e 捕获的异常 |
|||
* @return 消息消费结果 |
|||
*/ |
|||
private static ConsumeConcurrentlyStatus handleException(final Exception e) { |
|||
Class exceptionClass = e.getClass(); |
|||
if (exceptionClass.equals(UnsupportedEncodingException.class)) { |
|||
log.error(e.getMessage()); |
|||
} else if (exceptionClass.equals(ConsumeException.class)) { |
|||
log.error(e.getMessage()); |
|||
} |
|||
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
|||
} |
|||
} |
@ -0,0 +1,60 @@ |
|||
package com.ruoyi.testrocketmq.enums; |
|||
|
|||
|
|||
import lombok.Getter; |
|||
|
|||
@Getter |
|||
public enum MessageCodeEnum { |
|||
/** |
|||
* 消息模块主题 |
|||
*/ |
|||
MESSAGE_TOPIC("elink-message","消息服务模块topic名称"), |
|||
/** |
|||
* 系统消息 |
|||
*/ |
|||
NOTE_MESSAGE("system-message","系统消息服务模块topic名称"), |
|||
/** |
|||
* 用户消息 |
|||
*/ |
|||
USER_MESSAGE("user-message","用户消息服务模块topic名称"), |
|||
|
|||
/** |
|||
* 订单消息 |
|||
*/ |
|||
ORDER_MESSAGE("order-message","订单消息服务模块topic名称"), |
|||
|
|||
/** |
|||
* 平台编号 |
|||
*/ |
|||
USER_MESSAGE_TAG("user_message_tag","用户消息推送"), |
|||
NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), |
|||
ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), |
|||
|
|||
/** |
|||
* 订单处理编号 |
|||
*/ |
|||
//订单超时处理
|
|||
ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); |
|||
|
|||
|
|||
private final String code; |
|||
private final String msg; |
|||
|
|||
MessageCodeEnum(String code, String msg){ |
|||
this.code = code; |
|||
this.msg = msg; |
|||
} |
|||
|
|||
public static String valuesOfType(String code) { |
|||
String value = ""; |
|||
for (MessageCodeEnum e : MessageCodeEnum.values()) { |
|||
if (code.equals(e.code)) { |
|||
value = e.msg; |
|||
} |
|||
|
|||
} |
|||
return value; |
|||
} |
|||
|
|||
|
|||
} |
@ -0,0 +1,22 @@ |
|||
package com.ruoyi.testrocketmq.model; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Data |
|||
@Configuration |
|||
@Component |
|||
public class ConsumerMode { |
|||
@Value("${suning.rocketmq.namesrvAddr}") |
|||
private String namesrvAddr; |
|||
@Value("${suning.rocketmq.conumer.groupName}") |
|||
private String groupName ; |
|||
@Value("${suning.rocketmq.conumer.consumeThreadMin}") |
|||
private int consumeThreadMin; |
|||
@Value("${suning.rocketmq.conumer.consumeThreadMax}") |
|||
private int consumeThreadMax; |
|||
@Value("${suning.rocketmq.conumer.consumeMessageBatchMaxSize}") |
|||
private int consumeMessageBatchMaxSize; |
|||
} |
@ -0,0 +1,25 @@ |
|||
package com.ruoyi.testrocketmq.model; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
/** |
|||
* 生产者初始化 |
|||
*/ |
|||
@RefreshScope |
|||
@Data |
|||
@Configuration |
|||
public class ProducerMode { |
|||
@Value("${suning.rocketmq.producer.groupName}") |
|||
private String groupName; |
|||
@Value("${suning.rocketmq.namesrvAddr}") |
|||
private String namesrvAddr; |
|||
@Value("${suning.rocketmq.producer.maxMessageSize}") |
|||
private Integer maxMessageSize; |
|||
@Value("${suning.rocketmq.producer.sendMsgTimeout}") |
|||
private Integer sendMsgTimeout; |
|||
@Value("${suning.rocketmq.producer.retryTimesWhenSendFailed}") |
|||
private Integer retryTimesWhenSendFailed; |
|||
} |
@ -0,0 +1,47 @@ |
|||
package com.ruoyi.testrocketmq.producer; |
|||
|
|||
import com.ruoyi.testrocketmq.config.ProducerConfig; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
|
|||
public class AsyncProducer { |
|||
|
|||
@Autowired |
|||
private ProducerConfig producerConfig; |
|||
|
|||
/** |
|||
* 发送异步的消息 |
|||
* @param topic 主题 |
|||
* @param tag 标签 |
|||
* @param key 自定义的key,根据业务来定 |
|||
* @param value 消息的内容 |
|||
* @return org.apache.rocketmq.client.producer.SendResult |
|||
*/ |
|||
// public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value) throws UnsupportedEncodingException {
|
|||
//
|
|||
// try {
|
|||
// DefaultMQProducer defaultMQProducer = producerConfig.producer;
|
|||
// //Create a message instance, specifying topic, tag and message body.
|
|||
// Message msg = new Message(topic, tag, key,value.getBytes(RemotingHelper.DEFAULT_CHARSET));
|
|||
// defaultMQProducer.send(msg, new SendCallback() {
|
|||
// // 异步回调的处理
|
|||
// @Override
|
|||
// public void onSuccess(SendResult sendResult) {
|
|||
// System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId());
|
|||
// }
|
|||
//
|
|||
// @Override
|
|||
// public void onException(Throwable e) {
|
|||
// System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e);
|
|||
// e.printStackTrace();
|
|||
// }
|
|||
// });
|
|||
// } catch (MQClientException e) {
|
|||
// e.printStackTrace();
|
|||
// } catch (RemotingException e) {
|
|||
// e.printStackTrace();
|
|||
// } catch (InterruptedException e) {
|
|||
// e.printStackTrace();
|
|||
// }
|
|||
// return null;
|
|||
// }
|
|||
} |
@ -0,0 +1,20 @@ |
|||
package com.ruoyi.testrocketmq.producer; |
|||
|
|||
/** |
|||
* @author 影子 |
|||
*/ |
|||
public class ConsumeException extends RuntimeException{ |
|||
private static final long serialVersionUID = 4093867789628938836L; |
|||
|
|||
public ConsumeException(String message) { |
|||
super(message); |
|||
} |
|||
|
|||
public ConsumeException(Throwable cause) { |
|||
super(cause); |
|||
} |
|||
|
|||
public ConsumeException(String message, Throwable cause) { |
|||
super(message, cause); |
|||
} |
|||
} |
@ -0,0 +1,34 @@ |
|||
package com.ruoyi.testrocketmq.producer; |
|||
|
|||
|
|||
|
|||
import lombok.Data; |
|||
import lombok.ToString; |
|||
import org.apache.rocketmq.common.message.MessageExt; |
|||
import org.apache.rocketmq.common.message.MessageQueue; |
|||
|
|||
/** |
|||
* 消费时,当前所消费的消息的上下文信息 |
|||
* |
|||
* @author jolly |
|||
*/ |
|||
@ToString |
|||
@Data |
|||
public final class MessageContext { |
|||
|
|||
/** |
|||
* 所消费消息所在的消息队列 |
|||
* |
|||
* @see MessageQueue |
|||
*/ |
|||
private MessageQueue messageQueue; |
|||
|
|||
/** |
|||
* 所消费的消息的扩展属性 |
|||
* |
|||
* @see MessageExt |
|||
*/ |
|||
private MessageExt messageExt; |
|||
|
|||
|
|||
} |
@ -0,0 +1,110 @@ |
|||
package com.ruoyi.testrocketmq.producer; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.exception.MQBrokerException; |
|||
import org.apache.rocketmq.client.exception.MQClientException; |
|||
import org.apache.rocketmq.client.producer.SendResult; |
|||
import org.apache.rocketmq.common.message.Message; |
|||
import org.apache.rocketmq.remoting.common.RemotingHelper; |
|||
import org.apache.rocketmq.remoting.exception.RemotingException; |
|||
|
|||
import java.io.UnsupportedEncodingException; |
|||
import java.text.DateFormat; |
|||
import java.text.SimpleDateFormat; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
|
|||
import static com.ruoyi.rocketmq.config.ProducerConfig.producer; |
|||
|
|||
/** |
|||
* 消息发送 |
|||
*/ |
|||
@Slf4j |
|||
public class MessageProducer { |
|||
|
|||
|
|||
/** |
|||
* 同步发送消息 |
|||
* @param topic 主题 |
|||
* @param tag 标签 |
|||
* @param key 自定义的key,根据业务来定 |
|||
* @param value 消息的内容 |
|||
* @return org.apache.rocketmq.client.producer.SendResult |
|||
*/ |
|||
public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ |
|||
String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; |
|||
try { |
|||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|||
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
|||
SendResult result = producer.send(msg); |
|||
return result; |
|||
} catch (UnsupportedEncodingException e) { |
|||
log.error("消息初始化失败!body:{}",body); |
|||
|
|||
} catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) { |
|||
log.error("消息发送失败! body:{}",body); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
|
|||
|
|||
/** |
|||
* 发送有序的消息 |
|||
* @param messagesList Message集合 |
|||
* @param messageQueueNumber 消息队列编号 |
|||
* @return org.apache.rocketmq.client.producer.SendResult |
|||
*/ |
|||
public SendResult sendOrderlyMessage(List<Message> messagesList, int messageQueueNumber) { |
|||
SendResult result = null; |
|||
for (Message message : messagesList) { |
|||
try { |
|||
// DefaultMQProducer defaultMQProducer = ProducerConfig.producer.send(message);
|
|||
// System.out.println(defaultMQProducer);
|
|||
result = producer.send(message, (list, msg, arg) -> { |
|||
Integer queueNumber = (Integer) arg; |
|||
return list.get(queueNumber); |
|||
}, messageQueueNumber); |
|||
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { |
|||
log.error("发送有序消息失败"); |
|||
return result; |
|||
} |
|||
} |
|||
return result; |
|||
} |
|||
|
|||
/** |
|||
* 推送延迟消息 |
|||
* @param topic |
|||
* @param tag |
|||
* @param key |
|||
* @return boolean |
|||
*/ |
|||
public SendResult sendDelayMessage(String topic, String tag, String key, String value) |
|||
{ |
|||
SendResult result = null; |
|||
try |
|||
{ |
|||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|||
//设置消息延迟级别,我这里设置5,对应就是延时一分钟
|
|||
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
|
|||
msg.setDelayTimeLevel(4); |
|||
// 发送消息到一个Broker
|
|||
result = producer.send(msg); |
|||
// 通过sendResult返回消息是否成功送达
|
|||
log.info("发送延迟消息结果:======sendResult:{}", result); |
|||
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|||
log.info("发送时间:{}", format.format(new Date())); |
|||
return result; |
|||
} |
|||
catch (Exception e) |
|||
{ |
|||
e.printStackTrace(); |
|||
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); |
|||
} |
|||
return result; |
|||
} |
|||
|
|||
|
|||
} |
Loading…
Reference in new issue