4 changed files with 123 additions and 2 deletions
@ -0,0 +1,52 @@ |
|||||
|
package org.dromara.stream.listener; |
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
||||
|
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; |
||||
|
|
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; |
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; |
||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; |
||||
|
import org.apache.rocketmq.common.message.MessageExt; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* @author xbhog |
||||
|
* @date 2024/06/01 17:05 |
||||
|
**/ |
||||
|
@Slf4j |
||||
|
@Component |
||||
|
@RocketMQTransactionListener |
||||
|
public class RocketMQListener { |
||||
|
|
||||
|
public static void main(String[] args) throws Exception { |
||||
|
// 实例化消费者
|
||||
|
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); |
||||
|
|
||||
|
// 设置NameServer的地址
|
||||
|
consumer.setNamesrvAddr("192.168.110.96:9876"); |
||||
|
|
||||
|
// 订阅一个或多个Topic,以及Tag来过滤特定消息
|
||||
|
consumer.subscribe("SELF_TEST_TOPIC", "Tag"); |
||||
|
|
||||
|
// 注册回调实现类来处理从broker拉取回来的消息
|
||||
|
consumer.registerMessageListener(new MessageListenerOrderly() { |
||||
|
@Override |
||||
|
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { |
||||
|
context.setAutoCommit(true); // 根据需要设置自动提交偏移量
|
||||
|
for (MessageExt msg : msgs) { |
||||
|
// 处理消息内容,例如打印出来或者进行其他业务逻辑处理
|
||||
|
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody())); |
||||
|
} |
||||
|
return ConsumeOrderlyStatus.SUCCESS; // 返回消费状态
|
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
// 启动消费者实例
|
||||
|
consumer.start(); |
||||
|
// System.out.printf("Consumer Started.%n");
|
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,49 @@ |
|||||
|
package org.dromara.stream.producer; |
||||
|
|
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.rocketmq.client.exception.MQBrokerException; |
||||
|
import org.apache.rocketmq.client.exception.MQClientException; |
||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
||||
|
|
||||
|
import org.apache.rocketmq.client.producer.SendResult; |
||||
|
import org.apache.rocketmq.common.message.Message; |
||||
|
import org.apache.rocketmq.remoting.exception.RemotingException; |
||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
import java.nio.charset.StandardCharsets; |
||||
|
import java.util.Map; |
||||
|
|
||||
|
/** |
||||
|
* @author xbhog |
||||
|
* @date 2024/05/25 17:15 |
||||
|
**/ |
||||
|
@Slf4j |
||||
|
@Component |
||||
|
public class RocketmqProducer { |
||||
|
|
||||
|
public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException { |
||||
|
// 初始化消息生产者
|
||||
|
DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); |
||||
|
// 设置超时时间
|
||||
|
producer.setSendMsgTimeout(10000); |
||||
|
// 指定nameserver地址
|
||||
|
producer.setNamesrvAddr("192.168.110.96:9876"); |
||||
|
|
||||
|
producer.start(); |
||||
|
for (int i = 0; i < 100; i++) { |
||||
|
// 创建消息,并指定Topic,Tag和消息体
|
||||
|
Message msg = new Message("SELF_TEST_TOPIC" /* Topic */, "Tag" /* Tag */, ("Hello RocketMQ " + i).getBytes()); |
||||
|
// 发送消息到一个Broker
|
||||
|
producer.send(msg); |
||||
|
} |
||||
|
|
||||
|
// 如果不再发送消息,关闭Producer实例。
|
||||
|
producer.shutdown(); |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
|
||||
|
|
Loading…
Reference in new issue