From b962532431499e0a6981c8dd2f6fde92015b7311 Mon Sep 17 00:00:00 2001 From: yangwei <867012372@qq.com> Date: Mon, 12 May 2025 10:12:50 +0800 Subject: [PATCH] =?UTF-8?q?[feat]=201=E3=80=81=E9=9B=86=E6=88=90rocketmq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dk-common/common-bom/pom.xml | 7 + dk-common/common-rocketmq/pom.xml | 45 +++++ .../rocketmq/config/ProducerConfig.java | 58 ++++++ .../common/rocketmq/model/ProducerMode.java | 25 +++ .../producer/MessageProducerUtil.java | 189 ++++++++++++++++++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + dk-common/pom.xml | 1 + dk-modules/business/pom.xml | 4 + 8 files changed, 330 insertions(+) create mode 100644 dk-common/common-rocketmq/pom.xml create mode 100644 dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/config/ProducerConfig.java create mode 100644 dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/model/ProducerMode.java create mode 100644 dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/producer/MessageProducerUtil.java create mode 100644 dk-common/common-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports diff --git a/dk-common/common-bom/pom.xml b/dk-common/common-bom/pom.xml index 7cbb10a..6324395 100644 --- a/dk-common/common-bom/pom.xml +++ b/dk-common/common-bom/pom.xml @@ -257,6 +257,13 @@ ${revision} + + + org.dromara + common-rocketmq + ${revision} + + diff --git a/dk-common/common-rocketmq/pom.xml b/dk-common/common-rocketmq/pom.xml new file mode 100644 index 0000000..412a96a --- /dev/null +++ b/dk-common/common-rocketmq/pom.xml @@ -0,0 +1,45 @@ + + + + org.dromara + dk-common + ${revision} + + 4.0.0 + + common-rocketmq + + + common-rocketmq 配置中心 + + + + + + org.apache.rocketmq + rocketmq-client + 4.9.0 + + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + + + + org.dromara + common-core + + + + org.dromara + common-nacos + + + + + + diff --git a/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/config/ProducerConfig.java b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/config/ProducerConfig.java new file mode 100644 index 0000000..4d82415 --- /dev/null +++ b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/config/ProducerConfig.java @@ -0,0 +1,58 @@ +package org.dromara.common.rocketmq.config; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.dromara.common.rocketmq.model.ProducerMode; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + + +/** + * mq搭建地址连接 + * 生产者初者连接信息 具体看nacos配置 + */ +@AutoConfiguration +@Slf4j +@EnableConfigurationProperties(ProducerMode.class) +public class ProducerConfig { + + /** + * 远程调用连接信息 + */ + public static DefaultMQProducer producer; + + /** + * 连接客户端信息配置 具体看nacos配置 + */ + @Autowired + private ProducerMode producerMode; + + @Bean + public DefaultMQProducer getRocketMQProducer() { + producer = new DefaultMQProducer(producerMode.getGroupName()); + producer.setNamesrvAddr(producerMode.getNameServer()); + //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName + if(producerMode.getMaxMessageSize()!=null){ + producer.setMaxMessageSize(producerMode.getMaxMessageSize()); + } + if(producerMode.getSendMsgTimeout()!=null){ + producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); + } + //如果发送消息失败,设置重试次数,默认为2次 + if(producerMode.getRetryTimesWhenSendFailed()!=null){ + producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); + } + producer.setVipChannelEnabled(false); + try { + producer.start(); + log.info("生产者初始化成功:{}",producer.toString()); + } catch (MQClientException e) { + log.error("生产者初始化失败:{}",e.getMessage()); + } + return producer; + } + +} diff --git a/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/model/ProducerMode.java b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/model/ProducerMode.java new file mode 100644 index 0000000..e255e1e --- /dev/null +++ b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/model/ProducerMode.java @@ -0,0 +1,25 @@ +package org.dromara.common.rocketmq.model; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Configuration; + +/** + * 生产者初始化 + */ +@Data +@ConfigurationProperties(prefix = "rocketmq.producer") +public class ProducerMode { + + private String groupName; + + private String nameServer; + + private Integer maxMessageSize; + + private Integer sendMsgTimeout; + + private Integer retryTimesWhenSendFailed; +} diff --git a/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/producer/MessageProducerUtil.java b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/producer/MessageProducerUtil.java new file mode 100644 index 0000000..14ec0fc --- /dev/null +++ b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/producer/MessageProducerUtil.java @@ -0,0 +1,189 @@ +package org.dromara.common.rocketmq.producer; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.springframework.stereotype.Component; + +import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +import static org.dromara.common.rocketmq.config.ProducerConfig.producer; + + +/** + * 消息发送 + */ +@Slf4j +public class MessageProducerUtil { + + /** + * 同步发送消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 通过调用 send() 方法发送消息,阻塞等待服务器响应。 + */ + public static SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ + String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; + try { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + SendResult result = producer.send(msg); + return result; + } catch (Exception e) { + e.printStackTrace(); + log.error("消息初始化失败!body:{}",body); + + } + return null; + } + + /** + * 单向发送消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 单向发送:通过调用 sendOneway() 方法发送消息,不关心发送结果,适用于对可靠性要求不高的场景。 + */ + public static void sendOnewayMessage(String topic, String tag, String key, String value){ + String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; + try { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + producer.sendOneway(msg); + } catch (UnsupportedEncodingException e) { + log.error("消息初始化失败!body:{}",body); + + } catch (MQClientException | InterruptedException | RemotingException e) { + log.error("消息发送失败! body:{}",body); + } + } + + + /** + * 批量发送消息 + * @param messages 消息列表 + * 批量发送:通过调用 send() 方法并传入多条消息,实现批量发送消息。 + */ +// public static SendResult sendBatchMessage(List messages){ +// String body = messages.toString(); +// try { +// System.out.println("生产者发送消息:"+ messages); +// // 发送批量消息 +// SendResult sendResult = producer.send(messages); +// return sendResult; +// } catch (MQClientException | InterruptedException | RemotingException e) { +// log.error("消息发送失败! body:{}",body); +// } catch (MQBrokerException e) { +// throw new RuntimeException(e); +// } +// return null; +// } + + + /** + * 发送有序的消息 + * @param messagesList Message集合 + * @param messageQueueNumber 消息队列数量,根据实际情况设定 + * 顺序发送: messageQueueNumber 表示消息的业务标识,可以根据具体需求进行设置来保证消息按顺序发送。 + */ +// public static SendResult sendOrderlyMessage(List messagesList, Integer messageQueueNumber) { +// SendResult result = null; +// for (Message message : messagesList) { +// try { +// result = producer.send(message, (list, msg, arg) -> { +// Integer queueNumber = (Integer) arg; +// //int queueIndex = queueNumber % list.size(); +// return list.get(queueNumber); +// }, messageQueueNumber);//根据编号取模,选择消息队列 +// } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { +// log.error("发送有序消息失败"); +// return result; +// } +// } +// return result; +// } + + /** + * 发送延迟消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 延迟发送:通过设置延迟级别来实现延迟发送消息。 + */ + public static SendResult sendDelayMessage(String topic, String tag, String key, String value) + { + SendResult result = null; + try + { + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + //设置消息延迟级别,我这里设置5,对应就是延时一分钟 + // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" + msg.setDelayTimeLevel(4); + // 发送消息到一个Broker + result = producer.send(msg); + // 通过sendResult返回消息是否成功送达 + log.info("发送延迟消息结果:======sendResult:{}", result); + DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + log.info("发送时间:{}", format.format(new Date())); + return result; + } + catch (Exception e) + { + e.printStackTrace(); + log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); + } + return result; + } + /** + * 发送异步的消息 + * @param topic 主题 + * @param tag 标签 + * @param key 自定义的key,根据业务来定 + * @param value 消息的内容 + * 通过调用 send() 方法,并传入一个 SendCallback 对象,在发送消息的同时可以继续处理其他逻辑,消息发送结果通过回调函数通知。 + */ + public static SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value){ + + try { + //创建一个消息实例,指定主题、标签和消息体。 + Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); + System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); + producer.send(msg,new SendCallback() { + // 异步回调的处理 + @Override + public void onSuccess(SendResult sendResult) { + System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId()); + } + @Override + public void onException(Throwable e) { + System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e); + e.printStackTrace(); + } + }); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + return null; + } + +} diff --git a/dk-common/common-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/dk-common/common-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..0d19465 --- /dev/null +++ b/dk-common/common-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.dromara.common.rocketmq.config.ProducerConfig diff --git a/dk-common/pom.xml b/dk-common/pom.xml index a0eb4a7..d1527b3 100644 --- a/dk-common/pom.xml +++ b/dk-common/pom.xml @@ -46,6 +46,7 @@ common-bus common-sse common-cloudsdk + common-rocketmq dk-common diff --git a/dk-modules/business/pom.xml b/dk-modules/business/pom.xml index 78874bb..1b3151d 100644 --- a/dk-modules/business/pom.xml +++ b/dk-modules/business/pom.xml @@ -116,6 +116,10 @@ org.dromara api-workflow + + org.dromara + common-rocketmq +