Browse Source
# Conflicts: # dk-modules/sample/src/main/java/org/dromara/sample/manage/service/impl/DeviceQrtzServiceImpl.javamaster
12 changed files with 113 additions and 327 deletions
@ -1,57 +0,0 @@ |
|||
package org.dromara.stream.config; |
|||
|
|||
import org.springframework.amqp.core.Binding; |
|||
import org.springframework.amqp.core.BindingBuilder; |
|||
import org.springframework.amqp.core.Queue; |
|||
import org.springframework.amqp.core.TopicExchange; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
/** |
|||
* @author xbhog |
|||
*/ |
|||
@Configuration |
|||
public class RabbitConfig { |
|||
|
|||
public static final String EXCHANGE_NAME = "demo-exchange"; |
|||
public static final String QUEUE_NAME = "demo-queue"; |
|||
public static final String ROUTING_KEY = "demo.routing.key"; |
|||
|
|||
/** |
|||
* 创建交换机 |
|||
* ExchangeBuilder有四种交换机模式 |
|||
* Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。 |
|||
* Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。 |
|||
* Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。 |
|||
* Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。 |
|||
* durable 交换器是否持久化(false 不持久化,true 持久化) |
|||
**/ |
|||
@Bean |
|||
public TopicExchange exchange() { |
|||
return new TopicExchange(EXCHANGE_NAME); |
|||
} |
|||
|
|||
/** |
|||
* 创建队列 |
|||
* durable 队列是否持久化 队列调用此方法就是持久化 可查看方法的源码 |
|||
* deliveryMode 消息是否持久化(1 不持久化,2 持久化) |
|||
**/ |
|||
@Bean |
|||
public Queue queue() { |
|||
return new Queue(QUEUE_NAME, false); |
|||
} |
|||
|
|||
/** |
|||
* 绑定交换机和队列 |
|||
* bing 方法参数可以是队列和交换机 |
|||
* to 方法参数必须是交换机 |
|||
* with 方法参数是路由Key 这里是以rabbit.开头 |
|||
* noargs 就是不要参数的意思 |
|||
* 这个方法的意思是把rabbit开头的消息 和 上面的队列 和 上面的交换机绑定 |
|||
**/ |
|||
@Bean |
|||
public Binding binding(Queue queue, TopicExchange exchange) { |
|||
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); |
|||
} |
|||
|
|||
} |
@ -1,85 +0,0 @@ |
|||
package org.dromara.stream.config; |
|||
|
|||
import org.springframework.amqp.core.*; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
import java.util.Map; |
|||
|
|||
|
|||
/** |
|||
* RabbitTTL队列 |
|||
* |
|||
* @author xbhog |
|||
*/ |
|||
@Configuration |
|||
public class RabbitTtlQueueConfig { |
|||
|
|||
// 延迟队列名称
|
|||
public static final String DELAY_QUEUE_NAME = "delay-queue"; |
|||
// 延迟交换机名称
|
|||
public static final String DELAY_EXCHANGE_NAME = "delay-exchange"; |
|||
// 延迟路由键名称
|
|||
public static final String DELAY_ROUTING_KEY = "delay.routing.key"; |
|||
|
|||
// 死信交换机名称
|
|||
public static final String DEAD_LETTER_EXCHANGE = "dlx-exchange"; |
|||
// 死信队列名称
|
|||
public static final String DEAD_LETTER_QUEUE = "dlx-queue"; |
|||
// 死信路由键名称
|
|||
public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key"; |
|||
|
|||
/** |
|||
* 声明延迟队列 |
|||
*/ |
|||
@Bean |
|||
public Queue delayQueue() { |
|||
return QueueBuilder.durable(DELAY_QUEUE_NAME) |
|||
.deadLetterExchange(DEAD_LETTER_EXCHANGE) |
|||
.deadLetterRoutingKey(DEAD_LETTER_ROUTING_KEY) |
|||
.build(); |
|||
} |
|||
|
|||
/** |
|||
* 声明延迟交换机 |
|||
*/ |
|||
@Bean |
|||
public CustomExchange delayExchange() { |
|||
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", |
|||
true, false, Map.of("x-delayed-type", "direct")); |
|||
} |
|||
|
|||
/** |
|||
* 将延迟队列绑定到延迟交换机 |
|||
*/ |
|||
@Bean |
|||
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { |
|||
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs(); |
|||
} |
|||
|
|||
/** |
|||
* 声明死信队列 |
|||
*/ |
|||
@Bean |
|||
public Queue deadLetterQueue() { |
|||
return new Queue(DEAD_LETTER_QUEUE); |
|||
} |
|||
|
|||
/** |
|||
* 声明死信交换机 |
|||
*/ |
|||
@Bean |
|||
public DirectExchange deadLetterExchange() { |
|||
return new DirectExchange(DEAD_LETTER_EXCHANGE); |
|||
} |
|||
|
|||
/** |
|||
* 将死信队列绑定到死信交换机 |
|||
*/ |
|||
@Bean |
|||
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) { |
|||
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY); |
|||
} |
|||
|
|||
} |
|||
|
@ -1,43 +0,0 @@ |
|||
package org.dromara.stream.consumer; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.stream.config.RabbitConfig; |
|||
import org.dromara.stream.config.RabbitTtlQueueConfig; |
|||
import org.springframework.amqp.core.Message; |
|||
import org.springframework.amqp.rabbit.annotation.RabbitListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* @author xbhog |
|||
* @date 2024年5月18日 |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class RabbitConsumer { |
|||
|
|||
/** |
|||
* 普通消息 |
|||
*/ |
|||
@RabbitListener(queues = RabbitConfig.QUEUE_NAME) |
|||
public void listenQueue(Message message) { |
|||
log.info("【消费者】Start consuming data:{}",new String(message.getBody())); |
|||
} |
|||
|
|||
/** |
|||
* 处理延迟队列消息 |
|||
*/ |
|||
@RabbitListener(queues = RabbitTtlQueueConfig.DELAY_QUEUE_NAME) |
|||
public void receiveDelayMessage(String message){ |
|||
log.info("【消费者】Received delayed message:{}",message); |
|||
} |
|||
|
|||
/** |
|||
* 处理死信队列消息 |
|||
* 当消息在延迟队列中未能被正确处理(例如因消费者逻辑错误、超时未ACK等原因) |
|||
* 它会被自动转发到死信队列中,以便后续的特殊处理或重新尝试。 |
|||
*/ |
|||
@RabbitListener(queues = RabbitTtlQueueConfig.DEAD_LETTER_QUEUE) |
|||
public void receiveDeadMessage(String message){ |
|||
log.info("【消费者】Received dead message:{}",message); |
|||
} |
|||
} |
@ -1,30 +0,0 @@ |
|||
package org.dromara.stream.producer; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.stream.config.RabbitTtlQueueConfig; |
|||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* @author xbhog |
|||
* @date 2024/05/25 17:15 |
|||
**/ |
|||
@Slf4j |
|||
@Component |
|||
public class DelayRabbitProducer { |
|||
|
|||
@Autowired |
|||
private RabbitTemplate rabbitTemplate; |
|||
|
|||
public void sendDelayMessage(String message, long delay) { |
|||
rabbitTemplate.convertAndSend( |
|||
RabbitTtlQueueConfig.DELAY_EXCHANGE_NAME, |
|||
RabbitTtlQueueConfig.DELAY_ROUTING_KEY, message, message1 -> { |
|||
message1.getMessageProperties().setDelayLong(delay); |
|||
return message1; |
|||
}); |
|||
log.info("【生产者】Delayed message send: " + message); |
|||
} |
|||
|
|||
} |
@ -1,23 +0,0 @@ |
|||
package org.dromara.stream.producer; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.dromara.stream.config.RabbitConfig; |
|||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* @author xbhog |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
public class NormalRabbitProducer { |
|||
|
|||
@Autowired |
|||
private RabbitTemplate rabbitTemplate; |
|||
|
|||
public void send(String message) { |
|||
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message); |
|||
log.info("【生产者】Message send: " + message); |
|||
} |
|||
} |
Loading…
Reference in new issue