From b962532431499e0a6981c8dd2f6fde92015b7311 Mon Sep 17 00:00:00 2001
From: yangwei <867012372@qq.com>
Date: Mon, 12 May 2025 10:12:50 +0800
Subject: [PATCH] =?UTF-8?q?[feat]=201=E3=80=81=E9=9B=86=E6=88=90rocketmq?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
dk-common/common-bom/pom.xml | 7 +
dk-common/common-rocketmq/pom.xml | 45 +++++
.../rocketmq/config/ProducerConfig.java | 58 ++++++
.../common/rocketmq/model/ProducerMode.java | 25 +++
.../producer/MessageProducerUtil.java | 189 ++++++++++++++++++
...ot.autoconfigure.AutoConfiguration.imports | 1 +
dk-common/pom.xml | 1 +
dk-modules/business/pom.xml | 4 +
8 files changed, 330 insertions(+)
create mode 100644 dk-common/common-rocketmq/pom.xml
create mode 100644 dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/config/ProducerConfig.java
create mode 100644 dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/model/ProducerMode.java
create mode 100644 dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/producer/MessageProducerUtil.java
create mode 100644 dk-common/common-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
diff --git a/dk-common/common-bom/pom.xml b/dk-common/common-bom/pom.xml
index 7cbb10a..6324395 100644
--- a/dk-common/common-bom/pom.xml
+++ b/dk-common/common-bom/pom.xml
@@ -257,6 +257,13 @@
${revision}
+
+
+ org.dromara
+ common-rocketmq
+ ${revision}
+
+
diff --git a/dk-common/common-rocketmq/pom.xml b/dk-common/common-rocketmq/pom.xml
new file mode 100644
index 0000000..412a96a
--- /dev/null
+++ b/dk-common/common-rocketmq/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+ org.dromara
+ dk-common
+ ${revision}
+
+ 4.0.0
+
+ common-rocketmq
+
+
+ common-rocketmq 配置中心
+
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ 4.9.0
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+
+
+
+ org.dromara
+ common-core
+
+
+
+ org.dromara
+ common-nacos
+
+
+
+
+
+
diff --git a/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/config/ProducerConfig.java b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/config/ProducerConfig.java
new file mode 100644
index 0000000..4d82415
--- /dev/null
+++ b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/config/ProducerConfig.java
@@ -0,0 +1,58 @@
+package org.dromara.common.rocketmq.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.dromara.common.rocketmq.model.ProducerMode;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+
+/**
+ * mq搭建地址连接
+ * 生产者初者连接信息 具体看nacos配置
+ */
+@AutoConfiguration
+@Slf4j
+@EnableConfigurationProperties(ProducerMode.class)
+public class ProducerConfig {
+
+ /**
+ * 远程调用连接信息
+ */
+ public static DefaultMQProducer producer;
+
+ /**
+ * 连接客户端信息配置 具体看nacos配置
+ */
+ @Autowired
+ private ProducerMode producerMode;
+
+ @Bean
+ public DefaultMQProducer getRocketMQProducer() {
+ producer = new DefaultMQProducer(producerMode.getGroupName());
+ producer.setNamesrvAddr(producerMode.getNameServer());
+ //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
+ if(producerMode.getMaxMessageSize()!=null){
+ producer.setMaxMessageSize(producerMode.getMaxMessageSize());
+ }
+ if(producerMode.getSendMsgTimeout()!=null){
+ producer.setSendMsgTimeout(producerMode.getSendMsgTimeout());
+ }
+ //如果发送消息失败,设置重试次数,默认为2次
+ if(producerMode.getRetryTimesWhenSendFailed()!=null){
+ producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed());
+ }
+ producer.setVipChannelEnabled(false);
+ try {
+ producer.start();
+ log.info("生产者初始化成功:{}",producer.toString());
+ } catch (MQClientException e) {
+ log.error("生产者初始化失败:{}",e.getMessage());
+ }
+ return producer;
+ }
+
+}
diff --git a/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/model/ProducerMode.java b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/model/ProducerMode.java
new file mode 100644
index 0000000..e255e1e
--- /dev/null
+++ b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/model/ProducerMode.java
@@ -0,0 +1,25 @@
+package org.dromara.common.rocketmq.model;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 生产者初始化
+ */
+@Data
+@ConfigurationProperties(prefix = "rocketmq.producer")
+public class ProducerMode {
+
+ private String groupName;
+
+ private String nameServer;
+
+ private Integer maxMessageSize;
+
+ private Integer sendMsgTimeout;
+
+ private Integer retryTimesWhenSendFailed;
+}
diff --git a/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/producer/MessageProducerUtil.java b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/producer/MessageProducerUtil.java
new file mode 100644
index 0000000..14ec0fc
--- /dev/null
+++ b/dk-common/common-rocketmq/src/main/java/org/dromara/common/rocketmq/producer/MessageProducerUtil.java
@@ -0,0 +1,189 @@
+package org.dromara.common.rocketmq.producer;
+
+import com.alibaba.fastjson.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.springframework.stereotype.Component;
+
+import java.io.UnsupportedEncodingException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import static org.dromara.common.rocketmq.config.ProducerConfig.producer;
+
+
+/**
+ * 消息发送
+ */
+@Slf4j
+public class MessageProducerUtil {
+
+ /**
+ * 同步发送消息
+ * @param topic 主题
+ * @param tag 标签
+ * @param key 自定义的key,根据业务来定
+ * @param value 消息的内容
+ * 通过调用 send() 方法发送消息,阻塞等待服务器响应。
+ */
+ public static SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){
+ String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】";
+ try {
+ Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET));
+ System.out.println("生产者发送消息:"+ JSON.toJSONString(value));
+ SendResult result = producer.send(msg);
+ return result;
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("消息初始化失败!body:{}",body);
+
+ }
+ return null;
+ }
+
+ /**
+ * 单向发送消息
+ * @param topic 主题
+ * @param tag 标签
+ * @param key 自定义的key,根据业务来定
+ * @param value 消息的内容
+ * 单向发送:通过调用 sendOneway() 方法发送消息,不关心发送结果,适用于对可靠性要求不高的场景。
+ */
+ public static void sendOnewayMessage(String topic, String tag, String key, String value){
+ String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】";
+ try {
+ Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET));
+ System.out.println("生产者发送消息:"+ JSON.toJSONString(value));
+ producer.sendOneway(msg);
+ } catch (UnsupportedEncodingException e) {
+ log.error("消息初始化失败!body:{}",body);
+
+ } catch (MQClientException | InterruptedException | RemotingException e) {
+ log.error("消息发送失败! body:{}",body);
+ }
+ }
+
+
+ /**
+ * 批量发送消息
+ * @param messages 消息列表
+ * 批量发送:通过调用 send() 方法并传入多条消息,实现批量发送消息。
+ */
+// public static SendResult sendBatchMessage(List messages){
+// String body = messages.toString();
+// try {
+// System.out.println("生产者发送消息:"+ messages);
+// // 发送批量消息
+// SendResult sendResult = producer.send(messages);
+// return sendResult;
+// } catch (MQClientException | InterruptedException | RemotingException e) {
+// log.error("消息发送失败! body:{}",body);
+// } catch (MQBrokerException e) {
+// throw new RuntimeException(e);
+// }
+// return null;
+// }
+
+
+ /**
+ * 发送有序的消息
+ * @param messagesList Message集合
+ * @param messageQueueNumber 消息队列数量,根据实际情况设定
+ * 顺序发送: messageQueueNumber 表示消息的业务标识,可以根据具体需求进行设置来保证消息按顺序发送。
+ */
+// public static SendResult sendOrderlyMessage(List messagesList, Integer messageQueueNumber) {
+// SendResult result = null;
+// for (Message message : messagesList) {
+// try {
+// result = producer.send(message, (list, msg, arg) -> {
+// Integer queueNumber = (Integer) arg;
+// //int queueIndex = queueNumber % list.size();
+// return list.get(queueNumber);
+// }, messageQueueNumber);//根据编号取模,选择消息队列
+// } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+// log.error("发送有序消息失败");
+// return result;
+// }
+// }
+// return result;
+// }
+
+ /**
+ * 发送延迟消息
+ * @param topic 主题
+ * @param tag 标签
+ * @param key 自定义的key,根据业务来定
+ * @param value 消息的内容
+ * 延迟发送:通过设置延迟级别来实现延迟发送消息。
+ */
+ public static SendResult sendDelayMessage(String topic, String tag, String key, String value)
+ {
+ SendResult result = null;
+ try
+ {
+ Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET));
+ //设置消息延迟级别,我这里设置5,对应就是延时一分钟
+ // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
+ msg.setDelayTimeLevel(4);
+ // 发送消息到一个Broker
+ result = producer.send(msg);
+ // 通过sendResult返回消息是否成功送达
+ log.info("发送延迟消息结果:======sendResult:{}", result);
+ DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ log.info("发送时间:{}", format.format(new Date()));
+ return result;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result);
+ }
+ return result;
+ }
+ /**
+ * 发送异步的消息
+ * @param topic 主题
+ * @param tag 标签
+ * @param key 自定义的key,根据业务来定
+ * @param value 消息的内容
+ * 通过调用 send() 方法,并传入一个 SendCallback 对象,在发送消息的同时可以继续处理其他逻辑,消息发送结果通过回调函数通知。
+ */
+ public static SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value){
+
+ try {
+ //创建一个消息实例,指定主题、标签和消息体。
+ Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET));
+ System.out.println("生产者发送消息:"+ JSON.toJSONString(value));
+ producer.send(msg,new SendCallback() {
+ // 异步回调的处理
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId());
+ }
+ @Override
+ public void onException(Throwable e) {
+ System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e);
+ e.printStackTrace();
+ }
+ });
+ } catch (MQClientException e) {
+ e.printStackTrace();
+ } catch (RemotingException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
+}
diff --git a/dk-common/common-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/dk-common/common-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000..0d19465
--- /dev/null
+++ b/dk-common/common-rocketmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.common.rocketmq.config.ProducerConfig
diff --git a/dk-common/pom.xml b/dk-common/pom.xml
index a0eb4a7..d1527b3 100644
--- a/dk-common/pom.xml
+++ b/dk-common/pom.xml
@@ -46,6 +46,7 @@
common-bus
common-sse
common-cloudsdk
+ common-rocketmq
dk-common
diff --git a/dk-modules/business/pom.xml b/dk-modules/business/pom.xml
index 78874bb..1b3151d 100644
--- a/dk-modules/business/pom.xml
+++ b/dk-modules/business/pom.xml
@@ -116,6 +116,10 @@
org.dromara
api-workflow
+
+ org.dromara
+ common-rocketmq
+