TOC
本文是RocketMQ源码分析系列的第六篇。在前面的章节中,我们分析了RocketMQ的整体架构、客户端设计、网络通信和Broker启动流程。现在我们来深入探讨RocketMQ最核心的部分——消息存储系统。RocketMQ的消息存储设计堪称经典,通过CommitLog + ConsumerQueue的组合,既保证了消息写入的高性能,又支持了灵活的消息消费模式。
存储设计思想
传统消息队列的存储问题
在分析RocketMQ的存储设计之前,我们先来看看比较传统的消息队列存储架构。以kafka为例,通常每个Topic对应一个或多个Partition,每个Partition又对应多个Segment文件。这种设计虽然简单,但在高并发场景下会遇到以下问题:
- 随机写入性能差:如果为每个Topic的每个Partition都独立维护一个文件,在高并发场景下会产生大量随机IO,严重影响性能
- 文件数量过多:Topic和Partition数量增长时,文件数量会急剧增加,给文件系统带来压力
RocketMQ的化零为整设计
RocketMQ采用了"化零为整"的设计思想,将所有消息统一写入一个CommitLog文件,然后通过ConsumerQueue建立索引。每个ConsumeQueue对应一个Topic-Queue的逻辑概念,记录着该Topic-Queue上的消息在CommitLog中的存储位置。消费的时候,broker先根据Topic-QueueId-Offset定位到对应的ConsumerQueue文件,然后通过索引快速找到CommitLog中的消息位置。
这种设计有以下优势:
- 顺序写入:所有消息都追加写入CommitLog,充分利用磁盘顺序IO的高性能
- 统一存储:不管有多少Topic和Queue,都只需要一个(准确地说是一系列滚动的)CommitLog文件,减少文件数量
- 索引读取:ConsumerQueue提供了Topic-Queue的索引,消费者可以快速定位到CommitLog中的消息位置,避免全盘扫描;更进一步地,RocketMQ还提供了Index文件支持按Key查询消息。
对比实际的存储文件
假设我们在Kafka和RocketMQ上分别创建了两个Topic(Topic-a, Topic-b),各自包含3个Partition,看看他们生成的存储文件是怎样的:
大的设计有了,下面看看RocketMQ是如何实现这个设计的。
CommitLog详解
CommitLog的文件结构
CommitLog是RocketMQ存储消息的核心文件,具有以下特点:
文件组织形式
- CommitLog由多个固定大小的文件组成(默认1GB)
- 文件名为该文件第一条消息的物理偏移量(20位数字,不足左补0)
- 例如:00000000000000000000、00000000001073741824、00000000002147483648
消息存储格式
每条消息在CommitLog中的存储格式包含:
消息长度(4字节) + 魔数(4字节) + CRC校验(4字节) + Flag(4字节) +
消息体长度(4字节) + 消息体 + Topic长度(1字节) + Topic +
Queue信息 + 消息属性(properties)长度 + 消息属性 + ...
CommitLog的写入流程
消息写入的核心方法
// 核心写入方法分析
DefaultMessageStore#asyncPutMessage()
├─ CommitLog#putMessage()
├─ 获取当前写入文件(MappedFile)
├─ 序列化消息内容
├─ 追加写入到MappedFile
└─ 触发刷盘策略
MappedFile与内存映射
- RocketMQ使用Java NIO的MappedByteBuffer技术
- 将文件映射到内存,提升读写性能
- 支持同步/异步刷盘策略
刷盘策略
- 同步刷盘:消息写入后立即调用fsync,确保数据持久化
- 异步刷盘:定时批量刷盘,提升性能但可能丢失部分数据
ConsumerQueue详解
ConsumerQueue的设计目的
虽然CommitLog解决了写入性能问题,但带来了新的挑战:
- 消费者需要按Topic-Queue维度拉取消息,如果直接扫描CommitLog,效率很低
ConsumerQueue就是为了解决这个问题而设计的轻量级索引文件。
ConsumerQueue的文件结构
目录组织
ConsumerQueue文件按Topic和QueueId组织目录结构:
store/consumequeue/{topic}/{queueId}/{fileName}
例如:store/consumequeue/TestTopic/0/00000000000000000000
索引条目格式
每个索引条目固定20字节(o.a.r.store.ConsumeQueue:35):
// o.a.r.store.ConsumeQueue:35
public static final int CQ_STORE_UNIT_SIZE = 20;
具体结构如下:
| CommitLog物理偏移量(8字节) | 消息大小(4字节) | Tag哈希值(8字节) |
- CommitLog物理偏移量:消息在CommitLog文件中的起始位置
- 消息大小:消息体的字节长度
- Tag哈希值:消息Tag的哈希值,用于快速过滤
文件文件默认特点
- 每个包含30万个条目(
mappedFileSizeConsumeQueue默认约6MB) - 文件名为该文件第一个条目的逻辑偏移量(消息在队列中的序号)
- 支持扩展字段(ConsumeQueueExt),用于存储更丰富的过滤信息
核心代码结构
ConsumeQueue类(o.a.r.store.ConsumeQueue:32)的核心字段:
// o.a.r.store.ConsumeQueue:32
public class ConsumeQueue {
private final DefaultMessageStore defaultMessageStore;
private final MappedFileQueue mappedFileQueue; // 文件管理
private final String topic;
private final int queueId;
private final ByteBuffer byteBufferIndex; // 索引缓冲区
private final String storePath; // 存储路径
private final int mappedFileSize; // 文件大小
private long maxPhysicOffset = -1; // 最大物理偏移量
private volatile long minLogicOffset = 0; // 最小逻辑偏移量
private ConsumeQueueExt consumeQueueExt = null; // 扩展字段
}
getIndexBuffer方法
根据逻辑Offset获取对应的索引缓冲区(o.a.r.store.ConsumeQueue#getIndexBuffer:544):
// o.a.r.store.ConsumeQueue#getIndexBuffer:544
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
// 将逻辑Offset转换为物理字节偏移量
long offset = startIndex * CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
// 找到对应的MappedFile
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
// 返回该位置的缓冲区
return mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
}
}
return null;
}
rollNextFile方法
计算切换到下一个文件的逻辑Offset(o.a.r.store.ConsumeQueue#rollNextFile:578):
// o.a.r.store.ConsumeQueue#rollNextFile:578
public long rollNextFile(final long index) {
int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE;
// 返回下一个文件起始的逻辑Offset
return index + totalUnitsInFile - index % totalUnitsInFile;
}
ConsumerQueue的构建流程
ReputMessageService异步构建
ConsumerQueue的构建是由ReputMessageService(o.a.r.store.DefaultMessageStore.ReputMessageService:1972)异步完成的。该服务会实时扫描CommitLog,将新到达的消息索引分发到对应的ConsumerQueue中。
核心流程(o.a.r.store.DefaultMessageStore.ReputMessageService#doReput:2009)如下:
ReputMessageService.doReput()
│
├─ 从CommitLog读取新消息 [2022]
│ └─ commitLog.getData(reputFromOffset)
│
├─ 解析消息元数据 [2028-2029]
│ └─ commitLog.checkMessageAndReturnSize()
│
├─ 触发分发 [2034]
│ └─ doDispatch(dispatchRequest)
│
├─ 通知消息到达(长轮询) [2036-2044]
│ └─ messageArrivingListener.arriving()
│
└─ 更新reputFromOffset [2046]
构建时机
ConsumerQueue的构建发生在两个时机:
- Broker启动时:扫描CommitLog文件,重建历史消息的索引
- 运行时:通过ReputMessageService实时构建新消息的索引
doDispatch方法
doDispatch方法(o.a.r.store.DefaultMessageStore#doDispatch:1519)负责将消息索引写入对应的ConsumerQueue:
// o.a.r.store.DefaultMessageStore#doDispatch:1519
public void doDispatch(DispatchRequest req) {
// 写入ConsumerQueue索引
final int queueId = req.getQueueId();
if (req.getTopic().equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
// 延迟消息特殊处理
} else {
// 正常消息,写入ConsumeQueue
ConsumeQueue consumeQueue = findConsumeQueue(req.getTopic(), queueId);
if (consumeQueue != null) {
consumeQueue.putMessagePositionInfo(
req.getCommitLogOffset(),
req.getMsgSize(),
req.getConsumeQueueOffset(),
req.getTagsCode());
}
}
// 写入IndexFile(如果启用了)
if (this.indexService != null) {
// ...
}
}
putMessagePositionInfo方法(o.a.r.store.ConsumeQueue#putMessagePositionInfo:478)负责将索引信息写入MappedFile:
// o.a.r.store.ConsumeQueue#putMessagePositionInfo:478
private boolean putMessagePositionInfo(final long offset, final int size,
final long tagsCode, final long cqOffset) {
// 写入索引数据到ByteBuffer
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset); // CommitLog偏移量
this.byteBufferIndex.putInt(size); // 消息大小
this.byteBufferIndex.putLong(tagsCode); // Tag哈希值
// 追加到MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
// ...
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
这个过程是异步的,Broker会批量将索引写入磁盘,保证消息写入的高性能。
消息读取流程
消费者拉取消息的完整流程
- 定位ConsumerQueue:根据Topic、QueueId、ConsumeOffset确定读取位置
- 读取索引条目:从ConsumerQueue获取CommitLog偏移量和消息大小
- 读取消息内容:根据偏移量从CommitLog读取完整消息
- 消息过滤:根据Tag等条件过滤消息
- 返回消息:将符合条件的消息返回给消费者
性能优化技巧
预读机制
- 消费者可以指定批量拉取消息数量
- ConsumerQueue支持批量读取多个索引条目
缓存机制
- 热点ConsumerQueue会被缓存在内存中
- 使用LRU策略管理缓存
Index文件
Index文件的作用
除了ConsumerQueue,RocketMQ还提供了Index文件用于支持按Key查询消息:
使用场景
- 根据Message Key查找特定消息
- 支持时间范围查询
- 用于消息轨迹追踪
文件结构
Index文件采用哈希索引结构:
- Header:存储文件元信息
- Hash槽:哈希表的桶
- Index条目:具体的索引记录
存储相关配置优化
重要配置参数
CommitLog相关
commitLogFileSize:单个CommitLog文件大小(默认1GB)flushCommitLogTimed:是否定时刷盘flushIntervalCommitLog:刷盘间隔
ConsumerQueue相关
mapedFileSizeConsumeQueue:单个ConsumerQueue文件大小flushIntervalConsumeQueue:ConsumerQueue刷盘间隔
性能调优建议
- 根据消息大小调整CommitLog文件大小
- 在可靠性和性能之间选择合适的刷盘策略
- 合理配置预分配文件数量
通过本章的分析,我们深入了解了RocketMQ消息存储的精妙设计。CommitLog + ConsumerQueue的组合既保证了写入性能,又支持了灵活的消息消费。这种设计思想值得我们在其他存储系统设计中借鉴。
下一章我们将分析消息读取的具体实现,包括正常流程和各种异常情况的处理机制。