【RocketMQ】重试机制及死信消息处理
顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最
【RocketMQ】重试机制及死信消息处理
参考文档: 官方文档
1. 重试机制
1.1 生产者重试
rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异步发送为0次)。
当然也可以自定义重试次数及机制:
// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);
1.2 消费者重试
若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。
- 在集群模式下,消费的业务逻辑代码会返回 Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多会重试 16 次,之后该消息会被丢弃。
- 在广播消费模式下,广播消费仍然保证消息至少被消费一次,但不提供重发的选项。
消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
- 最大重试次数:消息消费失败后,可被重复投递的最大次数。
consumer.setMaxReconsumeTimes(5);
在实际生产中,一般重试3-5次,如果还没有消费成功,则可以把消息签收,通知人工介入。
- 重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。
consumer.setSuspendCurrentQueueTimeMillis(5000);
示例:
消费者重试示例:
@Test
public void retryConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
//设置最大重试次数
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = list.get(0);
System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());
//业务报错了 返回null 返回RECONSUME_LATER 都会重试
if(true){
throw new RuntimeException();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。
并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic
,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下:
消费类型 | 重试间隔 | 最大重试次数 |
---|---|---|
顺序消费 | 间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis | 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX |
并发消费 | 间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置 | 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。(根据延时等级划分,共16次) |
1.2.1 死信队列
上面提到的 特殊Topic
称为死信Topic,对应的队列就是死信队列,其中存储的消息就是死信消息。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。应该通知人工介入处理。
对于死信的处理方案有多种,这里演示两种:
- 编写消费者监听死信队列
- 在消费者达到最大重试次数时立刻处理。
方案一:
@Test
public void deadConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("%DLQ%retry-consumer-group", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = list.get(0);
System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());
System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");
//业务报错了 返回null 返回RECONSUME_LATER 都会重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
该方案有一个缺点,就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。
方案二:
针对方案一的缺点,方案二能够比较好的解决。
@Test
public void retryConsumer2() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = list.get(0);
System.out.println(new Date());
try {
handleDb();// 10/0
} catch (Exception e) {
if (messageExt.getReconsumeTimes() >= 2) {
//不要重试了
System.out.println("消息体:" + new String(messageExt.getBody()));
System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理");
} else {
//重试
System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
private void handleDb() {
int i = 10 / 0;
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)