From 9fa884b3113c426727cb8faf90fdf60330251590 Mon Sep 17 00:00:00 2001
From: like <1025687351@qq.com>
Date: Wed, 26 Mar 2025 18:39:27 +0800
Subject: [PATCH] 1
---
dk-example/test-mq/pom.xml | 17 ++++++
.../stream/listener/RocketMQListener.java | 52 +++++++++++++++++++
.../stream/producer/RocketmqProducer.java | 49 +++++++++++++++++
3 files changed, 118 insertions(+)
create mode 100644 dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java
create mode 100644 dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java
diff --git a/dk-example/test-mq/pom.xml b/dk-example/test-mq/pom.xml
index 48bfc69..647a5eb 100644
--- a/dk-example/test-mq/pom.xml
+++ b/dk-example/test-mq/pom.xml
@@ -33,6 +33,19 @@
org.springframework.kafka
spring-kafka
+
+ org.apache.rocketmq
+ rocketmq-client
+ 4.9.0
+
+
+ junit
+ junit
+ 4.13.2
+
+
+
+
org.dromara
@@ -64,6 +77,10 @@
+
+ com.mysql
+ mysql-connector-j
+
diff --git a/dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java b/dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java
new file mode 100644
index 0000000..e471892
--- /dev/null
+++ b/dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java
@@ -0,0 +1,52 @@
+package org.dromara.stream.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * @author xbhog
+ * @date 2024/06/01 17:05
+ **/
+@Slf4j
+@Component
+@RocketMQTransactionListener
+public class RocketMQListener {
+
+ public static void main(String[] args) throws Exception {
+ // 实例化消费者
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
+
+ // 设置NameServer的地址
+ consumer.setNamesrvAddr("192.168.110.96:9876");
+
+ // 订阅一个或多个Topic,以及Tag来过滤特定消息
+ consumer.subscribe("SELF_TEST_TOPIC", "Tag");
+
+ // 注册回调实现类来处理从broker拉取回来的消息
+ consumer.registerMessageListener(new MessageListenerOrderly() {
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
+ context.setAutoCommit(true); // 根据需要设置自动提交偏移量
+ for (MessageExt msg : msgs) {
+ // 处理消息内容,例如打印出来或者进行其他业务逻辑处理
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
+ }
+ return ConsumeOrderlyStatus.SUCCESS; // 返回消费状态
+ }
+ });
+
+ // 启动消费者实例
+ consumer.start();
+// System.out.printf("Consumer Started.%n");
+ }
+
+}
diff --git a/dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java b/dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java
new file mode 100644
index 0000000..7e794f0
--- /dev/null
+++ b/dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java
@@ -0,0 +1,49 @@
+package org.dromara.stream.producer;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * @author xbhog
+ * @date 2024/05/25 17:15
+ **/
+@Slf4j
+@Component
+public class RocketmqProducer {
+
+ public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException {
+ // 初始化消息生产者
+ DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
+ // 设置超时时间
+ producer.setSendMsgTimeout(10000);
+ // 指定nameserver地址
+ producer.setNamesrvAddr("192.168.110.96:9876");
+
+ producer.start();
+ for (int i = 0; i < 100; i++) {
+ // 创建消息,并指定Topic,Tag和消息体
+ Message msg = new Message("SELF_TEST_TOPIC" /* Topic */, "Tag" /* Tag */, ("Hello RocketMQ " + i).getBytes());
+ // 发送消息到一个Broker
+ producer.send(msg);
+ }
+
+ // 如果不再发送消息,关闭Producer实例。
+ producer.shutdown();
+ }
+
+}
+
+
+