rocketmq-笔记
基本概念
消息(message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题
主题(topic)
topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是rocketmq进行消息订阅的单位
一个生产者可以同时发送多种topic的消息;而一个消费者只能对某种特定的topic感兴趣,即只可以订阅和消费一种topic消息
标签(tag)
为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效的保持代码的清晰度和连贯性,并优化rocketMQ提供的查询系统。消费者可以根据tag实现对不同子主题的不同消费逻辑,实现更好的扩展性
队列(queue)
存储消息的物理实体。一个topic中可以包含多个queue,每个queue中存放的就是该topic的消息。一个topic的queue也被称为一个topic中的消息分区
类似于kafka中的分区partition
消息标识(messageId/key)
rocketmq中每个消息拥有唯一的messageId,且可以携带具有业务标识的key,以方便对消息的查询。不过需要注意的是,messageId有两个:在生产者send()消息时会自动生成一个messageId(msgId),当消息到达broker后,broker也会自动生成一个messageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识
-
msgId:由producer端生成,其生成规则为:
producerIp+进程pid+MessageClientIDSetter类的classLoader的hashCode+当前时间+AutomicInteger自增计数器
-
offsetMsgId:由broker端生成,其生成规则为:
brokerIp+物理分区的offset(queue中的偏移量)
-
key:由用户指定的业务相关的唯一标识
系统架构
producer
消息生产者,负责生产消息。producer通过mq的负载均衡模块选择相应的broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟
rocketmq中的消息生产者都是以生产者组(producer group)的形式出现的。生产者组是同一类生产者的集合,一个生产者组可以同时发送多个topic的消息
consumer
消息消费者,负责消费消息。一个消费者会从broker服务器中获取到消息,并对消息进行相关业务处理
rockermq中的消费者都是以消费者组(consumer group)的形式出现的。消费者组是同一类消费者的集合,这类consumer消费的是同一个topic的消息。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易
消费者组中的consumer的数量应该小于等于订阅的topic中的queue的数量。如果超出queue数量,则多出的consumer不能消费消息
一个topic的消息可以被多个消费者组同时消费
nameServer
NameServer是一个broker与topic路由的注册中心,支持broker的动态注册与发现
主要功能为:
- broker管理:接受broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检测broker是否存活
- 路由信息管理:每个NameServer中都保存着broker集群的整个路由信息和用于客户端查询的队列信息。producer和consumer通过NameServer可以获取整个比如broker集群的路由信息,从而进行消息的投递和消费
rocketmq 1.0和2.0版本中,依赖的仍是zookeeper。从3.0开始去掉了zookeeper的依赖,使用了自己的NameServer
broker
broker充当着消息中转角色,负责存储消息、转发消息。broker在rocketmq系统中负责接收并存储生产者发送过来的消息,同时为消费者的拉取请求作准备。broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移量offset、主题、队列等
NameServer
路由注册
NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通信。那各节点中的数据是如何进行数据同步的呢?在broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在每个NameServer内部维护者一个broker列表,用来动态存储broker的信息
这种NameServer的无状态方式
- 优点:集群搭建简单
- 缺点:对于broker,必须明确指出所有NameServer地址,否则未指出的将不会注册,也因为如此,NameServer并不能随便扩容。如果不重新配置broker,则新增的NameServer对broker来说是不可见的
broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含BrokerId、Broker地址、Broker名称、Broker所属集群名称等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个broker的最新存活时间
路由剔除
由于broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除
NameServer中有一个定时任务,每隔10秒就会扫描一次Broker列表,查看每一个broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定broker失败,然后将其从broker列表中剔除
路由发现
rocketmq的路由发现采用的是pull模型。当topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取topic最新的路由。默认客户端每30秒会拉取一次最新的路由
push模型:推送模型。实时性好,但需要维护一个长连接,消耗资源较大
pull模型:拉取模型。存在的问题是,实时性差
long polling模型:长轮询模型。其是对push和pull模型的整合,充分利用了这两种模型的优势
客户端NameServer选择策略
客户端(producer和consumer)在配置时必须要写上NameServer集群的地址,那么客户端到底连接的是哪个NameServer节点呢?客户端首先会生成一个随机数,然后再与NameServer节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。如果连接失败,则会采用轮询策略,逐个尝试着去连接其它节点
工作流程
- 启动NameServer,NameServer启动后开始监听端口,等待broker、producer、consumer连接
- 启动broker时,broker会与所有NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳
- 发送消息前,可以先创建topic,创建topic时需要指定该topic要存储在哪些broker上,当然,在创建topic时也会将topic与broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建topic
- producer发送消息,启动时先根NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的topic的queue与broker的地址(ip+port)的映射关系。然后根据算法策略从中选择一个queue,与队列所在的broker建立长连接从而向broker发送消息。当然,在获取到路由信息后,producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息
- consumer跟producer类似,跟其中一台NameServer建立长连接,获取其所订阅topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的queue,然后直接跟broker建立长连接,开始消费其中的消息。consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于producer的是,consumer还会向broker发送心跳,以确保broker的存活状态
手动topic的创建模式:
- 集群模式:该模式下创建的topic在集群中,所有broker中的queue数量是相同的
- broker模式:该模式下创建的topic在集群中,每个broker的queue数量可以不同
自动创建topic时,默认采用的是broker模式,会为每个broker默认创建4个queue
读写队列
从物理上来讲读写队列是同一个队列。所以,不存在读写队列数据同步的问题。读写队列是逻辑上进行区分的概念。一般情况下,读写队列数量是相同的
如果创建topic时的写队列数量为8,读队列为4,此时系统会创建8个queue,分别为0-7。producer会将消息写入到这8个队列,但consumer只会消费0-3这4个queue中的消息,4-7中的消息是不会被消费到的
如果创建topic时的写队列数量为4,读队列为8,此时系统会创建8个queue,分别为0-7。producer会将消息写入到0-3这4个队列,但consumer只会消费0-7这8个queue中的消息,但是4-7中是没有消息的。此时假设consumer group中包含两个consumer,consumer1消费0-3,consumer2消费4-7。consumer2是没有消息可消费的
当读写队列数量不同时,总是有问题的,那么为什么这样设计?这样设计是为了方便topic的缩容。例如,原来topic中有16个queue,如何能使其queue缩容为8,还不会丢失消息?可以动态修改写队列数量为8,读队列数量不变。此时新的消息只能写入前8个队列,而消费的却是16个队列中的数据。当发现后8个队列中的消息消费完毕后,就可以再将读队列数量动态设置为8.整个缩容过程没有丢失任何消息
perm
用于设置当前创建topic的操作权限:
- 2:只写
- 4:只读
- 6:读写
复制策略
复制策略是broker的master与slave间的数据同步方式。分为同步复制和异步复制:
- 同步复制:消息写入master后,master会等待slave同步数据成功后才向producer返回成功ack
- 异步复制:消息写入master后,master立即向producer返回成功ack,无需等待slave同步成功
刷盘策略
刷盘策略指的是broker中消息的落盘方式,即消息发送到broker内存后消息持久化到磁盘的方式。分为同步刷盘和异步刷盘:
- 同步刷盘:当消息持久化到broker的磁盘后才算是消息写入成功
- 异步刷盘:当消息写入到broker的内存后即表示消息写入成功,无需等待消息持久化到磁盘
消息写入到broker的内存,一般是写入到了pageCache
对于异步刷盘策略,消息会写入到pageCache后立即返回成功ack。但并不会立即做落盘操作,而是当pageCache到达一定量时会自动进行落盘
broker集群模式
单master
只有一个broker。这种方式也只能是在测试时使用,生成环境下不能使用,因为存在单点问题
多master
broker集群仅由多个master构成,不存在slave。同一topic的各个queue会平均分布在各个master节点上
-
优点:配置简单,单个master宕机或重启维护对应用无影响,在磁盘配置为raid10时,即使机器宕机不可恢复情况下,由于raid10磁盘非常可靠,消息也不会丢失(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高
如果没有配置raid磁盘阵列,一旦出现master宕机,则会发送大量消息丢失的情况
-
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复前不可订阅(不可消费)消息实时性会受到影响
多master多slave(异步复制)
broker集群由多个master构成,每个master又配置了多个slave(在配置了raid磁盘阵列的情况下,一个master一般配置一个slave即可)。master与slave的关系是主备关系,即master负责处理消息的读写请求,而slave仅负责消息的备份和master宕机后的角色切换
异步复制策略,即消息写入master成功后,立即返回ack,无需等待slave同步成功。该模式下当master宕机后,可能会存在少量消息丢失的问题
多master多slave(同步双写)
该模式是多个master多slave的同步复制实现。所谓同步双写,指的是消息写入master成功后,master会等待slave同步数据成功后才返回ack,即master与slave都要写入成功才会返回ack,也即双写
该模式与异步复制模式比,优点是消息更安全,不存在丢失消息的情况。但单个消息的rt略高,从而导致性能略低(大约10%)
消息的生产
producer可以将消息写入到某broker中的某queue中,其过程如下:
- producer发送消息之前,会先向NameServer发出获取topic路由信息的请求
- NameServer返回该topic的路由表和broker列表
- producer根据代码中指定的queue选择策略,从queue列表中选出一个queue,用于后续存储消息
- producer对消息做一些特殊处理,例如,消息本身超过4m,则会进行压缩
- producer向选择的queue所在的broker发出rpc请求,将消息发送到queue
路由表:实际上是一个map,key为topic名称,value为一个QueueData实例列表
QueueData:一个topic在一个broker中的所有queue
broker列表:也是一个map。key为brokerName,value为BrokerData
BrokerData:一组brokerName相同的小集群(master-slave)。包含brokerName和一个map。map的key为brokerId,value为该broker的地址(brokerId为0为master,非0为slave)
queue选择算法
对于无序消息,其queue选择算法,也称为消息投递算法,常见的有两种:
- 轮询算法:默认算法,该算法保证了每个queue可以均匀的获取到消息
- 最小投递延迟算法:该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的queue。如果延迟相同,则采用轮询算法投递
消息的存储
rocketmq中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下store
目录中
- abort:该文件在broker启动后会自动创建,正常关闭broker,该文件会自动消失。若在没有启动broker的情况下,发现这个文件是存在的,则说明之前broker的关闭是非正常的
- checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳
- commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的
- consumequeue:其中存放着consumequeue文件,队列就存放在这个目录中
- index:其中存放着消息索引文件indexFile
- config:存放着broker运行期间的一些配置数据
- lock:运行期间使用到的全局资源锁
commitlog
commitlog目录中存放着很多的mappedFile文件,当前broker中的所有消息都是落盘到这些mappedFile文件中的。mappedFile文件最大为1g,文件名由20位十进制数构成,表示当前文件的第一条消息的起始位偏移量
注意:一个broker中仅包含一个commitlog目录,所有的mappedFile文件都是存放在该目录中的。即无论当前broker中存放着多少topic的消息,这些消息都是被顺序写入到了mappedFile文件中的。也就是说,这些消息在broker中存放时没有按照topic进行分类存放
消息单元
mappedFile文件内容由一个个的消息单元构成。每个消息单元中包含消息总长度MsgLen、消息的物理位置physicalOffset、消息体内容Body、消息体长度BodyLength、消息主题Topic、Topic长度TopicLength、消息生产者BornHost、消息发送时间戳BornTimestamp、消息所在的队列QueueId、消息在queue中存储的偏移量QueueOffset等近20余项相关属性
consumequeue
为了提高效率,会为每个topic在/store/consumequeue
中创建一个目录,目录名称为topic名称,在该目录下,会再为每个queue创建一个目录,目录名为queueId。每个目录中存放着若干consumequeue文件,consumequeue文件是commitlog的索引文件,可以根据consumequeue定位到具体的消息
consumequeue文件名也是由20位数字构成,表示当前文件的第一个索引条目的起始位偏移量。与mappedFile文件名不同的是,其后续文件名是固定的。因为consumequeue文件大小是固定不变的
索引条目
每个consumequeue文件可以包含30w个索引条目,每个索引条目包含了三个消息重要属性:
- 消息在mappedFile文件中的偏移量CommitLog Offset,8字节
- 消息长度,4字节
- 消息Tag的hashcode值,8字节
这三个属性占20个字节,所以每个文件的大小固定是30w*20字节
对文件的读写
消息写入:
- broker根据生产者发送过来的queueId,获取到该消息对应的索引条目要在consumequeue中的写入偏移量,即QueueOffset
- 将queueId、queueOffset等数据,与消息一起封装为消息单元
- 将消息单元写入到commitlog。同时,形成消息索引条目
- 将消息索引条目分发到相应的consumequeue
消息拉取:
- consumer获取到其要消费消息所在queue的
消费offset
(消费进度),计算出其要消费消息的消息offset
(消费offset+1) - consumer向broker发送拉取请求,其中会包含其要拉取消息的queue、消息offset及消息tag
- broker计算在该consumequeue中的queueOffset(queueOffset=消息offset*20字节)
- 从该queueOffset处开始向后查找第一个指定tag的索引目录
- 解析该索引目录的前8个字节,即可定位到该消息在commitlog中的commitlog offset
- 从对应commitlog offset中读取消息单元,并发送给consumer
性能优化
- rocketmq对文件的读写操作是通过mmap零拷贝进行的,将对文件的操作转化为直接对内存地址进行操作,从而极大的提高了文件的读写效率
- consumequeue中的数据是顺序存放的,还引入了PageCache预读取机制,使得对consumequeue文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能
PageCache机制:
- 写操作:os会先将数据写入到PageCache中,随后会以异步的方式由pdflush内核线程将cache中的数据刷盘到物理磁盘
- 读数据:先从PageCache中读取数据,若没有命中,则os再从物理磁盘上加载数据到PageCache,同时也会对相邻数据块中的数据进行预读取
rockermq中可能会影响性能的是对commitlog的读取。因为对commitlog来说,读取消息时会产生大量的随机访问
indexFile
rocketmq提供了根据key进行消息查询的功能。该查询是通过store目录中的index子目录中的indexFile进行索引实现的快速查询。当然,这个indexFile中的索引数据是包含了key的消息被发送到了broker时写入的,如果消息中没有包含key,则不会写入
indexFile的创建时机:
- 当一条条带key的消息发送过来后,发现没有indexFile,此时会创建
- 当一个indexFile中挂载的index索引单元数量超出2000w个时,会创建新的indexFile(根据indexHeader中的indexCount)
indexFile结构
每个broker中会包含一组indexFile,每个indexFile都是以一个文件创建的时间戳命名的。每个indexFile文件由三部分构成:indexHeader、slots槽位、indexes索引单元。每个indexFile文件中包含500w个slot槽。而每个slot槽又可能会挂载很多的index索引单元
一个indexFile的最大大小为:40+500w*4+2000w*20个字节
indexFile的文件名为当前被创建的时间戳,根据业务key进行查询时,查询条件除了key之外,还需要额外指定一个要查询的时间戳,表示要查询不大于该时间戳的最新消息,这个时间戳文件名可以简化查询,提高效率
indexHeader
indexHeader固定40个字节
- beginTimestamp:该indexFile中第一条消息的存储时间
- endTimestamp:该indexFile中最后一条消息存储时间
- beginPhyoffset:该indexFile中第一条消息在commitlog中的偏移量commitlog offset
- endPhyoffset:该indexFile中最后一条消息在commitlog中的偏移量commitlog offset
- hashSlorCount:已经填充有index的slot数量
- indexCount:该indexFile中包含的索引个数
slots槽位
key的hash值%500w的结果即为slot槽位,然后将该slot值修改为该index索引单元的indexNo,根据这个indexNo可以计算出该index单元在indexFile中的位置。不过,该取模结果的重复率是很高的,为了解决这个问题,在每个index索引单元中添加了preIndexNo,用于指定该slot中当前index索引单元的前一个index索引单元。而slot中始终存放的是其下最新的index索引单元的indexNo,这样的话,只要找到了slot就可以找到最新的index索引单元,而通过这个index索引单元就可以找到其之前的所有index索引单元
indexNo是一个在indexFile中的流水号,从0开始递增
index索引单元
index索引单元默认20个字节,包含4个属性
- keyHash:消息中指定的业务key的hash值
- phyOffset:当前key对应的消息在commitlog中的偏移量commitlog offset
- timeDiff:当前key对应消息的存储时间与当前indexFile创建时间的时间差
- preIndexNo:当前slot下当前index索引单元的前一个索引单元的indexNo
indexNo在index索引单元中是没有体现的,其是通过indexes中依次数出来的
查询流程
定位公式:
- 计算指定消息key的slot槽位序号:key的hash%500w
- 计算槽位为n的slot在indexFile中的起始位置:40+(n-1)*4
- 计算indexNo为m的index在indexFile中的位置:40+500w*4+(m-1)*20
- 输入业务key与要查询的时间,开始查询
- 根据传入的时间找到相应的indexFile(文件名<=指定查询时间 的最大的文件)
- 计算出传入时间与找到的indexFile文件名间的差值diff
- 计算出业务key的hash值
- 计算出slot槽位序号(key的hash%500w)
- 根据slot槽位序号计算出该slot在indexFile中的位置(40+(n-1)*4)
- 找到slot后读取slot值,即当前slot中最新的index索引单元的indexNo
- 根据indexNo计算出该index单元在indexFile的位置(40+500w*4+(m-1)*20)
- 之前计算的时间差diff减去当前index单元中的timeDiff
- 如果>=0:读取该index单元的phyOffset,然后就可以定位到commitlog中的消息了
- 如果<0:读取该index单元的preIndexNo,作为要查找的下一个index索引单元的indexNo(越向前,index单元中的timeDiff越小)
消息的消费
消费者从broker中获取消息的方式有两种:pull拉取和push推送。消费者组对消息的消费模式又分为两种:集群消费clustering和广播消费broadcasting
获取消息类型
-
拉取模式:consumer主动从broker中拉取消息,主动权由consumer控制。这种方式实时性较弱
-
推送模式:broker收到数据后会主动推送给consumer。该模式实时性较高
该消费类型是典型的发布-订阅模式,即consumer向其关联的queue注册监听器,一旦有新消息就会触发回调,回调方式是consumer去queue中拉取消息。而这需要基于consumer和broker间的长连接。长连接是需要消耗系统资源的
对比:
- pull:需要应用去实现对关联queue的遍历,实时性差;但便于应用控制消息的拉取
- push:封装了对关联queue的遍历,实时性强,但会占用较多系统资源
组消费模式
- 广播模式:相同consumer group中的每个consumer实例都接收同一个topic的全量消息。即每条消息都会被发送到consumer group中的每个consumer
- 集群模式:相同consumer group中的每个consumer实例平均分摊同一个topic的消息。即每条消息只会被发送到consumer group中的某个consumer
消息进度保存
- 广播模式:消费进度保存在consumer端。因为广播模式下consumer group中每个consumer都会消费所有消息,但他们的消费进度是不同的。所以consumer各自保存各自的消费进度
- 集群模式:消费进度保存在broker中。consumer group中的所有consumer共同消费同一个topic中的消息,同一条消息只会被消费一次。消费进度参与到了消费的负载均衡中,故消费进度是需要共享的(config/consumerOffset.json中)
rebalance机制
rebalance即再均衡,是指将一个topic下的多个queue在同一个consumer group中的多个consumer间进行重新分配的过程
集群模式才会有rebalance机制
rebalance机制是为了提升消息的并行消费能力。例如:一个topic下5个队列,在只有一个消费者的情况下,这个消费者要负责这五个队列的消息。如果此时增加一个消费者,那么就可以给其中一个消费者分配两个,另一个分配三个,从而提升并行消费能力
限制
由于一个队列最多分配给一个消费者,因此当某个消费者组下的消费者实例数量大于队列数量时,多余的消费者将分配不到任何队列
危害
-
消费暂停:在触发rebalance时consumer要暂停部分队列的消费,要等rebalance结束时才能继续消费
-
消费重复:consumer在消费新分配给自己的队列时,必须接着之前consumer提交的消费进度的offset继续消费。然而默认情况下,offset是异步提交的,可能会导致提交的offset和实际的offset不符,导致重复消费
-
同步提交:提交offset后,需要等待broker返回ack后才能获取下一批消息
-
异步提交:提交offset后,不需要等待broker的ack就可以直接获取下一批消息
-
-
消费突刺:由于rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致rebalance结束后瞬间需要消费很多消息
原因
导致rebalance产生的原因:
- 消费者所订阅的topic的queue数量发生变化
- 消费者组中消费者数量发生变化
过程
- 在broker中维护着多个map集合,这些集合中动态存放着当前topic中的queue信息、consumer group中的consumer实例信息。一旦发现消费者所订阅的queue数量发生变化,或消费者组中消费者的数量发生变化,立即向consumer group中的每个实例发出rebalance通知
- consumer实例接收到通知后会采用queue分配算法自己获取到相应的queue,即由consumer实例自主进行rebalance
在kafka中,一旦发现出现了rebalance条件,broker会调用group coordinator来完成rebalance。group coordinator是broker中的一个进程。
group coordinator会在consumer group中选出一个group leader。由这个leader根据自己本身情况完成partition分区的再分配。这个再分配结果会上报给coordinator,并由coordinator同步给group中的所有consumer实例
kafka中的的rebalance是由consumer leader完成的。而rockermq是由每个consumer自身完成的
queue分配算法
- 平均分配策略:该算法是要根据
avg = QueueCount / ConsumerCount
的结果进行分配的。如果能够整除,则按顺序将avg个queue逐个分配给Consumer;如果不能整除,则将多余的queue按照consumer顺序逐个分配 - 环形分配策略:根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配
- 一致性hash策略:将consumer的hash值作为node节点存放到hash环上,然后将queue的hash值也放到hash环上,通过顺时针的方向,距离queue最近的那个consumer就是该queue要分配的consumer
- 同机房策略:根据queue的部署机房位置和consumer的位置,过滤出当前consumer相同机房的queue。然后按照平均分配策略或环形平均策略对同机房queue进行分配。如果没有同机房queue,则按照平均分配策略或环形分配策略对所有queue进行分配
至少一次原则
rocketm有一个原则:每条消息必须要被成功消费一次
consumer在消费完消息后会向其消费进度记录器
提交其消费消息的offset,offset被成功记录到记录器中,那么这条消息就被成功消费了
消费进度记录器:
- 广播模式:consumer本身就是消费进度记录器
- 集群模式:broker是消费进度记录器
offset管理
消费进度offset是用来记录每个queue在不同消费者组的消费进度。根据消费进度记录器的不同,可以分为两种模式:本地模式和远程模式
本地模式
当消费者模式为广播模式
时,offset使用本地模式存储。因为每条消息都会被所有消费者消费,每个消费者管理自己的消费进度,各个消费者直接不存在消费进度的交集
consumer在广播模式下offset相关数据以json的形式持久化到consumer本地磁盘文件中,默认文件路径为当前用户目录下的.rocketmq_offsets/${clinetId}/${group}/Offsets.json
。其中${clientId}为当前消费者id,默认为ip@DEFAULT;${group}为消费者组名
远程模式
当消费模式为集群模式
时,offset使用远程模式管理。因为所有consumer实例对消息采用的是均衡消费,所有consumer共享queue的消费进度
consumer在集群模式下offset相关数据以json的形式持久化到broker磁盘中,文件路径为当前用户主目录下的store/config/consumerOffset.json
broker启动时会加载这个文件,并写入到一个双层map。外层map的key为topic@group,value为内层map。内层map的key为queueId,value为offset。当发生rebalance时,新的consumer会从该map中获取到相应的数据来继续消费
使用远程模式不光是因为共享消费进度,也是为了可以方便进行rebalance
用途
消费者要消费的第一条消息的起始位置是用户通过consumer.setConsumeFromWhere()方法指定的
在consumer启动后,要消费的第一条消息的起始位置常用的三种(ConsumeFromWhere枚举):
- CONSUME_FROM_LAST_OFFSET:从queue的的当前最后一条消息开始消费
- CONSUME_FROM_FLRST_OFFSET:从queue的第一条消息开始消费
- CONSUME_FROM_TIMESTAMP:从指定的具体时间戳位置的消息开始消费。时间戳是通过另外一个语句指定的cosumer.setConsumerTimestamp(“yyyyMMddHHmmss”)
当消费完一批消息后,consumer会提交其消费进度offset给broker,broker在收到消费进度后会将其更新到哪个双层map及consumerOffset.json文件中,然后向该consumer进行ack,而ack内容包含:当前消费队列的最小offset(minOffset)、最大offset(maxOffset)、及下次消费的起始offset(nextBeginOffset)
重试队列
当rocketmq对消息的消费出现异常时,会将发生异常的消息的offset提交到broker中的重试队列。系统在发生消息消费异常时会为当前topic@group创建一个重试队列,该队列以%RETRY%开头,到达重试时间后进行消费重试
同步提交和异步提交
集群模式下,consumer消费完消息后会向broker提交消费进度offset,其提交方式分为:
- 同步提交:消费者在消费完一批消息后会向broker提交这些消息的offset,然后等待broker的成功响应。若在等待超时之前收到了成功响应,则继续读取下一批消息进行消费(从ack中获取nextBeginOffset)。若没有收到响应,则会重新提交,直到获取到响应。而在这个等待的过程中,消费者是阻塞的。其严重影响了消费者的吞吐量
- 异步提交:消费者在消费完一批消息后向broker提交offset,但无需等待broker的成功响应,可以继续读取并消费下一批消息。这种方式增加了消费者的吞吐量。但需要注意,broker在收到提交的offset后,还是会向消费者进行响应的。在没有收到ack时,consumer会直接从broker中获取nextBeginOffset
消费幂等
当出现消费者对某条消息重复消费时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是幂等的
幂等解决方案的设计中涉及到两项要素:幂等令牌,与唯一性处理。只要充分利用好这两项要素,就可以设计好的幂等解决方案
- 幂等令牌:是生产者和消费者两者间的既定协议,通常指具备唯一业务标识的字符串
- 唯一性处理:服务端通过采用一定的算法策略,保证同一个业务逻辑不会被重复执行成功多次
解决方案
- 首先通过缓存去重。在缓存中如果已经存在了幂等令牌,则说明本次操作是重复性操作;若缓存没有命中,则进入下一步
- 在唯一性处理之前,先在数据库中查询幂等令牌作为索引的数据是否存在。若存在,则说明本次操作为重复性操作;若不存在,则进入下一步
- 唯一性处理后,将幂等令牌写入到缓存,并将幂等令牌作为唯一索引写入到db
- 1中的缓存过期是有可能直接进入2中的,1可以帮2减轻压力
- rocketmq中不要用MessageID作为幂等令牌,因为会重复
- rocketmq本身可以保证消息不丢失,但不能保证消息不重复
消息堆积和消费延迟
消息处理流程中,如果consumer的消费速度跟不上producer的发送速度,mq中的未处理消息会越来越多,这部分消息就称为“堆积消息”。消息堆积进而会造成“消费延迟”
consumer使用长轮询pull模式消费消息时,分为两个阶段:
- 拉取消息:consumer通过长轮询pull模式批量拉取的方式从服务端获取消息,将拉取到的消息缓存到本地缓冲队列中。对于拉取式消费,在内网环境下会有很高的吞吐量,所以这一阶段不会成为消息堆积的瓶颈
- 消费消息:consumer将本地缓存的消息提交到消费线程中,使用业务消费逻辑对消费进行处理,处理完毕后获取到一个结果。这是真正的消息消费过程。此时consumer的消费能力就完全依赖于消息的消费耗时和消费并发度了。如果因为业务逻辑复杂,导致单条消息的耗时过长,则整体的吞吐量肯定不高,此时会导致consumer本地缓冲队列达到上限,停止从服务拉取消息
消息堆积的主要瓶颈在于客户端的消费能力,而消费能力由消费耗时和消费并发度决定(消费耗时的优先级高于消费并发度)
普通消息、延时消息、事务消息:消费并发度=单节点线程数*节点数量
顺序消息:
- 全局顺序消息:这类topic只有一个queue,consumer group中只有一个consumer能消费,并发度为1
- 分区顺序消息:这类topic有多个queue,只能保证单个queue的顺序消费。消息并发度=topic的queue分区数量
消息的清理
消息被消费过后不会被清理掉
消息是被顺序存储在commitlog文件的,且消息大小不定长,所以消息的清理是不可能以消息为单位进行清理的,而是以commitlog文件为单位进行清理的。否则会降低清理效率,并且实现逻辑复杂
commitlog文件存在一个过期时间,默认为72小时,即三天。除了用户手动清理外,在以下情况也会被自动清理(无论文件中的消息是否被消费过):
- 文件过期,且到达清理时间点(默认凌晨4点)后,自动清理过期文件
- 文件过期,且磁盘空间占用率已达到过期清理警戒线(默认75%)后,无论是否达到清理时间点,都会自动清理过期文件
- 磁盘占用率达到清理警戒线(默认85%)后,开始按照设定好的规则清理文件,无论是否过期。默认从最老的文件开始清理
- 磁盘占用率达到危险警戒线(默认90%)后,broker将拒绝消息写入
官方建议rocketmq服务的Linux文件系统采用ext4。因为对于文件删除操作,ext4比ext3性能更好