rocketmq-使用
导入依赖
<!--需要和rocketmq版本相同-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
普通消息
同步发送
同步发送是指,producer发出一条消息后,会在收到mq返回的ack之后才发下一条消息。该方式的消息可靠性最高,但消息发送效率太低
//创建一个producer,参数为生产者组名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
//指定nameServer地址
producer.setNamesrvAddr("localhost:9876");
//发送失败时重试次数,默认2
producer.setRetryTimesWhenSendFailed(3);
//设置发送超时时限,默认3s
producer.setSendMsgTimeout(5000);
//开启生产者
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("hi," + i).getBytes();
//生产消息
Message message = new Message("someTopic", "someTag", body);
message.setKeys("key-"+i);//为消息指定key
//发送消息
SendResult result = producer.send(message);
System.out.println(result);
}
//关闭producer
producer.shutdown();
消息发送的状态:
public enum SendStatus { SEND_OK, //发送成功 FLUSH_DISK_TIMEOUT,//刷盘超时(当broker设置的刷盘策略为同步刷盘时,才能出现此异常) FLUSH_SLAVE_TIMEOUT,//slave同步超时(当broker集群设置的master-slave的复制方式为同步复制时,才能出现此异常) SLAVE_NOT_AVAILABLE,//没有可用的slave(当broker集群设置为master-slave的复制方式为同步复制时,才能出现此异常) }
异步发送
异步发送是指,producer发出消息后无需等待mq返回ack,直接发送下一条消息。该方式的消息可靠性得到了保证,发送效率也可以
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("localhost:9876");
//异步发送失败后不进行重试
producer.setRetryTimesWhenSendAsyncFailed(0);
//指定新创建的topic的queue数量为2,默认4
producer.setDefaultTopicQueueNums(2);
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("hi," + i).getBytes();
Message message = new Message("myTopicA","myTag",body);
//异步发送,并指定回调
producer.send(message, new SendCallback() {
//当producer接收到mq发送来的ack后就会执行该方法
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}
//因为是异步发送,所以需要等一会再关闭
TimeUnit.SECONDS.sleep(30);
producer.shutdown();
单向发送
单向发送是指,producer仅负责发送消息,不等待、不处理mq的ack。该发送方式时mq也不返回ack。该方式的消息发送效率最高,但消息可靠性较差
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("hi," + i).getBytes();
Message message = new Message("single", "someTag", body);
//单向发送
producer.sendOneway(message);
}
producer.shutdown();
消费消息
//定义一个pull消费者
//DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
//定义一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
//指定nameServer
consumer.setNamesrvAddr("localhost:9876");
//指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//指定消费的topic和tag
consumer.subscribe("someTopic","*");
//指定采用广播模式消费,默认CLUSTERING集群模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//注册一个消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 一旦broker中有了其订阅的消息就会触发该方法的执行
* @param msgs 消息
* @param context
* @return 当前consumer消费的状态
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//消费消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
//返回状态消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//开启消费者
consumer.start();
顺序消息
顺序消息是指,严格按照消息的发送顺序进行消费的消息(fifo)
默认情况下,生产者会把消息以round robin轮询的方式发送到不同的queue分区队列;而消费消息时会从多个queue上拉取消息,这种情况下发送和消费不能保证顺序。但如果将消息仅发送到同一个queue中,消费时也只从这个queue上拉取消息,就严格保证了消息的顺序性
根据有序范围的不同,顺序消息可以分为:
- 分区有序:有多个queue参与,仅可保证该queue上的消息顺序
- 全局有序:发送和消费参与的queue只有一个时,保证的有序是整个topic的
创建topic时指定queue的数量:
- 在代码中创建producer时,指定其创建queue的数量
- rocketmq控制台中手动指定queue的数量
- 使用mqadmin命令手动创建topic时指定queue的数量
如何实现queue选择器:
- 在定义producer时我们可以指定消息队列选择器,而这个选择器是我们自己实现了MessageQueueSelector接口定义的
- 在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它数据。但无论是什么,都是唯一的
- 一般的选择算法是,让选择key(或hash值)与该topic的queue的数量取模,其结果为queueId
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("localhost:9876");
//如果是全局有序,则需要设置为1
//producer.setDefaultTopicQueueNums(1);
producer.start();
for (int i = 0; i < 10; i++) {
Integer orderId = i;
byte[] body = ("hi,"+i).getBytes();
Message message = new Message("topicA","tagA",body);
//将orderid设置为key
message.setKeys(orderId.toString());
//发送消息 MessageQueueSelector选择器
producer.send(message, new MessageQueueSelector() {
/**
* 具体的选择算法在该方法中定义
* @param mqs queue列表
* @param msg 当前消息
* @param arg send()的第三个参数 (这里就是send方法中指定的orderId)
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//使用消息key作为选择key
/*String keys = msg.getKeys();
Integer id = Integer.valueOf(keys);*/
//使用arg作为选择key
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
},orderId);
}
producer.shutdown();
延时消息
当消息写入到broker后,在指定时长后才被消费处理的消息,称为延时消息
延时等级
延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在rocketmq服务端的MessageStoreConfig类中的messageDelayLevel变量中
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
即,若指定延时等级为3,则表示延迟时长为10s
如果需要自定义延时等级,可以通过在broker加载的配置文件中新增配置messageDelayLevel=xxx
(配置文件在rocketmq安装目录下的config目录中)
原理
producer将消息发送到broker后,broker会首先先将消息写入到commitlog文件,然后需要将其分发到相应的consumerqueue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有,则需要经历一个复杂的过程:
-
修改消息的topic为SCHEDULE_TOPIC_XXXX
-
根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件(如果没有这些目录与文件的话)
延迟等级delayLevel与queueId的对应关系为:queueId = delayLevel - 1(queueId从0开始,delayLevel从1开始)
创建queueId时,并不是一次性将所有等级的目录全部创建,而是用到哪个创建哪个
-
修改消息索引单元内容。索引单元中的message tag hashcode部分原本存放的是消息的tag的hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原topic后再次被写入到commitlog中的时间。投递时间=消息存储时间+延迟等级时间。消息存储时间指的是消息被发送到broker时的时间戳
-
将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
-
broker内部有一个延迟消息服务类,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到目标topic中。不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延时的普通消息。然后再次将消息投递到目标topic中
ScheuleMessageService在broker启动时,会创建一个定时器Timer,用于执行相应的定时任务。系统会根据延时等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟等级消息的消费与投递。每个TimerTask都会检测相应queue队列的第一条消息是否到期
实例
生产者:
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("hi,"+i).getBytes();
Message message = new Message("topicB", "someTag", body);
//指定消息延迟等级3(10秒)
message.setDelayTimeLevel(3);
SendResult result = producer.send(message);
//输出消息被发送的时间
System.out.println(new SimpleDateFormat("mm:ss")
.format(new Date())+","+result);
}
producer.shutdown();
消费者:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("topicB","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new SimpleDateFormat("mm:ss")
.format(new Date())+","+msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
事务消息
rocketmq提供了类似X/Open XA
的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式处理模式
半事务消息
暂不能投递的消息,发送方已经成功的将消息发送到broker,但是broker未收到最终确认指令,此时该消息被标记成暂不能投递
状态,即不能被消费者看到。处于该种状态下的消息即半事务消息
本地事务状态
producer回调操作执行的结果为本地事务状态,其会发送给tc,而tc会再发送给tm。tm会根据tc发送来的本地事务状态来决定全局事务状态
消息回查
消息回查,即重新查询本地事务的执行状态(消息回查,不是重新执行回调操作。如:回调操作是执行预扣款,回查则是查看预扣款的执行结果)
引发回查的原因:
- 回调操作返回UNKNWON
- tc没有收到tm的最终全局事务确认
XA模式
角色:
- tc:事务协调者,维护全局和分支事务的状态,驱动全局事务提交和回滚(rocketmq中的broker充当tc)
- tm:事务管理器,定义全局事务的范围,开始全局事务,提交或回滚全局事务。他是全局事务的发起者(producer充当tm)
- rm:资源管理器,管理分支事务的资源,与tc进行通信注册分支事务并报告分支事务的状态,驱动分支事务的提交和回滚(producer和broker均是rm)
流程:
- tm向tc发起指令,开启一个全局事务
- 根据业务要求,各个rm会逐个向tc注册分支事务,然后tc会逐个向rm发出预执行指令
- 各个rm在接收到指令后会在本地事务预执行
- rm将预执行结果发送给tc(可能是成功,也可能是失败)
- tc在接收到各个rm的结果后,会进行汇总并上报给tm,tm根据总汇结果向tc发送确认指令
- 若所有结果都成功,则向tc发送Global Commit指令
- 只要有结果是失败响应,则向tc发送Global Rollback指令
- tc在接收到指令后再次向rm发送确认指令
事务消息方案并不是一个典型的xa模式。因为xa模式中的分支事务是异步的,而事务消息方案中是同步的
注意:
- 事务消息不支持延时消息
- 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交)
实例
生产者:
TransactionMQProducer producer = new TransactionMQProducer("tpg");
producer.setNamesrvAddr("localhost:9876");
//定义一个线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//为生产者指定一个线程池
producer.setExecutorService(executor);
//为生产者添加事务监听器
producer.setTransactionListener(new ICBCTransactionListener());
producer.start();
String[] tags = {"taga","tagb","tagc"};
for (int i = 0; i < 3; i++) {
byte[] body = ("hi,"+i).getBytes();
Message message = new Message("TTopic", tags[i], body);
//发送事务消息(第二个参数用于指定在执行本地事务时要使用的自定义业务参数)
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.println("发送结果为:"+sendResult.getSendStatus());
}
监听器:
public class ICBCTransactionListener implements TransactionListener {
/**
* 回调操作
* 消息预提交成功就会触发该方法的执行,用于完成本地事务
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("预提交消息成功:"+msg);
if ("taga".equalsIgnoreCase(msg.getTags())){
return LocalTransactionState.COMMIT_MESSAGE;//成功,可以被消费者消费
}else if ("tagb".equalsIgnoreCase(msg.getTags())){
return LocalTransactionState.ROLLBACK_MESSAGE;//失败,不可被消费者消费
}else if ("tagc".equalsIgnoreCase(msg.getTags())){
return LocalTransactionState.UNKNOW;//未知,需要执行消息回查
}
return LocalTransactionState.UNKNOW;
}
/**
* 消息回查
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("执行消息回查"+msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;//回查成功,可以被消费者消费
}
}
批量消息
批量发送
批量发送限制
- 批量发送的消息必须具有相同的topic
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息与事务消息
批量发送大小
默认情况下,一批发送的消息总大小不能超过4mb字节。如果超出该值则:
- 将消息进行拆分
- 在producer端和broker端修改属性
maxMessageSize
注意:生产者通过send()方法发送Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。这个字符串由四部分构成:topic、消息body、消息日志(20字节),及用于描述消息的一堆属性key-value。这些属性中包含例如生产者地址、生产时间、要发送的queueid等。最终写入到broker中的消息单元中的数据都是来自于这些属性
实例
生产者:
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("localhost:9876");
//指定要发送的消息的最大大小,默认是4m(仅修改此属性是不行的,同时还需要修改broker加载的配置文件)
producer.setMaxMessageSize(4*1024*1024);
producer.start();
//定义要发送的消息集合
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
byte[] body = ("hi,"+i).getBytes();
Message msg = new Message("someTopic","someTag",body);
messages.add(msg);
}
//消息列表分割器,将消息分割为多个小列表
MessageListSplitter splitter = new MessageListSplitter(messages);
while (splitter.hasNext()){
List<Message> list = splitter.next();
producer.send(list);
}
producer.shutdown();
分割器:
public class MessageListSplitter implements Iterator<List<Message>> {
//极限值
private final int SIZE_LIMIT = 4 * 1024 * 1024;
//所有消息
private final List<Message> messages;
//批量发送消息的小集合起始索引
private int currIndex;
public MessageListSplitter(List<Message> messages){
this.messages = messages;
}
@Override
public boolean hasNext() {
//判断当前开始遍历的消息索引要小于消息总数
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
//当前要发送的这一批消息列表的大小
int totalSize = 0;
for (; nextIndex<messages.size(); nextIndex++){
Message message = messages.get(nextIndex);
//topic 和 body的长度
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
//属性的长度
tmpSize += entry.getKey().length() + entry.getValue().length();
}
//log的长度
tmpSize += 20;
//单个数据大于极限
if (tmpSize > SIZE_LIMIT){
if (nextIndex - currIndex == 0){
nextIndex ++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT){
break;
}else {
totalSize += tmpSize;
}
}
List<Message> subList = this.messages.subList(currIndex, nextIndex);
//下次的开始索引
currIndex = nextIndex;
return subList;
}
}
批量消费
consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若使其一次可以消费多条消息,则可以通过修改Consumer的consumeMessageBatchMaxSize
属性来指定。不过,该值不能超过32。因为默认情况下每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改Consumer的pullBatchSize
属性来指定
consumeMessageBatchMaxSize设置的越大,consumer每次拉取的时间就越长
pullBatchSize:设置的越大,consumer的并发消费能力就越低(每一批只会用一个线程消费)
实例
消费者:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("someTopic","*");
//指定每次可以消费10条消息,默认1
consumer.setConsumeMessageBatchMaxSize(10);
//指定每次可以从broker拉取40条消息,默认32
consumer.setPullBatchSize(40);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//消费成功
//return ConsumeConcurrentlyStatus.RECONSUME_LATER;//消费异常
}
});
consumer.start();
消息过滤
消费者在进行消息订阅时,除了可以指定要订阅的topic外,还可以指定条件进行过滤
tag过滤
通过consumer的subscribe()方法指定要订阅的tag。如果订阅多个tag的消息,tag间使用||
符号连接
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅mytopic中 tag为 taga和tagc的消息
consumer.subscribe("mytopic","taga || tagb");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
sql过滤
sql过滤是一种通过表达式对事先埋入到消息中的用户属性
进行筛选过滤的方式。通过sql过滤,可以实现对消息的复杂过滤。不过,只有使用push模式
的消费者才能使用sql过滤
支持的常量类型:
- 数值:比如:123,3.14
- 字符:必须用单引号括起来,比如:‘abc’
- 布尔:true或false
- null:特殊常量,表示空
支持的运算符:
- 数值比较:>,>=,<,<=,=,between
- 字符比较:=,<>,in
- 逻辑运算:and,or,not
- null判断:is null 或 is not null
注意:
The broker does not support consumer to filter message by SQL92
默认情况下broker没有开启消息的sql过滤功能,需要在broker加载的配置文件中添加属性开启该功能
enablePropertyFilter = true
实例
生产者:
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("hi,"+i).getBytes();
Message message = new Message("mytopic","mytag",body);
//埋入用户属性age
message.putUserProperty("age",i+"");
SendResult result = producer.send(message);
System.out.println(result);
}
producer.shutdown();
消费者:
//需要push模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//从mytopic的消息中过滤出 age 在0-6之间的消息
consumer.subscribe("mytopic", MessageSelector.bySql("age between 0 and 6"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
消息发送重试
producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制
注意:
- 生产者在发送消息时,若采用同步或异步方式发送,发送失败会重试,但oneway消息发送方式是没有重试机制的
- 只有普通消息具有发送重试机制,顺序消息是没有的
- 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。消息重复在rocketmq中是无法避免的
消息重试由三种策略:
-
同步发送失败:
对于普通消息,消息发送默认采用round-robin策略来选择要发送到的队列。如果发送失败,默认重试2次。但在重试时是不会选择上次发送失败的broker,而是其他broker。若只有一个broker,就只能发送给这个broker,但会尽量发送到其他队列中
broker还具有
失败隔离
功能,使producer尽量选择未发生过异常的broker作为目标如果超过重试次数,则抛出异常,由producer去保证消息不丢失
代码中通过
producer.setRetryTimesWhenSendFailed(3);
设置重试次数 -
异步发送失败
异步发送失败时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保证消息不丢失
代码中通过
producer.setRetryTimesWhenSendAsyncFailed(0);
设置重试次数 -
消息刷盘失败
消息刷盘超时(master或slave)或slave不可用(返回状态非SEND_OK)时,默认是不会将消息尝试发送到其他的broker的。不过,对于重要消息可以通过在broker的配置文件中设置
retryAnotherBrokerWhenNotStoreOK=true
来开启
消息消费重试
顺序消息的消费重试
对于顺序消息,当consumer消费消息失败后,为了保证消息的顺序性,其会自动不断的进行消息重试,直到消费成功。重试期间会出现消息消费被阻塞的情况
通过代码consumer.setSuspendCurrentQueueTimeMillis(1000);
设置重试消费间隔,默认1000毫秒,取值范围为10到30000毫秒
注意:顺序消息没有发送失败重试,但具有消费重试
无序消息的消费重试
对于无序消息(普通消息、延时消息、事务消息),当consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群方式
生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息
消费重试次数与间隔
对于无序消息
集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的时间间隔是不同的,会逐渐变长(有序间隔固定)。若消息经过最多重试次数后仍然失败,则消息进入死信队列
通过代码consumer.setMaxReconsumeTimes(16);
设置重试次数(默认16)。若重试次数大于16,重试间的时间间隔均为2小时
对于消费者组,若有一个消费者设置了消费重试次数,则整个组会应用该设置。如果有多个设置了该值,后面的会覆盖前面的设置
重试队列
对于需要重试的消息,并不是consumer在等待了指定的时间后再次去原来的地方进行消费,而是将这些需要重试消费的消息放入到了一个特殊的topic的队列中,而后进行再次消费的。这个特殊的队列就是重试队列
当出现需要重试消费的消息时
,broker会为每个消费组
都设置一个topic名称为%RETRY%consumerGroup@consumerGroup
的重试队列
broker对于重试消息的处理是通过延时消息
实现的。先将消息保存到SCHEDULE_TOPIC_XXXX
延时队列中,延迟时间到后,会将消息投递到%RETRY%consumerGroup@consumerGroup
重试队列中
消费重试时间间隔和延时消息的延时等级十分相似,除了没有延时等级的前两个等级外(直接使用第三个开始),其他是相同的
消费重试配置方式
集群模式下,消息消费失败后若希望消费重试,则需要在监听器接口实现中明确进行如下方式之一的配置:
- 返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推荐)
- 返回null
- 抛出异常
消息不重试配置方式
集群模式下,消息消费失败后若不希望重试,则在捕获异常后返回与成功相同的结果,即:
- 返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
死信队列
当一条消息消费失败,消息队列会自动进行重试;当达到最大重试次数后,则将其发送到一个特殊队列中。这个队列就是死信队列
特征
- 死信队列中的消息不会再被消费者正常消费(死信队列对消费者是不可见的)
- 死信存储有效期和正常消息相同,均为3天,3天后会被自动删除(commitlog文件的过期时间)
- 死信队列就是一个特殊的topic,名为
%DLQ%consumerGroup@consumerGroup
- 如果一个消费者组未产生死信消息,则不会为其创建相应的死信队列
死信消息的处理
当一条消息进入死信队列,就意味着系统中某个地方出现了问题。因此对于死信消息,通常需要开发人员进行特殊处理,解决代码中的bug。再将原来的死信消息再次进行投递消费