diff --git a/dk-example/test-mq/pom.xml b/dk-example/test-mq/pom.xml index 48bfc69..647a5eb 100644 --- a/dk-example/test-mq/pom.xml +++ b/dk-example/test-mq/pom.xml @@ -33,6 +33,19 @@ org.springframework.kafka spring-kafka + + org.apache.rocketmq + rocketmq-client + 4.9.0 + + + junit + junit + 4.13.2 + + + + org.dromara @@ -64,6 +77,10 @@ + + com.mysql + mysql-connector-j + diff --git a/dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java b/dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java new file mode 100644 index 0000000..e471892 --- /dev/null +++ b/dk-example/test-mq/src/main/java/org/dromara/stream/listener/RocketMQListener.java @@ -0,0 +1,52 @@ +package org.dromara.stream.listener; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; + +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author xbhog + * @date 2024/06/01 17:05 + **/ +@Slf4j +@Component +@RocketMQTransactionListener +public class RocketMQListener { + + public static void main(String[] args) throws Exception { + // 实例化消费者 + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); + + // 设置NameServer的地址 + consumer.setNamesrvAddr("192.168.110.96:9876"); + + // 订阅一个或多个Topic,以及Tag来过滤特定消息 + consumer.subscribe("SELF_TEST_TOPIC", "Tag"); + + // 注册回调实现类来处理从broker拉取回来的消息 + consumer.registerMessageListener(new MessageListenerOrderly() { + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + context.setAutoCommit(true); // 根据需要设置自动提交偏移量 + for (MessageExt msg : msgs) { + // 处理消息内容,例如打印出来或者进行其他业务逻辑处理 + System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody())); + } + return ConsumeOrderlyStatus.SUCCESS; // 返回消费状态 + } + }); + + // 启动消费者实例 + consumer.start(); +// System.out.printf("Consumer Started.%n"); + } + +} diff --git a/dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java b/dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java new file mode 100644 index 0000000..7e794f0 --- /dev/null +++ b/dk-example/test-mq/src/main/java/org/dromara/stream/producer/RocketmqProducer.java @@ -0,0 +1,49 @@ +package org.dromara.stream.producer; + + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; + +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * @author xbhog + * @date 2024/05/25 17:15 + **/ +@Slf4j +@Component +public class RocketmqProducer { + + public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException { + // 初始化消息生产者 + DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); + // 设置超时时间 + producer.setSendMsgTimeout(10000); + // 指定nameserver地址 + producer.setNamesrvAddr("192.168.110.96:9876"); + + producer.start(); + for (int i = 0; i < 100; i++) { + // 创建消息,并指定Topic,Tag和消息体 + Message msg = new Message("SELF_TEST_TOPIC" /* Topic */, "Tag" /* Tag */, ("Hello RocketMQ " + i).getBytes()); + // 发送消息到一个Broker + producer.send(msg); + } + + // 如果不再发送消息,关闭Producer实例。 + producer.shutdown(); + } + +} + + + diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzEntity.java b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzEntity.java index 18fe58a..61bb001 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzEntity.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/manage/model/entity/DeviceQrtzEntity.java @@ -4,13 +4,15 @@ import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonNaming; import lombok.*; import org.dromara.common.mybatis.core.domain.BaseEntity; -import java.io.Serializable; + import java.util.ArrayList; -import java.util.Date; + import java.util.List; @@ -26,6 +28,7 @@ import java.util.List; @Builder @NoArgsConstructor @AllArgsConstructor +@JsonNaming() // 设置为驼峰命名风格 public class DeviceQrtzEntity extends BaseEntity { @TableId(type = IdType.AUTO)