28 changed files with 132 additions and 614 deletions
@ -1,25 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq; |
|
||||
|
|
||||
import com.ruoyi.common.security.annotation.EnableCustomConfig; |
|
||||
import com.ruoyi.common.security.annotation.EnableRyFeignClients; |
|
||||
import org.springframework.boot.SpringApplication; |
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication; |
|
||||
import org.springframework.scheduling.annotation.EnableAsync; |
|
||||
|
|
||||
/** |
|
||||
* 平台管理模块 |
|
||||
* |
|
||||
* @author ruoyi |
|
||||
*/ |
|
||||
@EnableCustomConfig |
|
||||
@EnableRyFeignClients |
|
||||
@SpringBootApplication |
|
||||
@EnableAsync |
|
||||
public class RocketMQApplication |
|
||||
{ |
|
||||
public static void main(String[] args) |
|
||||
{ |
|
||||
SpringApplication.run(RocketMQApplication.class, args); |
|
||||
System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); |
|
||||
} |
|
||||
} |
|
@ -1,64 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.config; |
|
||||
|
|
||||
import com.ruoyi.testrocketmq.consumer.RocketMsgListener; |
|
||||
import com.ruoyi.testrocketmq.enums.MessageCodeEnum; |
|
||||
import com.ruoyi.testrocketmq.model.ConsumerMode; |
|
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
|
||||
import org.apache.rocketmq.client.exception.MQClientException; |
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; |
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
|
||||
import org.springframework.context.annotation.Bean; |
|
||||
import org.springframework.context.annotation.Configuration; |
|
||||
|
|
||||
/** |
|
||||
* 消费者配置 |
|
||||
*/ |
|
||||
@RefreshScope |
|
||||
@Configuration |
|
||||
@Slf4j |
|
||||
public class ConsumerConfig { |
|
||||
@Autowired |
|
||||
private ConsumerMode consumerMode; |
|
||||
|
|
||||
@Bean |
|
||||
public DefaultMQPushConsumer getRocketMQConsumer() throws MQClientException { |
|
||||
// ConsumerMode consumerMode = new ConsumerMode();
|
|
||||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); |
|
||||
consumer.setNamesrvAddr(consumerMode.getNamesrvAddr()); |
|
||||
consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); |
|
||||
consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); |
|
||||
consumer.registerMessageListener(new RocketMsgListener()); |
|
||||
/** |
|
||||
* 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 |
|
||||
* 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 |
|
||||
* 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 |
|
||||
* 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 |
|
||||
*/ |
|
||||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); |
|
||||
/** |
|
||||
* CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 |
|
||||
* 订阅topic整体,从而达到负载均衡的目的 |
|
||||
* BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 |
|
||||
* |
|
||||
*/ |
|
||||
// consumer.setMessageModel(MessageModel.BROADCASTING);
|
|
||||
|
|
||||
consumer.setVipChannelEnabled(false); |
|
||||
consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); |
|
||||
try { |
|
||||
/** |
|
||||
* 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 |
|
||||
*/ |
|
||||
consumer.subscribe(MessageCodeEnum.ORDER_MESSAGE.getCode(),"*"); |
|
||||
consumer.subscribe(MessageCodeEnum.USER_MESSAGE.getCode(),"*"); |
|
||||
consumer.start(); |
|
||||
log.info("消费者初始化成功:{}", consumer.toString()); |
|
||||
} catch (MQClientException e) { |
|
||||
e.printStackTrace(); |
|
||||
log.error("消费者初始化失败:{}",e.getMessage()); |
|
||||
} |
|
||||
return consumer; |
|
||||
} |
|
||||
} |
|
@ -1,27 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.config; |
|
||||
|
|
||||
/** |
|
||||
* @author yz |
|
||||
*/ |
|
||||
public class MessageConfig { |
|
||||
private Class<?> messageClass; |
|
||||
private boolean orderlyMessage; |
|
||||
|
|
||||
public Class<?> getMessageClass() { |
|
||||
return messageClass; |
|
||||
} |
|
||||
|
|
||||
public void setMessageClass(Class<?> messageClass) { |
|
||||
this.messageClass = messageClass; |
|
||||
} |
|
||||
|
|
||||
public boolean isOrderlyMessage() { |
|
||||
return orderlyMessage; |
|
||||
} |
|
||||
|
|
||||
public void setOrderlyMessage(boolean orderlyMessage) { |
|
||||
this.orderlyMessage = orderlyMessage; |
|
||||
} |
|
||||
|
|
||||
|
|
||||
} |
|
@ -1,48 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.config; |
|
||||
|
|
||||
import com.ruoyi.testrocketmq.model.ProducerMode; |
|
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.apache.rocketmq.client.exception.MQClientException; |
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||
import org.springframework.context.annotation.Bean; |
|
||||
import org.springframework.context.annotation.Configuration; |
|
||||
|
|
||||
|
|
||||
@Configuration |
|
||||
@Slf4j |
|
||||
public class ProducerConfig { |
|
||||
|
|
||||
public static DefaultMQProducer producer; |
|
||||
|
|
||||
@Autowired |
|
||||
private ProducerMode producerMode; |
|
||||
|
|
||||
|
|
||||
|
|
||||
@Bean |
|
||||
public DefaultMQProducer getRocketMQProducer() { |
|
||||
producer = new DefaultMQProducer(producerMode.getGroupName()); |
|
||||
producer.setNamesrvAddr(producerMode.getNamesrvAddr()); |
|
||||
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
|
|
||||
if(producerMode.getMaxMessageSize()!=null){ |
|
||||
producer.setMaxMessageSize(producerMode.getMaxMessageSize()); |
|
||||
} |
|
||||
if(producerMode.getSendMsgTimeout()!=null){ |
|
||||
producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); |
|
||||
} |
|
||||
//如果发送消息失败,设置重试次数,默认为2次
|
|
||||
if(producerMode.getRetryTimesWhenSendFailed()!=null){ |
|
||||
producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); |
|
||||
} |
|
||||
producer.setVipChannelEnabled(false); |
|
||||
try { |
|
||||
producer.start(); |
|
||||
log.info("生产者初始化成功:{}",producer.toString()); |
|
||||
} catch (MQClientException e) { |
|
||||
log.error("生产者初始化失败:{}",e.getMessage()); |
|
||||
} |
|
||||
return producer; |
|
||||
} |
|
||||
|
|
||||
} |
|
@ -1,100 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.consumer; |
|
||||
|
|
||||
import com.ruoyi.testrocketmq.enums.MessageCodeEnum; |
|
||||
import com.ruoyi.testrocketmq.producer.ConsumeException; |
|
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
|
||||
import org.apache.rocketmq.common.message.MessageExt; |
|
||||
import org.springframework.stereotype.Component; |
|
||||
import org.springframework.util.CollectionUtils; |
|
||||
|
|
||||
import java.io.UnsupportedEncodingException; |
|
||||
import java.text.DateFormat; |
|
||||
import java.text.SimpleDateFormat; |
|
||||
import java.util.Date; |
|
||||
import java.util.List; |
|
||||
|
|
||||
/** |
|
||||
* 消息监听 |
|
||||
*/ |
|
||||
@Slf4j |
|
||||
@Component |
|
||||
public class RocketMsgListener implements MessageListenerConcurrently { |
|
||||
|
|
||||
/** |
|
||||
* 消费消息 |
|
||||
* |
|
||||
* @param list msgs.size() >= 1 |
|
||||
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here |
|
||||
* 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 |
|
||||
* @param consumeConcurrentlyContext 上下文信息 |
|
||||
* @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) |
|
||||
*/ |
|
||||
@Override |
|
||||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { |
|
||||
|
|
||||
if (!CollectionUtils.isEmpty(list)) { |
|
||||
for (MessageExt messageExt : list) { |
|
||||
// 消息内容
|
|
||||
String body = new String(messageExt.getBody()); |
|
||||
log.info("接受到的消息为:{}", body); |
|
||||
String tags = messageExt.getTags(); |
|
||||
String topic = messageExt.getTopic(); |
|
||||
String msgId = messageExt.getMsgId(); |
|
||||
String keys = messageExt.getKeys(); |
|
||||
int reConsume = messageExt.getReconsumeTimes(); |
|
||||
// 消息已经重试了3次,如果不需要再次消费,则返回成功
|
|
||||
if (reConsume == 3) { |
|
||||
// TODO 补偿信息
|
|
||||
//smsLogService.insertLog(topic, tags, msgId, keys, body, "【" + EnumUtil.getStrMsgByCode(tags, TagsCodeEnum.class) + "】消费失败");
|
|
||||
log.error("消息重试超过3次,消费失败!"); |
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|
||||
} |
|
||||
// 订单超时处理
|
|
||||
if (MessageCodeEnum.ORDER_MESSAGE.getCode().equals(topic)) { |
|
||||
if (MessageCodeEnum.ORDER_TIMEOUT_TAG.getCode().equals(tags)) { |
|
||||
// //获取订单
|
|
||||
// DealUserOrder dealUserOrder = pcRemoteDealUserOrderService.selectDealUserOrderByOrderNumber(keys);
|
|
||||
// if (dealUserOrder != null) {
|
|
||||
// //订单状态超时未支付关闭订单 处理
|
|
||||
// if (dealUserOrder.getStatus().equals("1")) {
|
|
||||
// DealUserOrder dealUserOrders = new DealUserOrder();
|
|
||||
// dealUserOrders.setOrderId(dealUserOrder.getOrderId());
|
|
||||
// dealUserOrders.setStatus("4");
|
|
||||
// pcRemoteDealUserOrderService.updateDealUserOrder(dealUserOrders);
|
|
||||
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
||||
// }
|
|
||||
// log.info("Order does not exist.");
|
|
||||
// }
|
|
||||
log.info("Consumption success:" + body); |
|
||||
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
||||
log.info("Consumption time:{}", format.format(new Date())); |
|
||||
} else { |
|
||||
log.info("未匹配到Tag【{}】" + tags); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
// 消息消费成功
|
|
||||
//ConsumeConcurrentlyStatus.RECONSUME_LATER broker会根据设置的messageDelayLevel发起重试,默认16次
|
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* 异常处理 |
|
||||
* |
|
||||
* @param e 捕获的异常 |
|
||||
* @return 消息消费结果 |
|
||||
*/ |
|
||||
private static ConsumeConcurrentlyStatus handleException(final Exception e) { |
|
||||
Class exceptionClass = e.getClass(); |
|
||||
if (exceptionClass.equals(UnsupportedEncodingException.class)) { |
|
||||
log.error(e.getMessage()); |
|
||||
} else if (exceptionClass.equals(ConsumeException.class)) { |
|
||||
log.error(e.getMessage()); |
|
||||
} |
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
|
||||
} |
|
||||
} |
|
@ -1,60 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.enums; |
|
||||
|
|
||||
|
|
||||
import lombok.Getter; |
|
||||
|
|
||||
@Getter |
|
||||
public enum MessageCodeEnum { |
|
||||
/** |
|
||||
* 消息模块主题 |
|
||||
*/ |
|
||||
MESSAGE_TOPIC("elink-message","消息服务模块topic名称"), |
|
||||
/** |
|
||||
* 系统消息 |
|
||||
*/ |
|
||||
NOTE_MESSAGE("system-message","系统消息服务模块topic名称"), |
|
||||
/** |
|
||||
* 用户消息 |
|
||||
*/ |
|
||||
USER_MESSAGE("user-message","用户消息服务模块topic名称"), |
|
||||
|
|
||||
/** |
|
||||
* 订单消息 |
|
||||
*/ |
|
||||
ORDER_MESSAGE("order-message","订单消息服务模块topic名称"), |
|
||||
|
|
||||
/** |
|
||||
* 平台编号 |
|
||||
*/ |
|
||||
USER_MESSAGE_TAG("user_message_tag","用户消息推送"), |
|
||||
NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), |
|
||||
ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), |
|
||||
|
|
||||
/** |
|
||||
* 订单处理编号 |
|
||||
*/ |
|
||||
//订单超时处理
|
|
||||
ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); |
|
||||
|
|
||||
|
|
||||
private final String code; |
|
||||
private final String msg; |
|
||||
|
|
||||
MessageCodeEnum(String code, String msg){ |
|
||||
this.code = code; |
|
||||
this.msg = msg; |
|
||||
} |
|
||||
|
|
||||
public static String valuesOfType(String code) { |
|
||||
String value = ""; |
|
||||
for (MessageCodeEnum e : MessageCodeEnum.values()) { |
|
||||
if (code.equals(e.code)) { |
|
||||
value = e.msg; |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
return value; |
|
||||
} |
|
||||
|
|
||||
|
|
||||
} |
|
@ -1,22 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.model; |
|
||||
|
|
||||
import lombok.Data; |
|
||||
import org.springframework.beans.factory.annotation.Value; |
|
||||
import org.springframework.context.annotation.Configuration; |
|
||||
import org.springframework.stereotype.Component; |
|
||||
|
|
||||
@Data |
|
||||
@Configuration |
|
||||
@Component |
|
||||
public class ConsumerMode { |
|
||||
@Value("${suning.rocketmq.namesrvAddr}") |
|
||||
private String namesrvAddr; |
|
||||
@Value("${suning.rocketmq.conumer.groupName}") |
|
||||
private String groupName ; |
|
||||
@Value("${suning.rocketmq.conumer.consumeThreadMin}") |
|
||||
private int consumeThreadMin; |
|
||||
@Value("${suning.rocketmq.conumer.consumeThreadMax}") |
|
||||
private int consumeThreadMax; |
|
||||
@Value("${suning.rocketmq.conumer.consumeMessageBatchMaxSize}") |
|
||||
private int consumeMessageBatchMaxSize; |
|
||||
} |
|
@ -1,25 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.model; |
|
||||
|
|
||||
import lombok.Data; |
|
||||
import org.springframework.beans.factory.annotation.Value; |
|
||||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
|
||||
import org.springframework.context.annotation.Configuration; |
|
||||
|
|
||||
/** |
|
||||
* 生产者初始化 |
|
||||
*/ |
|
||||
@RefreshScope |
|
||||
@Data |
|
||||
@Configuration |
|
||||
public class ProducerMode { |
|
||||
@Value("${suning.rocketmq.producer.groupName}") |
|
||||
private String groupName; |
|
||||
@Value("${suning.rocketmq.namesrvAddr}") |
|
||||
private String namesrvAddr; |
|
||||
@Value("${suning.rocketmq.producer.maxMessageSize}") |
|
||||
private Integer maxMessageSize; |
|
||||
@Value("${suning.rocketmq.producer.sendMsgTimeout}") |
|
||||
private Integer sendMsgTimeout; |
|
||||
@Value("${suning.rocketmq.producer.retryTimesWhenSendFailed}") |
|
||||
private Integer retryTimesWhenSendFailed; |
|
||||
} |
|
@ -1,47 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.producer; |
|
||||
|
|
||||
import com.ruoyi.testrocketmq.config.ProducerConfig; |
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||
|
|
||||
public class AsyncProducer { |
|
||||
|
|
||||
@Autowired |
|
||||
private ProducerConfig producerConfig; |
|
||||
|
|
||||
/** |
|
||||
* 发送异步的消息 |
|
||||
* @param topic 主题 |
|
||||
* @param tag 标签 |
|
||||
* @param key 自定义的key,根据业务来定 |
|
||||
* @param value 消息的内容 |
|
||||
* @return org.apache.rocketmq.client.producer.SendResult |
|
||||
*/ |
|
||||
// public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value) throws UnsupportedEncodingException {
|
|
||||
//
|
|
||||
// try {
|
|
||||
// DefaultMQProducer defaultMQProducer = producerConfig.producer;
|
|
||||
// //Create a message instance, specifying topic, tag and message body.
|
|
||||
// Message msg = new Message(topic, tag, key,value.getBytes(RemotingHelper.DEFAULT_CHARSET));
|
|
||||
// defaultMQProducer.send(msg, new SendCallback() {
|
|
||||
// // 异步回调的处理
|
|
||||
// @Override
|
|
||||
// public void onSuccess(SendResult sendResult) {
|
|
||||
// System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId());
|
|
||||
// }
|
|
||||
//
|
|
||||
// @Override
|
|
||||
// public void onException(Throwable e) {
|
|
||||
// System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e);
|
|
||||
// e.printStackTrace();
|
|
||||
// }
|
|
||||
// });
|
|
||||
// } catch (MQClientException e) {
|
|
||||
// e.printStackTrace();
|
|
||||
// } catch (RemotingException e) {
|
|
||||
// e.printStackTrace();
|
|
||||
// } catch (InterruptedException e) {
|
|
||||
// e.printStackTrace();
|
|
||||
// }
|
|
||||
// return null;
|
|
||||
// }
|
|
||||
} |
|
@ -1,20 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.producer; |
|
||||
|
|
||||
/** |
|
||||
* @author 影子 |
|
||||
*/ |
|
||||
public class ConsumeException extends RuntimeException{ |
|
||||
private static final long serialVersionUID = 4093867789628938836L; |
|
||||
|
|
||||
public ConsumeException(String message) { |
|
||||
super(message); |
|
||||
} |
|
||||
|
|
||||
public ConsumeException(Throwable cause) { |
|
||||
super(cause); |
|
||||
} |
|
||||
|
|
||||
public ConsumeException(String message, Throwable cause) { |
|
||||
super(message, cause); |
|
||||
} |
|
||||
} |
|
@ -1,34 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.producer; |
|
||||
|
|
||||
|
|
||||
|
|
||||
import lombok.Data; |
|
||||
import lombok.ToString; |
|
||||
import org.apache.rocketmq.common.message.MessageExt; |
|
||||
import org.apache.rocketmq.common.message.MessageQueue; |
|
||||
|
|
||||
/** |
|
||||
* 消费时,当前所消费的消息的上下文信息 |
|
||||
* |
|
||||
* @author jolly |
|
||||
*/ |
|
||||
@ToString |
|
||||
@Data |
|
||||
public final class MessageContext { |
|
||||
|
|
||||
/** |
|
||||
* 所消费消息所在的消息队列 |
|
||||
* |
|
||||
* @see MessageQueue |
|
||||
*/ |
|
||||
private MessageQueue messageQueue; |
|
||||
|
|
||||
/** |
|
||||
* 所消费的消息的扩展属性 |
|
||||
* |
|
||||
* @see MessageExt |
|
||||
*/ |
|
||||
private MessageExt messageExt; |
|
||||
|
|
||||
|
|
||||
} |
|
@ -1,110 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.producer; |
|
||||
|
|
||||
import com.alibaba.fastjson.JSON; |
|
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.apache.rocketmq.client.exception.MQBrokerException; |
|
||||
import org.apache.rocketmq.client.exception.MQClientException; |
|
||||
import org.apache.rocketmq.client.producer.SendResult; |
|
||||
import org.apache.rocketmq.common.message.Message; |
|
||||
import org.apache.rocketmq.remoting.common.RemotingHelper; |
|
||||
import org.apache.rocketmq.remoting.exception.RemotingException; |
|
||||
|
|
||||
import java.io.UnsupportedEncodingException; |
|
||||
import java.text.DateFormat; |
|
||||
import java.text.SimpleDateFormat; |
|
||||
import java.util.Date; |
|
||||
import java.util.List; |
|
||||
|
|
||||
import static com.ruoyi.rocketmq.config.ProducerConfig.producer; |
|
||||
|
|
||||
/** |
|
||||
* 消息发送 |
|
||||
*/ |
|
||||
@Slf4j |
|
||||
public class MessageProducer { |
|
||||
|
|
||||
|
|
||||
/** |
|
||||
* 同步发送消息 |
|
||||
* @param topic 主题 |
|
||||
* @param tag 标签 |
|
||||
* @param key 自定义的key,根据业务来定 |
|
||||
* @param value 消息的内容 |
|
||||
* @return org.apache.rocketmq.client.producer.SendResult |
|
||||
*/ |
|
||||
public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ |
|
||||
String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; |
|
||||
try { |
|
||||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|
||||
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
|
||||
SendResult result = producer.send(msg); |
|
||||
return result; |
|
||||
} catch (UnsupportedEncodingException e) { |
|
||||
log.error("消息初始化失败!body:{}",body); |
|
||||
|
|
||||
} catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) { |
|
||||
log.error("消息发送失败! body:{}",body); |
|
||||
} |
|
||||
return null; |
|
||||
} |
|
||||
|
|
||||
|
|
||||
|
|
||||
/** |
|
||||
* 发送有序的消息 |
|
||||
* @param messagesList Message集合 |
|
||||
* @param messageQueueNumber 消息队列编号 |
|
||||
* @return org.apache.rocketmq.client.producer.SendResult |
|
||||
*/ |
|
||||
public SendResult sendOrderlyMessage(List<Message> messagesList, int messageQueueNumber) { |
|
||||
SendResult result = null; |
|
||||
for (Message message : messagesList) { |
|
||||
try { |
|
||||
// DefaultMQProducer defaultMQProducer = ProducerConfig.producer.send(message);
|
|
||||
// System.out.println(defaultMQProducer);
|
|
||||
result = producer.send(message, (list, msg, arg) -> { |
|
||||
Integer queueNumber = (Integer) arg; |
|
||||
return list.get(queueNumber); |
|
||||
}, messageQueueNumber); |
|
||||
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { |
|
||||
log.error("发送有序消息失败"); |
|
||||
return result; |
|
||||
} |
|
||||
} |
|
||||
return result; |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* 推送延迟消息 |
|
||||
* @param topic |
|
||||
* @param tag |
|
||||
* @param key |
|
||||
* @return boolean |
|
||||
*/ |
|
||||
public SendResult sendDelayMessage(String topic, String tag, String key, String value) |
|
||||
{ |
|
||||
SendResult result = null; |
|
||||
try |
|
||||
{ |
|
||||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|
||||
//设置消息延迟级别,我这里设置5,对应就是延时一分钟
|
|
||||
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
|
|
||||
msg.setDelayTimeLevel(4); |
|
||||
// 发送消息到一个Broker
|
|
||||
result = producer.send(msg); |
|
||||
// 通过sendResult返回消息是否成功送达
|
|
||||
log.info("发送延迟消息结果:======sendResult:{}", result); |
|
||||
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
||||
log.info("发送时间:{}", format.format(new Date())); |
|
||||
return result; |
|
||||
} |
|
||||
catch (Exception e) |
|
||||
{ |
|
||||
e.printStackTrace(); |
|
||||
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); |
|
||||
} |
|
||||
return result; |
|
||||
} |
|
||||
|
|
||||
|
|
||||
} |
|
Loading…
Reference in new issue