在 Spring AMQP(RabbitMQ)中,消息的 重试机制 主要有以下几种方式:
1. 使用 RetryTemplate
进行消息消费端重试
Spring AMQP 提供了 RetryTemplate
,可以在 @RabbitListener
方法内部手动实现重试逻辑。
示例:手动重试
@Component
public class MyMessageListener {
@Autowired
private RabbitTemplate rabbitTemplate;
private final RetryTemplate retryTemplate = new RetryTemplate();
@RabbitListener(queues = "myQueue")
public void listen(String message) {
retryTemplate.execute(context -> {
// 业务处理
processMessage(message);
return null;
}, context -> {
// 失败后,进入降级处理,比如存入错误交换机
rabbitTemplate.convertAndSend("erro.exchange", "erro.Key", message);
return null;
});
}
private void processMessage(String message) {
if (Math.random() > 0.5) { // 模拟失败情况
throw new RuntimeException("处理失败");
}
System.out.println("消息处理成功:" + message);
}
}
retryTemplate.execute(context -> {}
进行消息消费。如果抛出异常,则进入第二个
context -> {}
逻辑进行降级处理(例如存入错误交换机)。
2. 配置 @RabbitListener
进行自动重试
Spring AMQP 5.x 之后,可以直接使用 @RabbitListener
的 @Retryable
注解来开启消费端的重试机制。
示例:基于 @Retryable
进行自动重试
@Component
public class MyMessageListener {
@RabbitListener(queues = "myQueue")
@Retryable(
value = {RuntimeException.class}, // 仅对特定异常进行重试
maxAttempts = 3, // 最大重试次数
backoff = @Backoff(delay = 2000, multiplier = 2) // 初始延迟2秒,每次乘2倍
)
public void listen(String message) {
System.out.println("收到消息:" + message);
if (Math.random() > 0.5) { // 模拟异常
throw new RuntimeException("消息处理失败");
}
}
@Recover // 处理最终失败的情况
public void recover(RuntimeException e, String message) {
System.out.println("最终失败,存入日志:" + message);
}
}
@Retryable
:最多重试3次,每次失败后等待 2s,下一次等待时间翻倍。@Recover
:如果 3 次都失败,执行降级方法recover()
。
3. 使用 Dead Letter Exchange(DLX)
进行队列级别的消息重试
原理:如果队列中消息被拒绝(basic.nack
或 basic.reject
),RabbitMQ 可以将消息发送到 死信交换机(DLX),然后可以在 DLX 队列中进行重试。
配置步骤
声明正常队列,并绑定死信队列
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal.queue")
.deadLetterExchange("dlx.exchange") // 绑定死信交换机
.deadLetterRoutingKey("dlx.key") // 绑定死信路由键
.ttl(5000) // 5 秒后消息进入死信队列
.build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dlx.queue").build();
}
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange");
}
@Bean
public Binding dlxBinding(Queue deadLetterQueue, DirectExchange dlxExchange) {
return BindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("dlx.key");
}
2消费者手动拒绝消息
@Component
public class MyMessageListener {
@RabbitListener(queues = "normal.queue")
public void listen(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
System.out.println("消费消息:" + message);
if (Math.random() > 0.5) {
throw new RuntimeException("处理失败");
}
channel.basicAck(tag, false); // 成功确认
} catch (Exception e) {
channel.basicNack(tag, false, false); // 进入死信队列
}
}
@RabbitListener(queues = "dlx.queue")
public void handleDeadLetterQueue(String message) {
System.out.println("死信队列处理:" + message);
}
}
如果消息消费失败,将
basic.nack(tag, false, false)
,消息进入 死信队列。之后可以在 死信队列 中进行人工重试或日志记录。
4. 使用 RepublishMessageRecoverer
进行错误消息转发
如果你希望在消费失败时,自动将失败的消息转发到 错误交换机,可以使用 RepublishMessageRecoverer
。
配置 MessageRecoverer
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "erro.exchange", "erro.Key");
}
失败的消息会被重新发布到
erro.exchange
,并使用erro.Key
作为路由键。这样可以在
erro.queue
中专门处理异常消息,例如记录日志、报警等。总结
如果你的业务场景允许 立即重试,可以使用
RetryTemplate
或@Retryable
;
如果需要 延迟重试 或 人工干预,建议使用 死信队列(DLX) 进行处理。你当前的
RepublishMessageRecoverer
配置,适用于将错误消息存入专门的错误交换机,但 不会自动重试,如果要增加重试机制,建议结合 DLX 或 RetryTemplate。在 Spring AMQP(RabbitMQ) 中,消息重试机制的触发通常取决于 消费端处理失败,以下几种情况会导致消息重试:
1.
@Retryable
注解触发的消息重试如果消费者方法上使用了
@Retryable
,那么当抛出指定异常时,Spring AMQP 会自动进行重试。触发条件
消费者方法抛出指定的异常(如
RuntimeException
)配置了
@Retryable
,且未超过maxAttempts
指定的最大重试次数
示例
@RabbitListener(queues = "myQueue")
@Retryable(
value = {RuntimeException.class}, // 只有 RuntimeException 才会触发重试
maxAttempts = 3, // 最大重试 3 次
backoff = @Backoff(delay = 2000, multiplier = 2) // 初始延迟2秒,每次翻倍
)
public void listen(String message) {
System.out.println("收到消息:" + message);
throw new RuntimeException("消费失败"); // 抛出异常触发重试
}
不会触发重试的情况
如果方法执行成功,不会重试
如果抛出的异常 不在
@Retryable(value = {})
配置范围内如果超过
maxAttempts
,进入@Recover
处理(如果有
2. RetryTemplate
手动触发消息重试
如果消费者使用 RetryTemplate
,那么当业务代码抛出异常时,RetryTemplate
会控制重试逻辑。
触发条件
RetryTemplate.execute()
方法中,业务代码抛出异常maxAttempts
配置未达到上限
示例
private final RetryTemplate retryTemplate = new RetryTemplate();
@RabbitListener(queues = "myQueue")
public void listen(String message) {
retryTemplate.execute(context -> {
processMessage(message);
return null;
}, context -> {
System.out.println("消息处理失败,存入日志:" + message);
return null;
});
}
private void processMessage(String message) {
throw new RuntimeException("模拟消费失败"); // 抛出异常触发重试
}
不会触发重试的情况
业务代码执行成功
maxAttempts
配置达到上限
3. basic.nack
或 basic.reject
触发(死信队列/DLX 机制)
如果 RabbitMQ 手动确认模式(manual ack
)下,消费者调用 channel.basicNack()
或 channel.basicReject()
,可以将消息重新入队或发送到 死信队列(DLX)。
触发条件
消费者代码调用
channel.basicNack(tag, false, true)
(requeue = true
时消息会被重新投递)消费者代码调用
channel.basicReject(tag, true)
(同样会重新入队)
示例
@RabbitListener(queues = "normal.queue")
public void listen(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
System.out.println("消费消息:" + message);
throw new RuntimeException("模拟异常"); // 触发重试
} catch (Exception e) {
channel.basicNack(tag, false, true); // 重新入队
}
}
4. RepublishMessageRecoverer
触发错误消息重发
RepublishMessageRecoverer
不是重试,而是当消费失败时,将错误消息发布到 错误交换机(erro.exchange
)。
触发条件
@RabbitListener
方法执行时抛出异常配置了
MessageRecoverer
重试失败后,Spring AMQP 自动调用
RepublishMessageRecoverer
示例
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "erro.exchange", "erro.Key");
}
@RabbitListener(queues = "myQueue")
public void listen(String message) {
System.out.println("收到消息:" + message);
throw new RuntimeException("消费失败"); // 触发 RepublishMessageRecoverer
}
不会触发重试的情况
如果
@RabbitListener
方法执行成功,则不会进入RepublishMessageRecoverer
这个方式不是重试,而是消息转移
5. RabbitMQ 自带的重试机制
如果 RabbitMQ 配置了 TTL(消息存活时间)+ 死信队列(DLX),则消息会在 TTL 过期 后被重新投递到死信队列,并可在 DLX 队列中进行重试。
触发条件
队列消息 TTL 到期
死信交换机(DLX) 存在
DLX 消费者重新处理消息
示例
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal.queue")
.deadLetterExchange("dlx.exchange") // 绑定死信交换机
.deadLetterRoutingKey("dlx.key") // 绑定死信路由键
.ttl(5000) // 5 秒后进入死信队列
.build();
}
如果消息 5 秒内未被消费,它会自动进入死信队列
dlx.queue
,可以由 死信消费者 进行重试处理。
不会触发重试的情况
TTL 过期但没有 DLX 配置,消息会直接被丢弃
死信队列消费失败但没有
requeue = true
,不会再次投递总结
最佳实践
如果希望立即重试,可以使用
@Retryable
或basic.nack(requeue = true)
如果希望延迟重试,可以使用 死信队列(DLX)+ TTL
如果不想丢失失败消息,而是存入错误日志,可以使用
RepublishMessageRecoverer
如果你的 RabbitMQ 消息消费失败后没有被重试,可以检查:
是否有
@Retryable
或RetryTemplate
是否有
basicNack(tag, false, true)
(重新入队)是否配置了 死信队列(DLX)
是否使用了
RepublishMessageRecoverer
,它不会重试,而是转发错误消息