42 changed files with 1820 additions and 35 deletions
@ -0,0 +1,10 @@ |
|||||
|
package org.dromara.system.api; |
||||
|
|
||||
|
import org.dromara.system.api.domain.vo.RemotePostVo; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
public interface RemotePostService { |
||||
|
List<RemotePostVo> listPost(); |
||||
|
|
||||
|
} |
@ -0,0 +1,63 @@ |
|||||
|
package org.dromara.system.api.domain.vo; |
||||
|
|
||||
|
import com.alibaba.excel.annotation.ExcelProperty; |
||||
|
import lombok.Data; |
||||
|
import org.dromara.common.excel.annotation.ExcelDictFormat; |
||||
|
import org.dromara.common.excel.convert.ExcelDictConvert; |
||||
|
|
||||
|
import java.io.Serial; |
||||
|
import java.util.Date; |
||||
|
|
||||
|
@Data |
||||
|
public class RemotePostVo { |
||||
|
|
||||
|
/** |
||||
|
* 岗位ID |
||||
|
*/ |
||||
|
private Long postId; |
||||
|
|
||||
|
/** |
||||
|
* 部门id |
||||
|
*/ |
||||
|
private Long deptId; |
||||
|
|
||||
|
/** |
||||
|
* 岗位编码 |
||||
|
*/ |
||||
|
private String postCode; |
||||
|
|
||||
|
/** |
||||
|
* 岗位名称 |
||||
|
*/ |
||||
|
private String postName; |
||||
|
|
||||
|
/** |
||||
|
* 岗位类别编码 |
||||
|
*/ |
||||
|
private String postCategory; |
||||
|
|
||||
|
/** |
||||
|
* 显示顺序 |
||||
|
*/ |
||||
|
private Integer postSort; |
||||
|
|
||||
|
/** |
||||
|
* 状态(0正常 1停用) |
||||
|
*/ |
||||
|
private String status; |
||||
|
|
||||
|
/** |
||||
|
* 备注 |
||||
|
*/ |
||||
|
private String remark; |
||||
|
|
||||
|
/** |
||||
|
* 创建时间 |
||||
|
*/ |
||||
|
private Date createTime; |
||||
|
|
||||
|
/** |
||||
|
* 部门名 |
||||
|
*/ |
||||
|
private String deptName; |
||||
|
} |
@ -0,0 +1,28 @@ |
|||||
|
package org.dromara.system.dubbo; |
||||
|
|
||||
|
import lombok.RequiredArgsConstructor; |
||||
|
import org.apache.dubbo.config.annotation.DubboService; |
||||
|
import org.dromara.common.core.utils.MapstructUtils; |
||||
|
import org.dromara.system.api.RemotePostService; |
||||
|
import org.dromara.system.api.domain.vo.RemotePostVo; |
||||
|
import org.dromara.system.domain.bo.SysPostBo; |
||||
|
import org.dromara.system.domain.vo.SysPostVo; |
||||
|
import org.dromara.system.service.ISysPostService; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
@RequiredArgsConstructor |
||||
|
@Service |
||||
|
@DubboService |
||||
|
public class RemotePostServiceImpl implements RemotePostService { |
||||
|
|
||||
|
|
||||
|
private final ISysPostService sysPostService; |
||||
|
|
||||
|
@Override |
||||
|
public List<RemotePostVo> listPost() { |
||||
|
List<SysPostVo> sysPostVos = sysPostService.selectPostList(new SysPostBo()); |
||||
|
return MapstructUtils.convert(sysPostVos, RemotePostVo.class); |
||||
|
} |
||||
|
} |
@ -0,0 +1,85 @@ |
|||||
|
<?xml version="1.0" encoding="UTF-8"?> |
||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" |
||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
|
<parent> |
||||
|
<groupId>org.dromara</groupId> |
||||
|
<artifactId>dk-visual</artifactId> |
||||
|
<version>${revision}</version> |
||||
|
</parent> |
||||
|
<modelVersion>4.0.0</modelVersion> |
||||
|
|
||||
|
<artifactId>rocketmq</artifactId> |
||||
|
|
||||
|
<dependencies> |
||||
|
<!--rocketmq消息队列--> |
||||
|
<dependency> |
||||
|
<groupId>org.apache.rocketmq</groupId> |
||||
|
<artifactId>rocketmq-client</artifactId> |
||||
|
<version> 4.9.0</version> |
||||
|
</dependency> |
||||
|
|
||||
|
<!-- SpringCloud Alibaba Nacos --> |
||||
|
<dependency> |
||||
|
<groupId>com.alibaba.cloud</groupId> |
||||
|
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> |
||||
|
</dependency> |
||||
|
|
||||
|
<!-- SpringCloud Alibaba Nacos Config --> |
||||
|
<dependency> |
||||
|
<groupId>com.alibaba.cloud</groupId> |
||||
|
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> |
||||
|
</dependency> |
||||
|
<dependency> |
||||
|
<groupId>com.alibaba.nacos</groupId> |
||||
|
<artifactId>nacos-client</artifactId> |
||||
|
</dependency> |
||||
|
|
||||
|
<!-- SpringCloud Alibaba Sentinel --> |
||||
|
<dependency> |
||||
|
<groupId>com.alibaba.cloud</groupId> |
||||
|
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> |
||||
|
</dependency> |
||||
|
|
||||
|
<!-- SpringBoot Actuator --> |
||||
|
<dependency> |
||||
|
<groupId>org.springframework.boot</groupId> |
||||
|
<artifactId>spring-boot-starter-actuator</artifactId> |
||||
|
</dependency> |
||||
|
|
||||
|
<!-- Mysql驱动包 --> |
||||
|
<dependency> |
||||
|
<groupId>com.mysql</groupId> |
||||
|
<artifactId>mysql-connector-j</artifactId> |
||||
|
</dependency> |
||||
|
|
||||
|
<dependency> |
||||
|
<groupId>org.projectlombok</groupId> |
||||
|
<artifactId>lombok</artifactId> |
||||
|
</dependency> |
||||
|
|
||||
|
<dependency> |
||||
|
<groupId>org.dromara</groupId> |
||||
|
<artifactId>common-web</artifactId> |
||||
|
</dependency> |
||||
|
</dependencies> |
||||
|
|
||||
|
<build> |
||||
|
<finalName>${project.artifactId}</finalName> |
||||
|
<plugins> |
||||
|
<plugin> |
||||
|
<groupId>org.springframework.boot</groupId> |
||||
|
<artifactId>spring-boot-maven-plugin</artifactId> |
||||
|
<version>${spring-boot.version}</version> |
||||
|
<executions> |
||||
|
<execution> |
||||
|
<goals> |
||||
|
<goal>repackage</goal> |
||||
|
</goals> |
||||
|
</execution> |
||||
|
</executions> |
||||
|
</plugin> |
||||
|
</plugins> |
||||
|
</build> |
||||
|
|
||||
|
</project> |
@ -0,0 +1,21 @@ |
|||||
|
package com.ruoyi.rocketmq; |
||||
|
|
||||
|
import org.springframework.boot.SpringApplication; |
||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication; |
||||
|
import org.springframework.scheduling.annotation.EnableAsync; |
||||
|
|
||||
|
/** |
||||
|
* RocketMQ模块 |
||||
|
* |
||||
|
* @author |
||||
|
*/ |
||||
|
@SpringBootApplication |
||||
|
@EnableAsync |
||||
|
public class RocketMQApplication |
||||
|
{ |
||||
|
public static void main(String[] args) |
||||
|
{ |
||||
|
SpringApplication.run(RocketMQApplication.class, args); |
||||
|
System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); |
||||
|
} |
||||
|
} |
@ -0,0 +1,75 @@ |
|||||
|
package com.ruoyi.rocketmq.config; |
||||
|
|
||||
|
import com.ruoyi.rocketmq.enums.MessageCodeEnum; |
||||
|
import com.ruoyi.rocketmq.enums.MessageTopic; |
||||
|
import com.ruoyi.rocketmq.model.ConsumerMode; |
||||
|
import com.ruoyi.rocketmq.consumer.RocketMsgListener; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
||||
|
import org.apache.rocketmq.client.exception.MQClientException; |
||||
|
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.cloud.context.config.annotation.RefreshScope; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* 消费者配置 |
||||
|
*/ |
||||
|
@RefreshScope |
||||
|
@Configuration |
||||
|
@Slf4j |
||||
|
public class ConsumerConfig { |
||||
|
|
||||
|
@Autowired |
||||
|
private ConsumerMode consumerMode; |
||||
|
|
||||
|
@Bean |
||||
|
public DefaultMQPushConsumer getRocketMQConsumer() { |
||||
|
//构建客户端连接
|
||||
|
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); |
||||
|
//
|
||||
|
consumer.setNamesrvAddr(consumerMode.getNameServer()); |
||||
|
consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); |
||||
|
consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); |
||||
|
consumer.registerMessageListener(new RocketMsgListener()); |
||||
|
/** |
||||
|
* 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 |
||||
|
* 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 |
||||
|
* 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 |
||||
|
* 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 |
||||
|
*/ |
||||
|
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); |
||||
|
/** |
||||
|
* CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 |
||||
|
* 订阅topic整体,从而达到负载均衡的目的 |
||||
|
* BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 |
||||
|
* 需要注意的是,在广播模式下,每个Consumer都会独立地处理相同的消息副本。这可能会导致一些潜在的问题,例如消息重复处理或者资源浪费。因此,在使用广播模式时,请确保消息的处理逻辑是幂等的,并仔细考虑系统资源的消耗。 |
||||
|
*/ |
||||
|
// consumer.setMessageModel(MessageModel.BROADCASTING);
|
||||
|
|
||||
|
consumer.setVipChannelEnabled(false); |
||||
|
consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); |
||||
|
try { |
||||
|
/** |
||||
|
* 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 |
||||
|
* 由于官方并没有给直接订阅全部消息示例 所以使用list列表循环订阅所有topic |
||||
|
*/ |
||||
|
// 获取所有topic列表
|
||||
|
MessageTopic messageTopic = new MessageTopic(); |
||||
|
List<String> allTopics = messageTopic.RocketMQTopicList(); |
||||
|
//订阅所有topic
|
||||
|
for (String topic : allTopics) { |
||||
|
consumer.subscribe(topic,"*"); |
||||
|
} |
||||
|
consumer.start(); |
||||
|
log.info("消费者初始化成功:{}", consumer); |
||||
|
} catch (MQClientException e) { |
||||
|
e.printStackTrace(); |
||||
|
log.error("消费者初始化失败:{}",e.getMessage()); |
||||
|
} |
||||
|
return consumer; |
||||
|
} |
||||
|
} |
@ -0,0 +1,56 @@ |
|||||
|
package com.ruoyi.rocketmq.config; |
||||
|
|
||||
|
import com.ruoyi.rocketmq.model.ProducerMode; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.exception.MQClientException; |
||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* mq搭建地址连接 |
||||
|
* 生产者初者连接信息 具体看nacos配置 |
||||
|
*/ |
||||
|
@Configuration |
||||
|
@Slf4j |
||||
|
public class ProducerConfig { |
||||
|
|
||||
|
/** |
||||
|
* 远程调用连接信息 |
||||
|
*/ |
||||
|
public static DefaultMQProducer producer; |
||||
|
|
||||
|
/** |
||||
|
* 连接客户端信息配置 具体看nacos配置 |
||||
|
*/ |
||||
|
@Autowired |
||||
|
private ProducerMode producerMode; |
||||
|
|
||||
|
@Bean |
||||
|
public DefaultMQProducer getRocketMQProducer() { |
||||
|
producer = new DefaultMQProducer(producerMode.getGroupName()); |
||||
|
producer.setNamesrvAddr(producerMode.getNameServer()); |
||||
|
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
|
||||
|
if(producerMode.getMaxMessageSize()!=null){ |
||||
|
producer.setMaxMessageSize(producerMode.getMaxMessageSize()); |
||||
|
} |
||||
|
if(producerMode.getSendMsgTimeout()!=null){ |
||||
|
producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); |
||||
|
} |
||||
|
//如果发送消息失败,设置重试次数,默认为2次
|
||||
|
if(producerMode.getRetryTimesWhenSendFailed()!=null){ |
||||
|
producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); |
||||
|
} |
||||
|
producer.setVipChannelEnabled(false); |
||||
|
try { |
||||
|
producer.start(); |
||||
|
log.info("生产者初始化成功:{}",producer.toString()); |
||||
|
} catch (MQClientException e) { |
||||
|
log.error("生产者初始化失败:{}",e.getMessage()); |
||||
|
} |
||||
|
return producer; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,87 @@ |
|||||
|
package com.ruoyi.rocketmq.consumer; |
||||
|
|
||||
|
import com.ruoyi.rocketmq.enums.MessageCodeEnum; |
||||
|
import com.ruoyi.rocketmq.producer.ConsumeException; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
||||
|
import org.apache.rocketmq.common.message.MessageExt; |
||||
|
import org.apache.rocketmq.common.message.MessageQueue; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
import org.springframework.util.CollectionUtils; |
||||
|
|
||||
|
import java.io.UnsupportedEncodingException; |
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* 消息监听 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Component |
||||
|
public class RocketMsgListener implements MessageListenerConcurrently { |
||||
|
|
||||
|
/** |
||||
|
* 消费消息 |
||||
|
* @param list msgs.size() >= 1 |
||||
|
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here |
||||
|
* 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 |
||||
|
* @param consumeConcurrentlyContext 上下文信息 |
||||
|
* @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) |
||||
|
*/ |
||||
|
@Override |
||||
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { |
||||
|
try{ |
||||
|
//消息不等于空情况
|
||||
|
if (!CollectionUtils.isEmpty(list)) { |
||||
|
//获取topic
|
||||
|
for (MessageExt messageExt : list) { |
||||
|
// 解析消息内容
|
||||
|
String body = new String(messageExt.getBody()); |
||||
|
log.info("接受到的消息为:{}", body); |
||||
|
String tags = messageExt.getTags(); |
||||
|
String topic = messageExt.getTopic(); |
||||
|
String msgId = messageExt.getMsgId(); |
||||
|
String keys = messageExt.getKeys(); |
||||
|
int reConsume = messageExt.getReconsumeTimes(); |
||||
|
// 消息已经重试了3次,如果不需要再次消费,则返回成功
|
||||
|
if (reConsume == 3) { |
||||
|
// TODO 补偿信息
|
||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常
|
||||
|
} |
||||
|
// 根据不同的topic处理不同的业务 这里以订单消息为例子
|
||||
|
if (MessageCodeEnum.ORDER_MESSAGE_TOPIC.getCode().equals(topic)) { |
||||
|
if (MessageCodeEnum.ORDER_MESSAGE_TAG.getCode().equals(tags)) { |
||||
|
//处理你的业务
|
||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
|
||||
|
} else { |
||||
|
log.info("未匹配到Tag【{}】" + tags); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
// 消息消费失败
|
||||
|
//broker会根据设置的messageDelayLevel发起重试,默认16次
|
||||
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
||||
|
} catch (Exception e) { |
||||
|
// 调用 handleException 方法处理异常并返回处理结果
|
||||
|
return handleException(e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 异常处理 |
||||
|
* |
||||
|
* @param e 捕获的异常 |
||||
|
* @return 消息消费结果 |
||||
|
*/ |
||||
|
private static ConsumeConcurrentlyStatus handleException(final Exception e) { |
||||
|
Class exceptionClass = e.getClass(); |
||||
|
if (exceptionClass.equals(UnsupportedEncodingException.class)) { |
||||
|
log.error(e.getMessage()); |
||||
|
} else if (exceptionClass.equals(ConsumeException.class)) { |
||||
|
log.error(e.getMessage()); |
||||
|
} |
||||
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
||||
|
} |
||||
|
} |
@ -0,0 +1,123 @@ |
|||||
|
package com.ruoyi.rocketmq.controller; |
||||
|
|
||||
|
|
||||
|
import com.ruoyi.rocketmq.producer.MessageProducer; |
||||
|
import org.apache.rocketmq.client.producer.SendResult; |
||||
|
import org.apache.rocketmq.common.message.Message; |
||||
|
import org.springframework.web.bind.annotation.PostMapping; |
||||
|
import org.springframework.web.bind.annotation.RequestMapping; |
||||
|
import org.springframework.web.bind.annotation.RequestParam; |
||||
|
import org.springframework.web.bind.annotation.RestController; |
||||
|
|
||||
|
import java.util.ArrayList; |
||||
|
import java.util.HashMap; |
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
|
||||
|
/** |
||||
|
* 消息测试类Controller |
||||
|
*/ |
||||
|
@RestController |
||||
|
@RequestMapping("/api/rocketMessage") |
||||
|
public class RocketMqController { |
||||
|
|
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 发送同步消息 |
||||
|
*/ |
||||
|
@PostMapping("/sendSynchronizeMessage") |
||||
|
private Map sendSynchronizeMessage(){ |
||||
|
MessageProducer messageProducer = new MessageProducer(); |
||||
|
//调用MessageProducer配置好的消息方法
|
||||
|
SendResult sendResult = messageProducer.sendSynchronizeMessage("order-message","order_message_tag","title","content"); |
||||
|
Map<String,Object> result = new HashMap<>(); |
||||
|
result.put("data",sendResult); |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 发送单向消息 |
||||
|
*/ |
||||
|
@PostMapping("/sendOnewayMessage") |
||||
|
private Map sendOnewayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ |
||||
|
MessageProducer messageProducer = new MessageProducer(); |
||||
|
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
||||
|
messageProducer.sendOnewayMessage("order-message","order_timeout_tag","title","content"); |
||||
|
Map<String,Object> result = new HashMap<>(); |
||||
|
result.put("msg","发送成功"); |
||||
|
result.put("code",200); |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 批量发送消息 |
||||
|
*/ |
||||
|
@PostMapping("/sendBatchMessage") |
||||
|
private Map sendBatchMessage(){ |
||||
|
// 根据实际需求创建消息列表并返回
|
||||
|
List<Message> messages = new ArrayList<>(); |
||||
|
// 添加消息到列表
|
||||
|
messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes())); |
||||
|
messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes())); |
||||
|
messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes())); |
||||
|
MessageProducer messageProducer = new MessageProducer(); |
||||
|
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
||||
|
SendResult sendResult = messageProducer.sendBatchMessage(messages); |
||||
|
Map<String,Object> result = new HashMap<>(); |
||||
|
result.put("data",sendResult); |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 发送有序的消息 |
||||
|
*/ |
||||
|
@PostMapping("/sendOrderlyMessage") |
||||
|
private Map sendOrderlyMessage(){ |
||||
|
// 根据实际需求创建消息列表并返回
|
||||
|
List<Message> messages = new ArrayList<>(); |
||||
|
// 添加消息到列表
|
||||
|
messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes())); |
||||
|
messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes())); |
||||
|
messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes())); |
||||
|
Integer messageQueueNumber = 3; |
||||
|
MessageProducer messageProducer = new MessageProducer(); |
||||
|
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
||||
|
SendResult sendResult = messageProducer.sendOrderlyMessage(messages,messageQueueNumber); |
||||
|
Map<String,Object> result = new HashMap<>(); |
||||
|
result.put("data",sendResult); |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送延迟消息 |
||||
|
*/ |
||||
|
@PostMapping("/sendDelayMessage") |
||||
|
private Map sendDelayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ |
||||
|
MessageProducer messageProducer = new MessageProducer(); |
||||
|
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
||||
|
SendResult sendResult = messageProducer.sendDelayMessage("order-message","order_timeout_tag","title","content"); |
||||
|
Map<String,Object> result = new HashMap<>(); |
||||
|
result.put("data",sendResult); |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 发送异步的消息 |
||||
|
*/ |
||||
|
@PostMapping("/sendAsyncProducerMessage") |
||||
|
private Map sendAsyncProducerMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){ |
||||
|
MessageProducer messageProducer = new MessageProducer(); |
||||
|
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
|
||||
|
SendResult sendResult = messageProducer.sendAsyncProducerMessage("order-message","order_timeout_tag","title","content"); |
||||
|
Map<String,Object> result = new HashMap<>(); |
||||
|
result.put("data",sendResult); |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,55 @@ |
|||||
|
package com.ruoyi.rocketmq.enums; |
||||
|
|
||||
|
|
||||
|
import lombok.Getter; |
||||
|
|
||||
|
/** |
||||
|
* 用于传递topic和 tag |
||||
|
* 也用于接收消息后判断不同的消息处理不同的业务 |
||||
|
*/ |
||||
|
@Getter |
||||
|
public enum MessageCodeEnum { |
||||
|
|
||||
|
/** |
||||
|
* 系统消息 |
||||
|
*/ |
||||
|
NOTE_MESSAGE_TOPIC("system-message","系统消息服务模块topic名称"), |
||||
|
/** |
||||
|
* 用户消息 |
||||
|
*/ |
||||
|
USER_MESSAGE_TOPIC("user-message","用户消息服务模块topic名称"), |
||||
|
|
||||
|
/** |
||||
|
* 订单消息 |
||||
|
*/ |
||||
|
ORDER_MESSAGE_TOPIC("order-message","订单消息服务模块topic名称"), |
||||
|
|
||||
|
/** |
||||
|
* 用户消息tag |
||||
|
*/ |
||||
|
USER_MESSAGE_TAG("user_message_tag","用户消息推送"), |
||||
|
|
||||
|
/** |
||||
|
* 系统消息tag |
||||
|
*/ |
||||
|
NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), |
||||
|
|
||||
|
/** |
||||
|
* 订单消息 |
||||
|
*/ |
||||
|
ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), |
||||
|
|
||||
|
/** |
||||
|
* 订单处理编号 |
||||
|
*/ |
||||
|
ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); |
||||
|
|
||||
|
private final String code; |
||||
|
private final String msg; |
||||
|
|
||||
|
MessageCodeEnum(String code, String msg){ |
||||
|
this.code = code; |
||||
|
this.msg = msg; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,24 @@ |
|||||
|
package com.ruoyi.rocketmq.enums; |
||||
|
|
||||
|
|
||||
|
import java.util.ArrayList; |
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* 定义topic列表 |
||||
|
*/ |
||||
|
public class MessageTopic { |
||||
|
|
||||
|
//在这里添加topic 用于批量订阅
|
||||
|
public List<String> RocketMQTopicList(){ |
||||
|
List<String> getTopicLists=new ArrayList<>(); |
||||
|
//系统消息
|
||||
|
getTopicLists.add("system-message"); |
||||
|
//用户消息
|
||||
|
getTopicLists.add("user-message"); |
||||
|
//订单消息
|
||||
|
getTopicLists.add("order-message"); |
||||
|
return getTopicLists; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,26 @@ |
|||||
|
package com.ruoyi.rocketmq.model; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
/** |
||||
|
* 消费者初始化 |
||||
|
* 消费者连接信息 具体看nacos配置 |
||||
|
*/ |
||||
|
@Data |
||||
|
@Configuration |
||||
|
@Component |
||||
|
public class ConsumerMode { |
||||
|
@Value("${rocketmq.nameServer}") |
||||
|
private String nameServer; |
||||
|
@Value("${rocketmq.consumer.groupName}") |
||||
|
private String groupName ; |
||||
|
@Value("${rocketmq.consumer.consumerThreadMin}") |
||||
|
private Integer consumeThreadMin; |
||||
|
@Value("${rocketmq.consumer.consumerThreadMax}") |
||||
|
private Integer consumeThreadMax; |
||||
|
@Value("${rocketmq.consumer.consumerMessageBatchMaxSize}") |
||||
|
private Integer consumeMessageBatchMaxSize; |
||||
|
} |
@ -0,0 +1,25 @@ |
|||||
|
package com.ruoyi.rocketmq.model; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.cloud.context.config.annotation.RefreshScope; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
/** |
||||
|
* 生产者初始化 |
||||
|
*/ |
||||
|
@RefreshScope |
||||
|
@Data |
||||
|
@Configuration |
||||
|
public class ProducerMode { |
||||
|
@Value("${rocketmq.producer.groupName}") |
||||
|
private String groupName; |
||||
|
@Value("${rocketmq.nameServer}") |
||||
|
private String nameServer; |
||||
|
@Value("${rocketmq.producer.maxMessageSize}") |
||||
|
private Integer maxMessageSize; |
||||
|
@Value("${rocketmq.producer.sendMsgTimeout}") |
||||
|
private Integer sendMsgTimeout; |
||||
|
@Value("${rocketmq.producer.retryTimesWhenSendFailed}") |
||||
|
private Integer retryTimesWhenSendFailed; |
||||
|
} |
@ -0,0 +1,23 @@ |
|||||
|
package com.ruoyi.rocketmq.producer; |
||||
|
|
||||
|
/** |
||||
|
* @author 影子 |
||||
|
* 用于捕捉异常非受检异常(unchecked exception) |
||||
|
* RuntimeException 和其子类的异常在编译时不需要进行强制性的异常处理,可以选择在运行时进行捕获和处理 |
||||
|
* 可选择使用 |
||||
|
*/ |
||||
|
public class ConsumeException extends RuntimeException{ |
||||
|
private static final long serialVersionUID = 4093867789628938836L; |
||||
|
|
||||
|
public ConsumeException(String message) { |
||||
|
super(message); |
||||
|
} |
||||
|
|
||||
|
public ConsumeException(Throwable cause) { |
||||
|
super(cause); |
||||
|
} |
||||
|
|
||||
|
public ConsumeException(String message, Throwable cause) { |
||||
|
super(message, cause); |
||||
|
} |
||||
|
} |
@ -0,0 +1,190 @@ |
|||||
|
package com.ruoyi.rocketmq.producer; |
||||
|
|
||||
|
import com.alibaba.fastjson.JSON; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.exception.MQBrokerException; |
||||
|
import org.apache.rocketmq.client.exception.MQClientException; |
||||
|
import org.apache.rocketmq.client.producer.SendCallback; |
||||
|
import org.apache.rocketmq.client.producer.SendResult; |
||||
|
import org.apache.rocketmq.client.producer.TransactionMQProducer; |
||||
|
import org.apache.rocketmq.client.producer.TransactionSendResult; |
||||
|
import org.apache.rocketmq.common.message.Message; |
||||
|
import org.apache.rocketmq.remoting.common.RemotingHelper; |
||||
|
import org.apache.rocketmq.remoting.exception.RemotingException; |
||||
|
|
||||
|
import java.io.UnsupportedEncodingException; |
||||
|
import java.text.DateFormat; |
||||
|
import java.text.SimpleDateFormat; |
||||
|
import java.util.Date; |
||||
|
import java.util.List; |
||||
|
|
||||
|
import static com.ruoyi.rocketmq.config.ProducerConfig.producer; |
||||
|
|
||||
|
/** |
||||
|
* 消息发送 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
public class MessageProducer { |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 同步发送消息 |
||||
|
* @param topic 主题 |
||||
|
* @param tag 标签 |
||||
|
* @param key 自定义的key,根据业务来定 |
||||
|
* @param value 消息的内容 |
||||
|
* 通过调用 send() 方法发送消息,阻塞等待服务器响应。 |
||||
|
*/ |
||||
|
public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ |
||||
|
String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; |
||||
|
try { |
||||
|
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
||||
|
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
||||
|
SendResult result = producer.send(msg); |
||||
|
return result; |
||||
|
} catch (Exception e) { |
||||
|
e.printStackTrace(); |
||||
|
log.error("消息初始化失败!body:{}",body); |
||||
|
|
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 单向发送消息 |
||||
|
* @param topic 主题 |
||||
|
* @param tag 标签 |
||||
|
* @param key 自定义的key,根据业务来定 |
||||
|
* @param value 消息的内容 |
||||
|
* 单向发送:通过调用 sendOneway() 方法发送消息,不关心发送结果,适用于对可靠性要求不高的场景。 |
||||
|
*/ |
||||
|
public void sendOnewayMessage(String topic, String tag, String key, String value){ |
||||
|
String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; |
||||
|
try { |
||||
|
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
||||
|
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
||||
|
producer.sendOneway(msg); |
||||
|
} catch (UnsupportedEncodingException e) { |
||||
|
log.error("消息初始化失败!body:{}",body); |
||||
|
|
||||
|
} catch (MQClientException | InterruptedException | RemotingException e) { |
||||
|
log.error("消息发送失败! body:{}",body); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 批量发送消息 |
||||
|
* @param messages 消息列表 |
||||
|
* 批量发送:通过调用 send() 方法并传入多条消息,实现批量发送消息。 |
||||
|
*/ |
||||
|
public SendResult sendBatchMessage(List<Message> messages){ |
||||
|
String body = messages.toString(); |
||||
|
try { |
||||
|
System.out.println("生产者发送消息:"+ messages); |
||||
|
// 发送批量消息
|
||||
|
SendResult sendResult = producer.send(messages); |
||||
|
return sendResult; |
||||
|
} catch (MQClientException | InterruptedException | RemotingException e) { |
||||
|
log.error("消息发送失败! body:{}",body); |
||||
|
} catch (MQBrokerException e) { |
||||
|
throw new RuntimeException(e); |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 发送有序的消息 |
||||
|
* @param messagesList Message集合 |
||||
|
* @param messageQueueNumber 消息队列数量,根据实际情况设定 |
||||
|
* 顺序发送: messageQueueNumber 表示消息的业务标识,可以根据具体需求进行设置来保证消息按顺序发送。 |
||||
|
*/ |
||||
|
public SendResult sendOrderlyMessage(List<Message> messagesList, Integer messageQueueNumber) { |
||||
|
SendResult result = null; |
||||
|
for (Message message : messagesList) { |
||||
|
try { |
||||
|
result = producer.send(message, (list, msg, arg) -> { |
||||
|
Integer queueNumber = (Integer) arg; |
||||
|
//int queueIndex = queueNumber % list.size();
|
||||
|
return list.get(queueNumber); |
||||
|
}, messageQueueNumber);//根据编号取模,选择消息队列
|
||||
|
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { |
||||
|
log.error("发送有序消息失败"); |
||||
|
return result; |
||||
|
} |
||||
|
} |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送延迟消息 |
||||
|
* @param topic 主题 |
||||
|
* @param tag 标签 |
||||
|
* @param key 自定义的key,根据业务来定 |
||||
|
* @param value 消息的内容 |
||||
|
* 延迟发送:通过设置延迟级别来实现延迟发送消息。 |
||||
|
*/ |
||||
|
public SendResult sendDelayMessage(String topic, String tag, String key, String value) |
||||
|
{ |
||||
|
SendResult result = null; |
||||
|
try |
||||
|
{ |
||||
|
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
||||
|
//设置消息延迟级别,我这里设置5,对应就是延时一分钟
|
||||
|
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
|
||||
|
msg.setDelayTimeLevel(4); |
||||
|
// 发送消息到一个Broker
|
||||
|
result = producer.send(msg); |
||||
|
// 通过sendResult返回消息是否成功送达
|
||||
|
log.info("发送延迟消息结果:======sendResult:{}", result); |
||||
|
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
||||
|
log.info("发送时间:{}", format.format(new Date())); |
||||
|
return result; |
||||
|
} |
||||
|
catch (Exception e) |
||||
|
{ |
||||
|
e.printStackTrace(); |
||||
|
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); |
||||
|
} |
||||
|
return result; |
||||
|
} |
||||
|
/** |
||||
|
* 发送异步的消息 |
||||
|
* @param topic 主题 |
||||
|
* @param tag 标签 |
||||
|
* @param key 自定义的key,根据业务来定 |
||||
|
* @param value 消息的内容 |
||||
|
* 通过调用 send() 方法,并传入一个 SendCallback 对象,在发送消息的同时可以继续处理其他逻辑,消息发送结果通过回调函数通知。 |
||||
|
*/ |
||||
|
public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value){ |
||||
|
|
||||
|
try { |
||||
|
//创建一个消息实例,指定主题、标签和消息体。
|
||||
|
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
||||
|
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
||||
|
producer.send(msg,new SendCallback() { |
||||
|
// 异步回调的处理
|
||||
|
@Override |
||||
|
public void onSuccess(SendResult sendResult) { |
||||
|
System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId()); |
||||
|
} |
||||
|
@Override |
||||
|
public void onException(Throwable e) { |
||||
|
System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e); |
||||
|
e.printStackTrace(); |
||||
|
} |
||||
|
}); |
||||
|
} catch (MQClientException e) { |
||||
|
e.printStackTrace(); |
||||
|
} catch (RemotingException e) { |
||||
|
e.printStackTrace(); |
||||
|
} catch (InterruptedException e) { |
||||
|
e.printStackTrace(); |
||||
|
} catch (UnsupportedEncodingException e) { |
||||
|
throw new RuntimeException(e); |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,59 @@ |
|||||
|
server: |
||||
|
port: 9402 |
||||
|
|
||||
|
# Spring |
||||
|
spring: |
||||
|
application: |
||||
|
# 应用名称 |
||||
|
name: rocketmq |
||||
|
profiles: |
||||
|
# 环境配置 |
||||
|
active: @profiles.active@ |
||||
|
|
||||
|
--- # rocketmq 配置 |
||||
|
rocketmq: |
||||
|
# 生产者的组名 |
||||
|
producer: |
||||
|
#是否开启自动配置 |
||||
|
# isEnable: true |
||||
|
# 发送同一类消息的设置为同一个group,保证唯一 |
||||
|
groupName: message-producer |
||||
|
# 消息最大长度 默认1024*4(4M) |
||||
|
maxMessageSize: 4096 |
||||
|
# 发送消息超时时间,默认3000 |
||||
|
sendMsgTimeout: 3000 |
||||
|
# 发送消息失败重试次数,默认2 |
||||
|
retryTimesWhenSendFailed: 3 |
||||
|
# group: produce-group |
||||
|
# 消费者的组名 |
||||
|
consumer: |
||||
|
#是否开启自动配置 |
||||
|
# isEnable: true |
||||
|
# 官方建议:确保同一组中的每个消费者订阅相同的主题。 |
||||
|
groupName: message-consumer |
||||
|
consumerThreadMin: 20 |
||||
|
consumerThreadMax: 64 |
||||
|
# 设置一次消费消息的条数,默认为1条 |
||||
|
consumerMessageBatchMaxSize: 1 |
||||
|
# NameServer地址 |
||||
|
nameServer: 114.235.183.147:9876 |
||||
|
|
||||
|
--- # nacos 配置 |
||||
|
spring: |
||||
|
cloud: |
||||
|
nacos: |
||||
|
# nacos 服务地址 |
||||
|
server-addr: @nacos.server@ |
||||
|
username: @nacos.username@ |
||||
|
password: @nacos.password@ |
||||
|
discovery: |
||||
|
# 注册组 |
||||
|
group: @nacos.discovery.group@ |
||||
|
namespace: ${spring.profiles.active} |
||||
|
config: |
||||
|
# 配置组 |
||||
|
group: @nacos.config.group@ |
||||
|
namespace: ${spring.profiles.active} |
||||
|
config: |
||||
|
import: |
||||
|
- optional:nacos:application-common.yml |
@ -0,0 +1,6 @@ |
|||||
|
___ _ _ __ __ ___ |
||||
|
| _ \ ___ __ | |__ ___ | |_ | \/ | / _ \ |
||||
|
| / / _ \ / _| | / / / -_) | _| | |\/| || (_) | |
||||
|
|_|_\ \___/ \__|_ |_\_\ \___| _\__| |_|__|_| \__\_\ |
||||
|
_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""| |
||||
|
"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-' |
@ -0,0 +1,28 @@ |
|||||
|
<?xml version="1.0" encoding="UTF-8"?> |
||||
|
<configuration scan="true" scanPeriod="60 seconds" debug="false"> |
||||
|
<!-- 日志存放路径 --> |
||||
|
<property name="log.path" value="logs/${project.artifactId}" /> |
||||
|
<!-- 日志输出格式 --> |
||||
|
<property name="console.log.pattern" |
||||
|
value="%red(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{36}%n) - %msg%n"/> |
||||
|
|
||||
|
<!-- 控制台输出 --> |
||||
|
<appender name="console" class="ch.qos.logback.core.ConsoleAppender"> |
||||
|
<encoder> |
||||
|
<pattern>${console.log.pattern}</pattern> |
||||
|
<charset>utf-8</charset> |
||||
|
</encoder> |
||||
|
</appender> |
||||
|
|
||||
|
<include resource="logback-common.xml" /> |
||||
|
|
||||
|
<include resource="logback-logstash.xml" /> |
||||
|
|
||||
|
<!-- 开启 skywalking 日志收集 --> |
||||
|
<include resource="logback-skylog.xml" /> |
||||
|
|
||||
|
<!--系统操作日志--> |
||||
|
<root level="info"> |
||||
|
<appender-ref ref="console" /> |
||||
|
</root> |
||||
|
</configuration> |
@ -0,0 +1,25 @@ |
|||||
|
package com.ruoyi.testrocketmq; |
||||
|
|
||||
|
import com.ruoyi.common.security.annotation.EnableCustomConfig; |
||||
|
import com.ruoyi.common.security.annotation.EnableRyFeignClients; |
||||
|
import org.springframework.boot.SpringApplication; |
||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication; |
||||
|
import org.springframework.scheduling.annotation.EnableAsync; |
||||
|
|
||||
|
/** |
||||
|
* 平台管理模块 |
||||
|
* |
||||
|
* @author ruoyi |
||||
|
*/ |
||||
|
@EnableCustomConfig |
||||
|
@EnableRyFeignClients |
||||
|
@SpringBootApplication |
||||
|
@EnableAsync |
||||
|
public class RocketMQApplication |
||||
|
{ |
||||
|
public static void main(String[] args) |
||||
|
{ |
||||
|
SpringApplication.run(RocketMQApplication.class, args); |
||||
|
System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); |
||||
|
} |
||||
|
} |
@ -0,0 +1,64 @@ |
|||||
|
package com.ruoyi.testrocketmq.config; |
||||
|
|
||||
|
import com.ruoyi.testrocketmq.consumer.RocketMsgListener; |
||||
|
import com.ruoyi.testrocketmq.enums.MessageCodeEnum; |
||||
|
import com.ruoyi.testrocketmq.model.ConsumerMode; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
||||
|
import org.apache.rocketmq.client.exception.MQClientException; |
||||
|
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.cloud.context.config.annotation.RefreshScope; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
/** |
||||
|
* 消费者配置 |
||||
|
*/ |
||||
|
@RefreshScope |
||||
|
@Configuration |
||||
|
@Slf4j |
||||
|
public class ConsumerConfig { |
||||
|
@Autowired |
||||
|
private ConsumerMode consumerMode; |
||||
|
|
||||
|
@Bean |
||||
|
public DefaultMQPushConsumer getRocketMQConsumer() throws MQClientException { |
||||
|
// ConsumerMode consumerMode = new ConsumerMode();
|
||||
|
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); |
||||
|
consumer.setNamesrvAddr(consumerMode.getNamesrvAddr()); |
||||
|
consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); |
||||
|
consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); |
||||
|
consumer.registerMessageListener(new RocketMsgListener()); |
||||
|
/** |
||||
|
* 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 |
||||
|
* 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 |
||||
|
* 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 |
||||
|
* 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 |
||||
|
*/ |
||||
|
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); |
||||
|
/** |
||||
|
* CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 |
||||
|
* 订阅topic整体,从而达到负载均衡的目的 |
||||
|
* BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 |
||||
|
* |
||||
|
*/ |
||||
|
// consumer.setMessageModel(MessageModel.BROADCASTING);
|
||||
|
|
||||
|
consumer.setVipChannelEnabled(false); |
||||
|
consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); |
||||
|
try { |
||||
|
/** |
||||
|
* 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 |
||||
|
*/ |
||||
|
consumer.subscribe(MessageCodeEnum.ORDER_MESSAGE.getCode(),"*"); |
||||
|
consumer.subscribe(MessageCodeEnum.USER_MESSAGE.getCode(),"*"); |
||||
|
consumer.start(); |
||||
|
log.info("消费者初始化成功:{}", consumer.toString()); |
||||
|
} catch (MQClientException e) { |
||||
|
e.printStackTrace(); |
||||
|
log.error("消费者初始化失败:{}",e.getMessage()); |
||||
|
} |
||||
|
return consumer; |
||||
|
} |
||||
|
} |
@ -0,0 +1,27 @@ |
|||||
|
package com.ruoyi.testrocketmq.config; |
||||
|
|
||||
|
/** |
||||
|
* @author yz |
||||
|
*/ |
||||
|
public class MessageConfig { |
||||
|
private Class<?> messageClass; |
||||
|
private boolean orderlyMessage; |
||||
|
|
||||
|
public Class<?> getMessageClass() { |
||||
|
return messageClass; |
||||
|
} |
||||
|
|
||||
|
public void setMessageClass(Class<?> messageClass) { |
||||
|
this.messageClass = messageClass; |
||||
|
} |
||||
|
|
||||
|
public boolean isOrderlyMessage() { |
||||
|
return orderlyMessage; |
||||
|
} |
||||
|
|
||||
|
public void setOrderlyMessage(boolean orderlyMessage) { |
||||
|
this.orderlyMessage = orderlyMessage; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
} |
@ -0,0 +1,48 @@ |
|||||
|
package com.ruoyi.testrocketmq.config; |
||||
|
|
||||
|
import com.ruoyi.testrocketmq.model.ProducerMode; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.exception.MQClientException; |
||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
|
||||
|
@Configuration |
||||
|
@Slf4j |
||||
|
public class ProducerConfig { |
||||
|
|
||||
|
public static DefaultMQProducer producer; |
||||
|
|
||||
|
@Autowired |
||||
|
private ProducerMode producerMode; |
||||
|
|
||||
|
|
||||
|
|
||||
|
@Bean |
||||
|
public DefaultMQProducer getRocketMQProducer() { |
||||
|
producer = new DefaultMQProducer(producerMode.getGroupName()); |
||||
|
producer.setNamesrvAddr(producerMode.getNamesrvAddr()); |
||||
|
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
|
||||
|
if(producerMode.getMaxMessageSize()!=null){ |
||||
|
producer.setMaxMessageSize(producerMode.getMaxMessageSize()); |
||||
|
} |
||||
|
if(producerMode.getSendMsgTimeout()!=null){ |
||||
|
producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); |
||||
|
} |
||||
|
//如果发送消息失败,设置重试次数,默认为2次
|
||||
|
if(producerMode.getRetryTimesWhenSendFailed()!=null){ |
||||
|
producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); |
||||
|
} |
||||
|
producer.setVipChannelEnabled(false); |
||||
|
try { |
||||
|
producer.start(); |
||||
|
log.info("生产者初始化成功:{}",producer.toString()); |
||||
|
} catch (MQClientException e) { |
||||
|
log.error("生产者初始化失败:{}",e.getMessage()); |
||||
|
} |
||||
|
return producer; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,100 @@ |
|||||
|
package com.ruoyi.testrocketmq.consumer; |
||||
|
|
||||
|
import com.ruoyi.testrocketmq.enums.MessageCodeEnum; |
||||
|
import com.ruoyi.testrocketmq.producer.ConsumeException; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
||||
|
import org.apache.rocketmq.common.message.MessageExt; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
import org.springframework.util.CollectionUtils; |
||||
|
|
||||
|
import java.io.UnsupportedEncodingException; |
||||
|
import java.text.DateFormat; |
||||
|
import java.text.SimpleDateFormat; |
||||
|
import java.util.Date; |
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* 消息监听 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Component |
||||
|
public class RocketMsgListener implements MessageListenerConcurrently { |
||||
|
|
||||
|
/** |
||||
|
* 消费消息 |
||||
|
* |
||||
|
* @param list msgs.size() >= 1 |
||||
|
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here |
||||
|
* 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 |
||||
|
* @param consumeConcurrentlyContext 上下文信息 |
||||
|
* @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) |
||||
|
*/ |
||||
|
@Override |
||||
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { |
||||
|
|
||||
|
if (!CollectionUtils.isEmpty(list)) { |
||||
|
for (MessageExt messageExt : list) { |
||||
|
// 消息内容
|
||||
|
String body = new String(messageExt.getBody()); |
||||
|
log.info("接受到的消息为:{}", body); |
||||
|
String tags = messageExt.getTags(); |
||||
|
String topic = messageExt.getTopic(); |
||||
|
String msgId = messageExt.getMsgId(); |
||||
|
String keys = messageExt.getKeys(); |
||||
|
int reConsume = messageExt.getReconsumeTimes(); |
||||
|
// 消息已经重试了3次,如果不需要再次消费,则返回成功
|
||||
|
if (reConsume == 3) { |
||||
|
// TODO 补偿信息
|
||||
|
//smsLogService.insertLog(topic, tags, msgId, keys, body, "【" + EnumUtil.getStrMsgByCode(tags, TagsCodeEnum.class) + "】消费失败");
|
||||
|
log.error("消息重试超过3次,消费失败!"); |
||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
||||
|
} |
||||
|
// 订单超时处理
|
||||
|
if (MessageCodeEnum.ORDER_MESSAGE.getCode().equals(topic)) { |
||||
|
if (MessageCodeEnum.ORDER_TIMEOUT_TAG.getCode().equals(tags)) { |
||||
|
// //获取订单
|
||||
|
// DealUserOrder dealUserOrder = pcRemoteDealUserOrderService.selectDealUserOrderByOrderNumber(keys);
|
||||
|
// if (dealUserOrder != null) {
|
||||
|
// //订单状态超时未支付关闭订单 处理
|
||||
|
// if (dealUserOrder.getStatus().equals("1")) {
|
||||
|
// DealUserOrder dealUserOrders = new DealUserOrder();
|
||||
|
// dealUserOrders.setOrderId(dealUserOrder.getOrderId());
|
||||
|
// dealUserOrders.setStatus("4");
|
||||
|
// pcRemoteDealUserOrderService.updateDealUserOrder(dealUserOrders);
|
||||
|
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
|
// }
|
||||
|
// log.info("Order does not exist.");
|
||||
|
// }
|
||||
|
log.info("Consumption success:" + body); |
||||
|
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
||||
|
log.info("Consumption time:{}", format.format(new Date())); |
||||
|
} else { |
||||
|
log.info("未匹配到Tag【{}】" + tags); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
// 消息消费成功
|
||||
|
//ConsumeConcurrentlyStatus.RECONSUME_LATER broker会根据设置的messageDelayLevel发起重试,默认16次
|
||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 异常处理 |
||||
|
* |
||||
|
* @param e 捕获的异常 |
||||
|
* @return 消息消费结果 |
||||
|
*/ |
||||
|
private static ConsumeConcurrentlyStatus handleException(final Exception e) { |
||||
|
Class exceptionClass = e.getClass(); |
||||
|
if (exceptionClass.equals(UnsupportedEncodingException.class)) { |
||||
|
log.error(e.getMessage()); |
||||
|
} else if (exceptionClass.equals(ConsumeException.class)) { |
||||
|
log.error(e.getMessage()); |
||||
|
} |
||||
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
||||
|
} |
||||
|
} |
@ -0,0 +1,60 @@ |
|||||
|
package com.ruoyi.testrocketmq.enums; |
||||
|
|
||||
|
|
||||
|
import lombok.Getter; |
||||
|
|
||||
|
@Getter |
||||
|
public enum MessageCodeEnum { |
||||
|
/** |
||||
|
* 消息模块主题 |
||||
|
*/ |
||||
|
MESSAGE_TOPIC("elink-message","消息服务模块topic名称"), |
||||
|
/** |
||||
|
* 系统消息 |
||||
|
*/ |
||||
|
NOTE_MESSAGE("system-message","系统消息服务模块topic名称"), |
||||
|
/** |
||||
|
* 用户消息 |
||||
|
*/ |
||||
|
USER_MESSAGE("user-message","用户消息服务模块topic名称"), |
||||
|
|
||||
|
/** |
||||
|
* 订单消息 |
||||
|
*/ |
||||
|
ORDER_MESSAGE("order-message","订单消息服务模块topic名称"), |
||||
|
|
||||
|
/** |
||||
|
* 平台编号 |
||||
|
*/ |
||||
|
USER_MESSAGE_TAG("user_message_tag","用户消息推送"), |
||||
|
NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), |
||||
|
ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), |
||||
|
|
||||
|
/** |
||||
|
* 订单处理编号 |
||||
|
*/ |
||||
|
//订单超时处理
|
||||
|
ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); |
||||
|
|
||||
|
|
||||
|
private final String code; |
||||
|
private final String msg; |
||||
|
|
||||
|
MessageCodeEnum(String code, String msg){ |
||||
|
this.code = code; |
||||
|
this.msg = msg; |
||||
|
} |
||||
|
|
||||
|
public static String valuesOfType(String code) { |
||||
|
String value = ""; |
||||
|
for (MessageCodeEnum e : MessageCodeEnum.values()) { |
||||
|
if (code.equals(e.code)) { |
||||
|
value = e.msg; |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
return value; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
} |
@ -0,0 +1,22 @@ |
|||||
|
package com.ruoyi.testrocketmq.model; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
@Data |
||||
|
@Configuration |
||||
|
@Component |
||||
|
public class ConsumerMode { |
||||
|
@Value("${suning.rocketmq.namesrvAddr}") |
||||
|
private String namesrvAddr; |
||||
|
@Value("${suning.rocketmq.conumer.groupName}") |
||||
|
private String groupName ; |
||||
|
@Value("${suning.rocketmq.conumer.consumeThreadMin}") |
||||
|
private int consumeThreadMin; |
||||
|
@Value("${suning.rocketmq.conumer.consumeThreadMax}") |
||||
|
private int consumeThreadMax; |
||||
|
@Value("${suning.rocketmq.conumer.consumeMessageBatchMaxSize}") |
||||
|
private int consumeMessageBatchMaxSize; |
||||
|
} |
@ -0,0 +1,25 @@ |
|||||
|
package com.ruoyi.testrocketmq.model; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.cloud.context.config.annotation.RefreshScope; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
/** |
||||
|
* 生产者初始化 |
||||
|
*/ |
||||
|
@RefreshScope |
||||
|
@Data |
||||
|
@Configuration |
||||
|
public class ProducerMode { |
||||
|
@Value("${suning.rocketmq.producer.groupName}") |
||||
|
private String groupName; |
||||
|
@Value("${suning.rocketmq.namesrvAddr}") |
||||
|
private String namesrvAddr; |
||||
|
@Value("${suning.rocketmq.producer.maxMessageSize}") |
||||
|
private Integer maxMessageSize; |
||||
|
@Value("${suning.rocketmq.producer.sendMsgTimeout}") |
||||
|
private Integer sendMsgTimeout; |
||||
|
@Value("${suning.rocketmq.producer.retryTimesWhenSendFailed}") |
||||
|
private Integer retryTimesWhenSendFailed; |
||||
|
} |
@ -0,0 +1,47 @@ |
|||||
|
package com.ruoyi.testrocketmq.producer; |
||||
|
|
||||
|
import com.ruoyi.testrocketmq.config.ProducerConfig; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
|
||||
|
public class AsyncProducer { |
||||
|
|
||||
|
@Autowired |
||||
|
private ProducerConfig producerConfig; |
||||
|
|
||||
|
/** |
||||
|
* 发送异步的消息 |
||||
|
* @param topic 主题 |
||||
|
* @param tag 标签 |
||||
|
* @param key 自定义的key,根据业务来定 |
||||
|
* @param value 消息的内容 |
||||
|
* @return org.apache.rocketmq.client.producer.SendResult |
||||
|
*/ |
||||
|
// public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value) throws UnsupportedEncodingException {
|
||||
|
//
|
||||
|
// try {
|
||||
|
// DefaultMQProducer defaultMQProducer = producerConfig.producer;
|
||||
|
// //Create a message instance, specifying topic, tag and message body.
|
||||
|
// Message msg = new Message(topic, tag, key,value.getBytes(RemotingHelper.DEFAULT_CHARSET));
|
||||
|
// defaultMQProducer.send(msg, new SendCallback() {
|
||||
|
// // 异步回调的处理
|
||||
|
// @Override
|
||||
|
// public void onSuccess(SendResult sendResult) {
|
||||
|
// System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId());
|
||||
|
// }
|
||||
|
//
|
||||
|
// @Override
|
||||
|
// public void onException(Throwable e) {
|
||||
|
// System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e);
|
||||
|
// e.printStackTrace();
|
||||
|
// }
|
||||
|
// });
|
||||
|
// } catch (MQClientException e) {
|
||||
|
// e.printStackTrace();
|
||||
|
// } catch (RemotingException e) {
|
||||
|
// e.printStackTrace();
|
||||
|
// } catch (InterruptedException e) {
|
||||
|
// e.printStackTrace();
|
||||
|
// }
|
||||
|
// return null;
|
||||
|
// }
|
||||
|
} |
@ -0,0 +1,20 @@ |
|||||
|
package com.ruoyi.testrocketmq.producer; |
||||
|
|
||||
|
/** |
||||
|
* @author 影子 |
||||
|
*/ |
||||
|
public class ConsumeException extends RuntimeException{ |
||||
|
private static final long serialVersionUID = 4093867789628938836L; |
||||
|
|
||||
|
public ConsumeException(String message) { |
||||
|
super(message); |
||||
|
} |
||||
|
|
||||
|
public ConsumeException(Throwable cause) { |
||||
|
super(cause); |
||||
|
} |
||||
|
|
||||
|
public ConsumeException(String message, Throwable cause) { |
||||
|
super(message, cause); |
||||
|
} |
||||
|
} |
@ -0,0 +1,34 @@ |
|||||
|
package com.ruoyi.testrocketmq.producer; |
||||
|
|
||||
|
|
||||
|
|
||||
|
import lombok.Data; |
||||
|
import lombok.ToString; |
||||
|
import org.apache.rocketmq.common.message.MessageExt; |
||||
|
import org.apache.rocketmq.common.message.MessageQueue; |
||||
|
|
||||
|
/** |
||||
|
* 消费时,当前所消费的消息的上下文信息 |
||||
|
* |
||||
|
* @author jolly |
||||
|
*/ |
||||
|
@ToString |
||||
|
@Data |
||||
|
public final class MessageContext { |
||||
|
|
||||
|
/** |
||||
|
* 所消费消息所在的消息队列 |
||||
|
* |
||||
|
* @see MessageQueue |
||||
|
*/ |
||||
|
private MessageQueue messageQueue; |
||||
|
|
||||
|
/** |
||||
|
* 所消费的消息的扩展属性 |
||||
|
* |
||||
|
* @see MessageExt |
||||
|
*/ |
||||
|
private MessageExt messageExt; |
||||
|
|
||||
|
|
||||
|
} |
@ -0,0 +1,110 @@ |
|||||
|
package com.ruoyi.testrocketmq.producer; |
||||
|
|
||||
|
import com.alibaba.fastjson.JSON; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.exception.MQBrokerException; |
||||
|
import org.apache.rocketmq.client.exception.MQClientException; |
||||
|
import org.apache.rocketmq.client.producer.SendResult; |
||||
|
import org.apache.rocketmq.common.message.Message; |
||||
|
import org.apache.rocketmq.remoting.common.RemotingHelper; |
||||
|
import org.apache.rocketmq.remoting.exception.RemotingException; |
||||
|
|
||||
|
import java.io.UnsupportedEncodingException; |
||||
|
import java.text.DateFormat; |
||||
|
import java.text.SimpleDateFormat; |
||||
|
import java.util.Date; |
||||
|
import java.util.List; |
||||
|
|
||||
|
import static com.ruoyi.rocketmq.config.ProducerConfig.producer; |
||||
|
|
||||
|
/** |
||||
|
* 消息发送 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
public class MessageProducer { |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 同步发送消息 |
||||
|
* @param topic 主题 |
||||
|
* @param tag 标签 |
||||
|
* @param key 自定义的key,根据业务来定 |
||||
|
* @param value 消息的内容 |
||||
|
* @return org.apache.rocketmq.client.producer.SendResult |
||||
|
*/ |
||||
|
public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ |
||||
|
String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; |
||||
|
try { |
||||
|
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
||||
|
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
||||
|
SendResult result = producer.send(msg); |
||||
|
return result; |
||||
|
} catch (UnsupportedEncodingException e) { |
||||
|
log.error("消息初始化失败!body:{}",body); |
||||
|
|
||||
|
} catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) { |
||||
|
log.error("消息发送失败! body:{}",body); |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 发送有序的消息 |
||||
|
* @param messagesList Message集合 |
||||
|
* @param messageQueueNumber 消息队列编号 |
||||
|
* @return org.apache.rocketmq.client.producer.SendResult |
||||
|
*/ |
||||
|
public SendResult sendOrderlyMessage(List<Message> messagesList, int messageQueueNumber) { |
||||
|
SendResult result = null; |
||||
|
for (Message message : messagesList) { |
||||
|
try { |
||||
|
// DefaultMQProducer defaultMQProducer = ProducerConfig.producer.send(message);
|
||||
|
// System.out.println(defaultMQProducer);
|
||||
|
result = producer.send(message, (list, msg, arg) -> { |
||||
|
Integer queueNumber = (Integer) arg; |
||||
|
return list.get(queueNumber); |
||||
|
}, messageQueueNumber); |
||||
|
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { |
||||
|
log.error("发送有序消息失败"); |
||||
|
return result; |
||||
|
} |
||||
|
} |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 推送延迟消息 |
||||
|
* @param topic |
||||
|
* @param tag |
||||
|
* @param key |
||||
|
* @return boolean |
||||
|
*/ |
||||
|
public SendResult sendDelayMessage(String topic, String tag, String key, String value) |
||||
|
{ |
||||
|
SendResult result = null; |
||||
|
try |
||||
|
{ |
||||
|
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
||||
|
//设置消息延迟级别,我这里设置5,对应就是延时一分钟
|
||||
|
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
|
||||
|
msg.setDelayTimeLevel(4); |
||||
|
// 发送消息到一个Broker
|
||||
|
result = producer.send(msg); |
||||
|
// 通过sendResult返回消息是否成功送达
|
||||
|
log.info("发送延迟消息结果:======sendResult:{}", result); |
||||
|
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
||||
|
log.info("发送时间:{}", format.format(new Date())); |
||||
|
return result; |
||||
|
} |
||||
|
catch (Exception e) |
||||
|
{ |
||||
|
e.printStackTrace(); |
||||
|
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); |
||||
|
} |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
} |
Loading…
Reference in new issue