RocketMQ Broker 深入学习

学习一下火箭消息 - Broker 的原理和使用🚀

参数说明

参数 说明
brokerClusterName=rocketmq-cluster-1 所属集群名字
brokerName=broker-a broker名字,注意此处不同的配置文件填写的不一样
brokerId=0 0 表示Master, > 0 表示slave
namesrvAddr=127.0.0.1:9876;127.0.0.2:9876 nameServer 地址,分号分割
defaultTopicQueueNums=4 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
autoCreateTopicEnable=false 是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
useEpollNativeSelector=true
listenPort=10923 Broker 对外服务的监听端口
haListenPort=10924
deleteWhen=04 删除时间,默认是凌晨四点
fileReservedTime=120 文件保留时间,默认48小时,单位是 hour
mapedFileSizeCommitLog=1073741824 commitLog每个文件的大小默认1G
mapedFileSizeConsumeQueue=300000 ConsumeQueue每个文件默认存30W条,根据业务情况调整
#redeleteHangedFileInterval=120000 destroyMapedFileIntervalForcibly=120000
diskMaxUsedSpaceRatio=88 检测物理文件磁盘空间
storePathRootDir=/data/store 存储路径
storePathCommitLog=/data/store/commitlog commitLog存储路径
storePathConsumeQueue=/data/store/consumequeue 消费队列存储路径
storePathIndex=/data/store/index 消息索引存储路径
storeCheckpoint=/data/store/checkpoint checkpoint 文件存储路径
abortFile=/data/store/abort abort 文件存储路径
maxMessageSize=5242880 限制的消息大小,默认 1M
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
brokerRole=SYNC_MASTER Broker 的角色 - ASYNC_MASTER 异步复制Master - SYNC_MASTER 同步双写Master - SLAVE
flushDiskType=ASYNC_MASTER 刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘
#checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128 发消息线程池数量
#pullMessageTreadPoolNums=128 拉消息线程池数量
useReentrantLockWhenPutMessage=false
waitTimeMillsInSendQueue=2500 刷盘等待时间,超时将会返回发送失败码给发送者
transferMsgByHeap=false
slaveReadEnable=true 是否允许 slave 读

上面罗列的是常见的参数,例如 Broker 集群的名称,主从角色,消息存储路径、文件大小、清理时间等等

之前也遇到默认参数不适合使用的常见,例如遇到业务方瞬间发送大量消息, Broker 同步刷盘时间超过默认的 2.5s,导致其它业务方遇到发送消息失败的场景,于是在 Broker 能力完全充足的情景下,调整了 waitTimeMillsInSendQueue 到 5s,避免影响其它业务方使用

文件存储机制

重要说明:

想要深入了解 RocketMQ 消息存储的内幕,需要了解这两方面

  • 文件存储的数据结构
  • 灵活利用 Linux 的文件机制 mmap

这次学习记录,参考了 STAR 皆空 大神,这里记录的是『消息存储的数据结构』,关于 mmap 的内容,可以点击参考链接深入学习。


RocketMQ 有很多亮点,其中一个是选择直接使用操作系统来提升存储效率,写入二进制格式的文件,消息持久化过程最大化的转成顺序写,避免随机写的额外开销。

这里记录一下跟 Broker 消息存储相关的内容

  • CommitLog(消息内容)
  • ConsumeQueue(位点数据)
  • Index(检索索引)
  • Broker 接收到【发送消息】操作
  • Broker 接收到【获取消息】操作

Broker 端整体架构

Broker 作为消息中转器,提供了消息发送、存储、查询,还有高可用的功能。

其中有几个重要模块:

  • Remoing Module: 请求的入口,使用了 Netty 作为远程通讯工具,处理发送过来的请求。
  • Client Manager: 管理客户端(发送者、消费者)并维护消费者的主题订阅。
  • Store Service: 提供 API 来存储或查询物理磁盘上的消息。
  • HA Service: 高可用服务,为主从节点之间提供数据同步功能。
  • Index Service: 索引服务,可以根据特定 Key,建立消息索引,并提供快速消息查询功能。

消息物理存储结构

从前面表格中的参数 storePathXXXX 可以知道,文件存储相关位置在 /data/store,使用 tree 命令查看这些消息文件的存储结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
$tree commitlog/ consumequeue/ index/
commitlog/
├── 00000000051539607552
├── 00000000052613349376
├── 00000000053687091200
├── 00000000054760833024
├── 00000000055834574848
├── 00000000056908316672
...
consumequeue/
├── prod%test-events
│   ├── 0
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 1
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 10
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 11
│   │   ├── 00000000000000000000
...
│   ├── 2
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 3
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 4
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
...
index/
├── 20200930012239943
├── 20201012111555094
└── 20201017045900966

CommitLog 消息数据文件

借鉴于 Kafka,RocketMQ 也是以 Topic 作为文件存储的基本单元,每个 Topic 都有其对应的数据文件和索引文件。

RocketMQ 与 Kafka 不同点在于,Kafka 将消息数据文件按 Topic 分开存储,如果存在大量 Topic 情况下,消息持久化会逐渐变成随机磁盘读写,消息中间件的高性能被磁盘IO 所限制;而 RocketMQ 对其进行改进,将全部 Topic 的数据文件写入同一个文件(commitLog)中,实现消息的顺序写。

单个 CommitLog 的大小为 1GB,每条消息及其元信息被顺序追加至文件,文件的尾部可能存在空闲区域。

除了记录消息本身的属性(消息长度、消息体、Topic 长度、Topic、消息属性长度、消息属性),CommitLog 同时记录了消息所在 ConsumeQueue 消费队列的信息(消费队列 ID 和偏移量)。

由于存储条目具备不定长的特性,当 CommitLog 剩余空间无法满足下一条消息的存储,会在当前 CommitLog 的尾部追加一个 MAGIC CODE 等于 BLANK_MAGIC_CODE 的存储条目作为结束标记,并开始下一个 CommitLog 文件的操作。

ConsumeQueue 消息队列文件

Topic 是个抽象概念,消息实际发往的是 consumeQueue 这个逻辑队列中,在 consumeQueue 中,记录了消息在 CommitLog 中的位置信息

单个 ConsumeQueue 文件大小为 6000000 Byte,存储 30W 条记录,每条记录固定 20B

与 CommitLog 不同,ConsumeQueue 的存储条目采用定长存储结构。

为了实现定长存储,ConsumeQueue 存储到了消息 Tag 的 HashCode,在 Broker 端进行消息过滤时,通过比较 Consumer 订阅 Tag 的 HashCode 和存储条目中的 Tag 的 HashCode 是否一致来决定是否消费消息。

Index 索引文件

在已有的 CommitLog 和 ConsumeQueue 基础上,已经满足一个消息中间件的消息发送和消费功能,RMQ 提供 Index 目的是为了检索消息更快,方便排查问题。

这个目录下的文件,提供了跟数据库索引一样的作用,「给定 Topic 和消息 Key,通过索引文件能快速找到消息」,提供了消息检索的作用,监控端的【消息查询】界面使用了这个功能。

单个 Index 文件大小等于 420000040 B,包含索引头(IndexHeader)、哈希槽(HashSlot)和消息索引(MessageIndex)

Index 存储条目的结构有点像 HashMap,使用链式地址法解决哈希冲突:
「每个 Hash Slot 关联一个 Message Index 链表,多个 Message Index 通过 preIndexOffset 连接。」

Broker 代码实现初探

Broker 启动阶段:

Broker Startup

org.apache.rocketmq.broker.BrokerStartup#main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#org.apache.rocketmq.broker.BrokerStartup#main
public class BrokerStartup {
public static void main(String[] args) {
start(createBrokerController(args));
}
...
}
#org.apache.rocketmq.broker.BrokerController#registerProcessor
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
...
}

Broker 将消息处理器注册到核心控制器 BrokerControllerBroker 定义了很多种消息处理器,查看 AsyncNettyRequestProcessor 继承图:

  • AdminBrokerProcessor
  • ClientManageProcessor
  • ConsumerManageProcessor
  • EndTransactionProcessor
  • ForwardRequestProcessor
  • PullMessageProcessor
  • QueryMessageProcessor
  • ReplyMessageProcessor
  • SendMessageProcessor

其中 SendMessageProcessor 负责处理【Producer 发送消息】的请求,PullMessageProcessor 负责处理【Consumer 消费消息】的请求。

SendMessageProcessor 处理器实现了 NettyRequestProcessor 接口,处理请求的 processRequest 方法,然后在 BrokerController 启动时,往 RemotingServer 中按照 RequestCode 来注册处理器。


Broker 接收到发送消息的请求

如果请求中的标志是 RequestCode.SEND_MESSAGE,那么就会交给 SendMessageProcessor 进行处理,同时底层大量使用线程池技术。

例如 Debug Broker 端代码:

可以看到,处理的线程名字前缀是 SendMessageThread_,异步回调处理请求。


Broker 接收到消费消息的请求

如果请求中的标志是 RequestCode.PULL_MESSAGE,那么就会交给 PullMessageProcessor 来进行处理,代码细节可以后续跟进看看。

测试发送消息的接口为:org.springframework.messaging.core.AbstractMessageSendingTemplate#convertAndSend(D, java.lang.Object)

核心方法:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand, org.apache.rocketmq.remoting.netty.RemotingResponseCallback)

  • 反序列化请求头 requestHeader
  • 从消息上下文恢复 mqTrace 链路
  • 判断是否批量发送
  • 进入单条发送逻辑
  • 预发送 preSend 校验
  • 完善消息详情,用户参数等
  • 消息存储 messageStore

进入消息存储:org.apache.rocketmq.store.MessageStore#asyncPutMessage

  • 检查存储状态 checkStoreStatus,主要是确认服务状态,主从节点角色,是否可写入,还有页面缓存是否繁忙 pageCacheBusy
  • 消息常规性校验,topic 长度,参数合法性
  • 写入 commitLog 的方法 this.commitLog.asyncPutMessage(msg)

简单小结:

  • 客户端在调用 API 发送消息时,构造 RemotingCommand,头部信息设置 RequestCode.SEND_MESSAGE_V2,还有相关属性以及消息体。
  • 消息中转器 Broker,启动着 NettyRemotingServer,接收到请求
  • 识别请求头的 RequestCode.SEND_MESSAGE_V2 状态码,将请求交给对应的处理器 SendMessageProcessor
  • Request 中恢复消息,校验消息合法性
  • 消息存储,将消息详情写入到 commitLog 中
  • 返回处理结果 putMessageResult

网络模型

Broker 的良好性能,有一半得归功于 Netty 这个优秀的通讯框架,扒了一下 Broker 上代码实现还有网上资料,记录一下使用到的 Netty 网络模型。

Netty 网络模型

各模块作用:

  • eventLoopGroupBoss

作为 acceptor 负责接收客户端的连接请求

  • eventLoopGroupSelector

负责 NIO 的读写操作

  • NettyServerHandler

读取 IO 数据,并对消息头进行解析

  • disatch

过程根据注册的消息 code 和 processsor 把不同的事件分发给不同的线程。
由 processTable 维护(类型为 HashMap)

线程池 & 请求码

Broker 中大量使用线程池技术,通过状态码对请求进行分类,将请求分发到不同的线程池,以此达到资源隔离的目的,每个线程池接收到请求,经过解码 decode 请求体和组装上下文 ctx,接着交给相应的处理器 xxxProcessor 差异化处理网络请求。

RequestCode 位置在:

org.apache.rocketmq.common.protocol.RequestCode


broker 启动时,注册处理线程池的位置在:

org.apache.rocketmq.broker.BrokerController#registerProcessor

对照表

作用 线程池 处理器 请求码
发送消息 sendMessageExecutor SendMessageProcessor RequestCode.SEND_MESSAGE RequestCode.SEND_MESSAGE_V2 RequestCode.SEND_BATCH_MESSAGE RequestCode.CONSUMER_SEND_MSG_BACK
拉取消息 pullMessageExecutor PullMessageProcessor RequestCode.PULL_MESSAGE
消息重试 replyMessageExecutor replyMessageProcessor RequestCode.SEND_REPLY_MESSAGE RequestCode.SEND_REPLY_MESSAGE_V2
查询消息 queryMessageExecutor NettyRequestProcessor RequestCode.QUERY_MESSAGE RequestCode.VIEW_MESSAGE_BY_ID
客户端注册、心跳检测 heartbeatExecutor、clientManageExecutor ClientManageProcessor RequestCode.HEART_BEAT RequestCode.UNREGISTER_CLIENT, RequestCode.CHECK_CLIENT_CONFIG
消费端处理(例如位点更新) consumerManageExecutor ConsumerManageProcessor RequestCode.GET_CONSUMER_LIST_BY_GROUPRequestCode.UPDATE_CONSUMER_OFFSET RequestCode.QUERY_CONSUMER_OFFSET**
事务消息处理 endTransactionExecutor EndTransactionProcessor RequestCode.END_TRANSACTION
处理集群信息 adminBrokerExecutor AdminBrokerProcessor Default,不属于前面的请求码都由它进行处理

小结

本次学习主要从以下三个方面去了解:

  • Broker 启动参数说明
  • 消息文件的存储机制
  • 使用的 Netty 网络模型

从简单的使用到底层原理学习,逐渐剖开 MQ 框架的深层,慢慢将文件操作使用到的技术也文件存储机制了解,接着再去学习 Netty 网络模型,将网络通讯的基础也开始补起来,从一个点发散,将知识体系补全,还有很多需要去学习和了解的,后面继续补充吧~

参考资料

RocketMQ高性能揭秘